Canceling socket operations using I/O multiplexing.

I spent the last couple of months diving into networking code at work. This led to some interesting discoveries, including how to allow an in-progress network operation to be canceled on demand. This can be an attempt to establish a connection, or a socket read or write. The standard use case is to allow the user to cancel an operation or to allow a clean shutdown when multiple threads are operating on sockets.

If we were to use blocking sockets (the default) then a call like:

while (expectingMoreData) {
  n = read(sock, buf, BUF_SIZE);
  // handle errors.
  
  // process data.
  if (messageReceived) {
    expectingMoreData = false;
  }
}

can block for a long time on the read for various reasons:

  1. The network could be slow.
  2. The network connection could have failed.
  3. The application on the other end isn’t responding.

and so on. Since read() is a system call, there is no way to interrupt it short of sending a signal to the process, and since signals are not thread-local, this quickly devolves into a mess. I have no idea how to do signal-based cancellation sanely.

Just running this on a different thread is not a panacea. A separate thread is useful when you want to keep the UI responsive or run several network operations at once, but it does not help with stopping an operation before it completes.

Fortunately there is a way out of this if you are willing to restructure your program a bit. There are I/O multiplexing functions across UNIX implementations, and libraries like libev and libuv abstract these for cross-platform code. The approach I describe can work with any of them, but for simplicity, I’m going to focus on select().

select() is a system call that has been around in UNIX for a long time. It is used to implement level-triggered readiness checks on non-blocking file descriptors. Non-blocking I/O works reliably on sockets and pipes but not on regular files, so in the rest of this post, file descriptors are assumed to be sockets.

For an introduction to non-blocking sockets, see Beej’s Guide to Network Programming.
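
To make "level-triggered readiness" concrete, here is a minimal sketch of select() reporting a socket as readable for as long as unread data remains. It is written for Python 3 (unlike the Python 2 listings in this post), and it assumes a socketpair() as a stand-in for a real network connection so that it runs without a server. The names is_readable and demo are made up for this illustration.

```python
import select
import socket


def is_readable(sock, timeout=0.0):
    """Return True if select() reports sock as ready for reading."""
    ready, _, _ = select.select([sock], [], [], timeout)
    return sock in ready


def demo():
    # Assumption: a local socket pair stands in for a network connection.
    a, b = socket.socketpair()
    a.setblocking(False)
    results = []
    try:
        results.append(is_readable(a))       # nothing has been sent yet
        b.sendall(b"ping")
        results.append(is_readable(a, 1.0))  # data is waiting
        results.append(is_readable(a, 1.0))  # still waiting: level-triggered
        a.recv(1024)
        results.append(is_readable(a))       # drained, so no longer ready
    finally:
        a.close()
        b.close()
    return results


if __name__ == "__main__":
    print(demo())  # [False, True, True, False]
```

The socket stays ready across repeated select() calls until the data is actually read, which is why a readiness loop has to consume the data before waiting again.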

Diversion: A test server

Before we dig into cancellable sockets, here is a simple server program for testing our clients. The server sleeps for 5 seconds, then writes a message, and keeps doing this as long as the client is connected.

import socket
import SocketServer
import sys
import time

class WaitAndWrite(SocketServer.BaseRequestHandler):
    def handle(self):
        try:
            while True:
                time.sleep(5)
                self.request.sendall("hello world")
        except socket.error:
            print "client disconnected"

def run(port):
    server = SocketServer.TCPServer(("localhost", port), WaitAndWrite)
    server.serve_forever()

if __name__ == "__main__":
    run(int(sys.argv[1]))

A select()-based read that can’t be cancelled

Here is a select()-based implementation of the client that uses a shared flag to decide when to stop the thread.

import select
import socket
import sys
import threading


stop_loop = False

def read_thread(port):
    # A lot of error handling elided.
    sock = socket.socket()
    # We connect in blocking just for simplicity.
    sock.connect(("localhost", port))
    sock.setblocking(False)

    try:
        while not stop_loop:
            ready_set, _, _ = select.select([sock], [], [])
            if not ready_set:
                return
            else:
                data = sock.recv(1024)
                if not data:
                    # Empty read: the server closed the connection.
                    return
                print "Received data:", data
    except Exception as e:
        print e
    finally:
        sock.setblocking(True)
        sock.close()

def run(port):
    """Tries to read from a server connected to port."""
    global stop_loop
    t = threading.Thread(target=read_thread, args=(port,))
    print "Program is running. Press Enter to quit."
    t.start()
    raw_input()
    stop_loop = True
    t.join()


if __name__ == "__main__":
    run(int(sys.argv[1]))

We spawn a thread that reads on the socket. Unfortunately this thread is now blocked inside select() until the socket becomes readable, so the earliest the user’s cancellation request can be carried out is the next time the server responds. We are stuck at the whims of the server here. We could pass a timeout to select(), but what is a good timeout value? Too long and the user feels the application is unresponsive; too short and we poll a lot, even though the user hardly ever requests an early exit.
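
To put rough numbers on the timeout trade-off, here is a minimal sketch of the polling approach. It is written for Python 3 (unlike the Python 2 listings in this post), and it assumes a silent socketpair() in place of the slow server. The helper names poll_until_stopped and measure are invented for this illustration.

```python
import select
import socket
import threading
import time


def poll_until_stopped(sock, stop, timeout):
    """Wait on sock, waking every `timeout` seconds to check `stop`.

    Returns how many times select() timed out with nothing to do.
    """
    idle_wakeups = 0
    while not stop.is_set():
        ready, _, _ = select.select([sock], [], [], timeout)
        if not ready:
            idle_wakeups += 1  # woke up only to re-check the flag
            continue
        if not sock.recv(1024):
            break  # peer closed the connection
    return idle_wakeups


def measure(timeout, run_for=0.5):
    """Return (idle wakeups, seconds between stop request and loop exit)."""
    # Assumption: the peer never writes, imitating an unresponsive server.
    sock, peer = socket.socketpair()
    sock.setblocking(False)
    stop = threading.Event()
    result = {}

    def worker():
        result["wakeups"] = poll_until_stopped(sock, stop, timeout)
        result["stopped_at"] = time.monotonic()

    t = threading.Thread(target=worker)
    t.start()
    time.sleep(run_for)
    requested_at = time.monotonic()
    stop.set()
    t.join()
    sock.close()
    peer.close()
    return result["wakeups"], result["stopped_at"] - requested_at


if __name__ == "__main__":
    for timeout in (0.01, 0.25):
        print(timeout, measure(timeout))
```

The stop latency is bounded by roughly one timeout interval, while the number of idle wakeups grows as the timeout shrinks. Neither knob gets both to zero.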

Once we call into the kernel, we can’t resume execution until the kernel is willing to give back control. The only way to make that happen on demand is to trigger an event that the multiplexing routine is waiting on. When using select(), what we would like is a file descriptor whose readiness we can control. We can then force the file descriptor to become ready on demand.

Multiplexing with cancellation

What kind of file descriptor do we need? Any stream-like API where an action on one end causes a readiness change on the other side works perfectly. UNIX provides several of these – pipes, domain sockets and network sockets. We will use domain sockets here. The source code also has a pipe example.

POSIX provides socketpair(2), which returns a pair of domain sockets connected to each other. The two sockets are indistinguishable from each other. We can pass one to select(), always in the read set. When cancellation is required we can close() the other one, causing the first socket to become “ready for reading” with no data.

import select
import socket
import sys
import threading


def read_thread(port, notify):
    # A lot of error handling elided.
    sock = socket.socket()
    # We connect in blocking just for simplicity.
    sock.connect(("localhost", port))
    sock.setblocking(False)

    try:
        while True:
            ready_set, _, _ = select.select([sock, notify], [], [])
            if not ready_set:
                return
            elif notify in ready_set:
                print "User requested cancellation"
                return
            else:
                data = sock.recv(1024)
                if not data:
                    # Empty read: the server closed the connection.
                    return
                print "Received data:", data
    except Exception as e:
        print e
    finally:
        sock.setblocking(True)
        sock.close()


def run(port):
    """Tries to read from a server connected to port."""
    cancel, notify = socket.socketpair()
    t = threading.Thread(target=read_thread, args=(port, notify))
    print "Program is running. Press Enter to quit."
    t.start()
    raw_input()
    cancel.close()
    t.join()


if __name__ == "__main__":
    run(int(sys.argv[1]))

As soon as we close one end, the other end is ready for reading (with EOF). This implementation leads to a one-use set of cancellation sockets. If you require multiple cancellations but don’t want to create a new socket pair every time, the same thing can be implemented by writing bytes to the cancel end instead. Remember to read them all out in the thread to make sure the socket doesn’t stay ready!
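
As a rough sketch of that reusable variant, the code below writes a byte to the cancel end and drains the notify end after each wake-up. It is written for Python 3 (unlike the Python 2 listings in this post). The helpers drain and wait_once are hypothetical names, and a second socketpair() is assumed as a stand-in for the server connection. The demo runs on a single thread so that the order of events is deterministic.

```python
import select
import socket


def drain(notify):
    """Read every pending wake-up byte so notify stops being ready.

    Returns False if the cancel end was closed (EOF), True otherwise.
    """
    while True:
        try:
            chunk = notify.recv(4096)
        except BlockingIOError:
            return True   # nothing left to read
        if not chunk:
            return False  # EOF: the cancel end was closed


def wait_once(sock, notify):
    """Return the bytes received, or None if the wait was cancelled."""
    ready, _, _ = select.select([sock, notify], [], [])
    if notify in ready:
        drain(notify)
        return None
    return sock.recv(1024)


def demo():
    sock, peer = socket.socketpair()      # assumption: stand-in for the server
    cancel, notify = socket.socketpair()
    notify.setblocking(False)             # so drain() can never block
    results = []
    try:
        cancel.send(b"x")                 # first cancellation request
        results.append(wait_once(sock, notify))
        peer.sendall(b"hello")            # the same pair is usable again
        results.append(wait_once(sock, notify))
        cancel.send(b"x")                 # two requests before the next wait
        cancel.send(b"x")
        results.append(wait_once(sock, notify))
        leftover, _, _ = select.select([notify], [], [], 0)
        results.append(bool(leftover))    # drained, so not ready any more
    finally:
        for s in (sock, peer, cancel, notify):
            s.close()
    return results


if __name__ == "__main__":
    print(demo())  # [None, b'hello', None, False]
```

Setting the notify end to non-blocking is what lets drain() stop cleanly once the wake-up bytes are gone. Several requests made before the next wait collapse into a single cancellation.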

So what about Windows?

There is a way to do this kind of cancellation on Windows using events. The general idea is to create an event that acts as a cancellation event. Associate another event with the socket (for example with WSAEventSelect), then wait for either object to be signaled (for example with WSAWaitForMultipleEvents). Cancellation is indicated using SetEvent to mark the cancellation event as ready.

libuv provides a cross-platform abstraction for this kind of pattern using uv_async_t handles. If you dig into the implementation, it does use a pipe in the generic UNIX case!

While I was writing this post, I learned about eventfd(2) which is a Linux-only API for this kind of signaling that is cheaper than pipes or domain sockets. I’ll let you dig into it yourself.
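
For readers who want a starting point, here is a minimal sketch of eventfd used as a wake-up descriptor with select(). It assumes Python 3.10 or newer on Linux, where the os module exposes os.eventfd, os.eventfd_write and os.eventfd_read. On other platforms the function returns None. The name eventfd_demo is made up for this illustration.

```python
import os
import select


def eventfd_demo():
    """Return (ready before, ready after, counter, ready once drained).

    Returns None where os.eventfd is unavailable.
    """
    if not hasattr(os, "eventfd"):
        return None  # not Linux, or Python older than 3.10
    efd = os.eventfd(0, os.EFD_NONBLOCK | os.EFD_CLOEXEC)
    try:
        before, _, _ = select.select([efd], [], [], 0)
        os.eventfd_write(efd, 1)      # "cancel": add 1 to the counter
        os.eventfd_write(efd, 1)      # repeated signals accumulate
        after, _, _ = select.select([efd], [], [], 0)
        count = os.eventfd_read(efd)  # returns the counter and resets it
        drained, _, _ = select.select([efd], [], [], 0)
        return bool(before), bool(after), count, bool(drained)
    finally:
        os.close(efd)


if __name__ == "__main__":
    print(eventfd_demo())  # (False, True, 2, False) on Linux
```

A single descriptor does the job of both ends of the socket pair: writing makes it readable, and one read resets it, so there is no drain loop to write.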