A Future is a Suspending Scheduler

Posted on Jun 20, 2020

Introduction

This is another blog post related to the Build Systems à la Carte paper. See Using type-classes to model the expressivity of build systems for the first one.

The paper proposes splitting build systems into two components:

  1. Rebuilders decide when to rebuild a particular key (file).
  2. Schedulers decide how to rebuild multiple keys - handling dependencies while maintaining correctness and efficiency.

Schedulers come in 3 flavors (see Section 4):

  1. Topological schedulers use a simple topological sort to determine the order in which to build keys. They are limited to static dependencies - if new dependencies are discovered during the build, they cannot be correctly added to the build graph.
  2. Restarting schedulers start building a particular key, then, as dependencies are discovered, abort building that key. The discovered dependencies are built, after which the original key’s task is restarted from scratch. This allows dynamic dependencies, but at the cost of repeating parts of the build process.
  3. Suspending schedulers also start building a particular key. When a dependency is discovered, instead of aborting the current task, they suspend it, go and build the dependency, then resume the task. This allows dynamic dependencies without repeating work.

Section 4.3 says:

This scheduler (when combined with a suitable rebuilder) provides a minimal build system that supports dynamic dependencies. In our model, a suspending scheduler is easy to write – it makes a function call to compute each dependency. However, a more practical implementation is likely to build multiple dependencies in parallel, which then requires a more explicit task suspension and resumption. To implement suspension, there are two standard approaches:

  • Blocking threads or processes: this approach is relatively easy, but can require significant resources, especially if a large number of tasks are suspended. In languages with cheap green threads (e.g. Haskell), the approach is more feasible and it was the original approach taken by SHAKE.

  • Continuation-passing style (Claessen, 1999) can allow the remainder of a task to be captured, paused, and resumed later. Continuation passing is efficient, but requires the build script to be architected to allow capturing continuations. SHAKE currently uses this approach.

While a suspending scheduler is theoretically optimal, in practice, it is better than a restarting scheduler only if the cost of avoided duplicate work outweighs the cost of suspending tasks. Note, furthermore, that the cost of duplicate work may often be just a fraction of the overall build cost.
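To make the paper’s “function call per dependency” formulation concrete before we switch to Futures, here is a minimal serial sketch of my own (not the paper’s Haskell model). Building a key “suspends” by recursing into each discovered dependency, and a done-set ensures work is never repeated:

use std::collections::{HashMap, HashSet};

// `deps` maps each key to the dependencies discovered while building it.
// Assumes an acyclic graph.
fn build(key: &str, deps: &HashMap<&str, Vec<&str>>, done: &mut HashSet<String>) {
    if done.contains(key) {
        return; // already built; never rebuilt or restarted
    }
    // "Suspend" the current key: make a function call per dependency...
    for &dep in deps.get(key).into_iter().flatten() {
        build(dep, deps, done);
    }
    // ...then "resume" and finish the key itself.
    println!("building {}", key);
    done.insert(key.to_string());
}

fn main() {
    let mut deps = HashMap::new();
    deps.insert("baz.so", vec!["foo.o", "bar.o"]);
    build("baz.so", &deps, &mut HashSet::new());
}

This is exactly the “easy to write” version: correct and minimal, but serial. The rest of this post is about getting the parallel version with similarly little ceremony.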

I want to show that an implementation of such a scheduler is “trivially” possible using Futures (Promises for JS folks), and that it provides the nice property of automatically giving parallelism, without any need for explicit suspension and resumption. We do not need to perform any transformations on the code. Instead, almost everything is written in a very imperative, blocking style, with async/await handling the suspension mechanics. Using futures means that we do not need a thread/process per active task. Since builds are generally I/O bound and, more importantly, do their real work in processes separate from the build process itself, we can even get away with just one actual OS thread running all currently active futures.

With a “sophisticated” (I don’t know a better word) futures implementation such as in Rust, you can even make reasonable claims about the memory usage and allocation patterns of such an implementation, which can help you answer whether “the cost of avoided duplicate work outweighs the cost of suspending tasks”.

A suspending scheduler

I am going to focus on a “build system” that has these requirements:

  1. It is going to launch arbitrary external processes. This generally makes the cost of suspending pretty small compared to the cost of restarting.

  2. We want parallelism because this is 2020. At the same time, we want to bound this parallelism to avoid overloading a machine.

Just like in a real build system, our tasks can:

  1. Depend on N tasks, and cannot begin until all N of those tasks are done. The code does assume all tasks always succeed, which is not true in reality.
  2. Share common dependencies, so that multiple tasks can block on a single task.

The examples below use tokio and futures to implement the runtime and various useful features. They will only run on UNIX since they use a shell.

Simple suspension

Full code in the Rust playground

Let’s start small. This one just demonstrates setting up dependencies so that a task waits until they are all done.

First we have a wall of imports, most of which we will only use later.

#![allow(unused_imports)]

use futures::future::join_all;
use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use std::rc::Rc;
use tokio::process::Command;
use tokio::runtime::Builder;
use tokio::sync::Semaphore;

Then, a couple of utility functions to create a command and to actually start the process and obtain a future to monitor when the process is done.

fn command<S: Into<String>>(cmdstr: S) -> Command {
    let mut cmd = Command::new("/bin/sh");
    cmd.arg("-c").arg(cmdstr.into());
    cmd
}

async fn run_command<S: Into<String>>(
    cmdstr: S,
) -> Result<std::process::ExitStatus, std::io::Error> {
    let str: String = cmdstr.into();
    let mut c = command(&str);
    println!("Starting '{}'", &str);
    let result = c.status().await;
    println!("Done '{}'", &str);
    result
}

// This macro exists because we are going to wrap run_command in a later
// version, and we don't want to rewrite all the call-sites.
macro_rules! command {
    ($cmd:expr) => {
        run_command($cmd)
    };
}

An async function in Rust does not actually start running as soon as it is called. Instead, it is desugared to return a Future that has “yield points” introduced at await calls. This is an excellent explanation of how Rust async/await works. The process will only be launched when status() executes.
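As a tiny standalone illustration of this laziness (a hypothetical example, assuming tokio’s macros feature for #[tokio::main]):

async fn noisy() {
    println!("body is running");
}

#[tokio::main]
async fn main() {
    let fut = noisy(); // nothing printed yet; `fut` is an inert state machine
    println!("before await");
    fut.await; // the body runs only now
}

This prints “before await” first, confirming that calling noisy() did not run its body.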

A final bit of setup is to create a runtime in which to run the future. This is a Rust-ism and is not really present in other languages, where it is handled transparently by the runtime. As I said before, since we are launching processes which will do the actual work, we aren’t going to use a thread-pool [1]. So we select the tokio basic runtime. Since our demo program doesn’t do anything else, we can just block on the one async block, and exit as soon as that is done. This section won’t get into much of the Rust-specific code. See the implementation details.

fn main() {
    let mut runtime = Builder::new()
        .basic_scheduler()
        .enable_all()
        .build()
        .unwrap();
    let local_set = tokio::task::LocalSet::new();

    local_set.block_on(&mut runtime, async {
        // actual code goes here.
    });
}

We are ready to demo a pretend build system.

// Goes in the "actual code goes here".
let compile1 = command!("sleep 1 && echo foo.o");
let compile2 = command!("sleep 2 && echo bar.o");
let link = join_all(vec![compile1, compile2]).then(|_| command!("sleep 1 && echo baz.so"));

let mut targets = FuturesUnordered::new();
targets.push(link);

while let Some(_) = targets.next().await {}

We pretend to compile 2 object files, which can happen in parallel. Then we build a shared library that requires both these files. FuturesUnordered allows waiting on a set of futures, and treating them as a stream of results. Each call to next().await will block until one of the futures in the set is done, and then will return the value of that future. This allows us to simulate a build system that has multiple “top-level” keys (files) being built.

Running this will produce something like [2]:

Starting 'sleep 1 && echo foo.o'
Starting 'sleep 2 && echo bar.o'
foo.o
Done 'sleep 1 && echo foo.o'
bar.o
Done 'sleep 2 && echo bar.o'
Starting 'sleep 1 && echo baz.so'
baz.so
Done 'sleep 1 && echo baz.so'

demonstrating that the 2 compiles start roughly at the same time (not blocking on each other) and the link does not start until they are both done.

Why is this a suspending scheduler? Well, the “build system” is requested to run the task baz.so. This task is represented by a Future returned from join_all(...).then(). Internally this generates a state machine that is going to wait on all the futures in the join. This is analogous to discovering that certain dependencies need to be built. join_all then suspends itself. It signals to the runtime that it cannot continue until these other tasks are done, so it shouldn’t be woken up until they are. The dependencies are now “present in the system” [3], and they get run to completion. Once they are done, the task resumes from the yield point and finishes itself!

A corollary of this is that the runtime will try to run as many futures (tasks) at once as it can. Try it! Now it is time to make sure we don’t overload the machine.

Constraining parallelism

Full code in the Rust playground

How would we impose a limit on how many tasks are running at the same time? Instead of just suspending until a task’s dependencies are ready, we could have the task stay suspended until we have the capacity to run it. We could imagine modeling this as a device that maintains a shared counter with some maximum value. Each Future obtained from such a device would only become ready when there was capacity in the system; it would decrement the counter and return a handle which the task holds while it runs. Releasing the handle increments the counter again.

This sounds very much like a semaphore from thread-based parallelism. Indeed, tokio does provide a Semaphore that behaves exactly like this. It also uses some Rust idioms to make it impossible to mis-use.
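In sketch form, the permit flow looks like this (using the tokio 0.2 API these examples are built on):

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sem = Semaphore::new(2);      // capacity for 2 concurrent holders
    let permit = sem.acquire().await; // suspends until a permit is free
    // ...do the bounded work while holding the permit...
    drop(permit);                     // returning the permit wakes a suspended waiter
}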

First, let’s introduce a few new tasks so we can actually observe the semaphore in action.

let iter = (0..3).map(|i| command!(format!("sleep 1 && echo doc{}.html", i)));
let docs = join_all(iter)
    .then(|_| command!("echo toc.html"))
    .boxed_local();
...
targets.push(docs);

We have now started shipping documentation along with our library and we want to build it as a collection of HTML pages with a Table of Contents (TOC). The TOC naturally depends on all the pages.

Then we can introduce a semaphore (this is the tokio Semaphore and not std::sync::Semaphore):

let sem = Semaphore::new(2);

This introduces a max parallelism of 2. Now, we need to tweak the async fn run_command() to await on the semaphore before starting command execution.

async fn run_command<S: Into<String>>(
    sem: &Semaphore,
    cmdstr: S,
) -> Result<std::process::ExitStatus, std::io::Error> {
    let _p = sem.acquire().await;
    let str: String = cmdstr.into();
    let mut c = command(&str);
    println!("Starting '{}'", &str);
    let result = c.status().await;
    println!("Done '{}'", &str);
    result
}

acquire() returns a Future that resolves to a SemaphorePermit once capacity is available. The permit increments the capacity again when it is dropped at the end of the scope [4].

We also tweak the command! macro to pass the semaphore to run_command() [5], as well as doing some boxing of the link future. See the full example.
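For reference, the tweaked macro looks roughly like this (a sketch; see the playground for the exact version). It now lives inside the main async block so that sem is in scope at the macro’s definition site, which is the hygiene point footnote [5] refers to:

// Inside the main async block, after `let sem = Semaphore::new(2);`:
macro_rules! command {
    ($cmd:expr) => {
        run_command(&sem, $cmd)
    };
}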

That’s it! Try running the code now. You will see that the number of processes started at any time is constrained to 2 and new ones start automatically as old ones finish. If you reduce the max parallelism to 1, you will see serial execution. If you make it some high value, the dependency tree will constrain how much parallelism can happen.

# Parallelism: 2
Starting 'sleep 1 && echo foo.o'
Starting 'sleep 2 && echo bar.o'
foo.o
Done 'sleep 1 && echo foo.o'
Starting 'sleep 1 && echo doc0.html'
bar.o
Done 'sleep 2 && echo bar.o'
Starting 'sleep 1 && echo doc1.html'
doc0.html
Done 'sleep 1 && echo doc0.html'
Starting 'sleep 1 && echo doc2.html'
doc1.html
Done 'sleep 1 && echo doc1.html'
Starting 'sleep 1 && echo baz.so'
doc2.html
Done 'sleep 1 && echo doc2.html'
Starting 'echo toc.html'
toc.html
Done 'echo toc.html'
baz.so
Done 'sleep 1 && echo baz.so'
# Parallelism: 9. All tasks without dependencies are immediately started.
Starting 'sleep 1 && echo foo.o'
Starting 'sleep 2 && echo bar.o'
Starting 'sleep 1 && echo doc0.html'
Starting 'sleep 1 && echo doc1.html'
Starting 'sleep 1 && echo doc2.html'
foo.o
Done 'sleep 1 && echo foo.o'
doc0.html
Done 'sleep 1 && echo doc0.html'
doc1.html
Done 'sleep 1 && echo doc1.html'
doc2.html
Done 'sleep 1 && echo doc2.html'
Starting 'echo toc.html'
toc.html
Done 'echo toc.html'
bar.o
Done 'sleep 2 && echo bar.o'
Starting 'sleep 1 && echo baz.so'
baz.so
Done 'sleep 1 && echo baz.so'

Sharing dependencies

Full code in the Rust playground

Our final requirement is allowing multiple tasks to depend on the same task. In most Future/Promise systems with managed runtimes or some kind of garbage collector (GC), this is really easy. Just add the shared Future as a dependency of each of the dependent Futures. In Rust, we will have to jump through a few hoops. To stretch our example, we now want to ship a couple of examples of using our library. They will both have the library as a build dependency.

let example1 = link.clone().then(|_| command!("echo example-app")).boxed_local();
let example2 = link.clone().then(|_| command!("echo another-example-app")).boxed_local();
...
targets.push(link.boxed_local());  // Needs to change a bit.
...
targets.push(example1);
targets.push(example2);

As you can see, we need to clone the link Future. The link Future itself changes, requiring a shared() conversion.

let link = join_all(vec![compile1, compile2])
            .then(|_| command!("sleep 1 && echo baz.so"))
            .shared();

The final bit we need to do is make the Result from run_command cloneable. The ExitStatus already supports this. We only need to wrap the std::io::Error in an Rc pointer (reference-counted ownership). I also removed the Done print, since the output was getting really verbose and each command already echoes when it is done.

async fn run_command<S: Into<String>>(
    sem: &Semaphore,
    cmdstr: S,
) -> Result<std::process::ExitStatus, Rc<std::io::Error>> {
    let _p = sem.acquire().await;
    let str: String = cmdstr.into();
    let mut c = command(&str);
    println!("Starting '{}'", &str);
    c.status().await.map_err(|e| Rc::new(e))  // This bit.
}

This will run exactly as we expect.

Starting 'sleep 1 && echo foo.o'
Starting 'sleep 2 && echo bar.o'
foo.o
Starting 'sleep 1 && echo doc0.html'
bar.o
Starting 'sleep 1 && echo doc1.html'
doc0.html
Starting 'sleep 1 && echo doc2.html'
doc1.html
Starting 'sleep 1 && echo baz.so'
doc2.html
Starting 'echo toc.html'
toc.html
baz.so
Starting 'echo example-app'
Starting 'echo another-example-app'
example-app
another-example-app

That wraps up the suspending scheduler. A real build system will actually make some of this simpler, since it can use some generic transformations for each task, instead of our piecemeal calls to shared(), clone() etc. It would just make all tasks shared and boxed. Even things like failures fall out relatively easily by having all downstream futures simply resolve with failure instead of running their command. I am actually working on a real build system, but that is for another time.

A topological scheduler

Full code in the Rust playground

Ironically, expressing a topological scheduler using Futures is actually more convoluted than expressing a suspending one. I won’t go into the code piecemeal, but the general idea follows.

A topological sort derives a linear ordering of nodes from a directed, acyclic graph. A serial execution of such a build system is trivial: run one task, then proceed to the next in the sorted order. In comparison, for the suspending scheduler we simply recreated the build graph as a graph of Futures and let it run.

The problem comes with the parallelism requirement. The example above does not deal with most of the complexity and is already “not idiomatic”. We are performing capacity analysis on the semaphore ourselves, and scheduling as many futures as we can. Then we wait until one of the tasks is done, after which we try to schedule more tasks. This simple example does not deal with dependencies at all. In a real system, we have to shunt tasks that have dependencies to a separate holding area. Only “leaf” tasks get scheduled first. Then, as tasks finish execution, we check whether that unblocked any tasks with dependencies and schedule them. Essentially, we end up taking on a lot of the responsibilities that Futures/Promises give us “for free”. In a Futures-based universe, we are better off using a suspending scheduler to deal with purely topological builds.
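To make that bookkeeping concrete, here is a serial sketch of my own (the playground version adds the semaphore and Futures machinery on top of the same idea): count unbuilt dependencies per key, start with the leaves, and unblock dependents as tasks finish.

use std::collections::{HashMap, VecDeque};

// `deps` maps each key to its (static!) dependencies. Assumes an acyclic graph.
fn topo_build(deps: &HashMap<&str, Vec<&str>>) {
    let mut remaining: HashMap<&str, usize> = HashMap::new();
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for (&key, ds) in deps {
        remaining.insert(key, ds.len());
        for &d in ds {
            dependents.entry(d).or_default().push(key);
            remaining.entry(d).or_insert(0);
        }
    }
    // Only "leaf" tasks (no unbuilt dependencies) are runnable at first.
    let mut ready: VecDeque<&str> = remaining
        .iter()
        .filter(|(_, &n)| n == 0)
        .map(|(&k, _)| k)
        .collect();
    while let Some(key) = ready.pop_front() {
        println!("building {}", key);
        // Finishing a task may unblock tasks waiting in the "holding area".
        for &dep in dependents.get(key).into_iter().flatten() {
            let n = remaining.get_mut(dep).unwrap();
            *n -= 1;
            if *n == 0 {
                ready.push_back(dep);
            }
        }
    }
}

fn main() {
    let mut deps = HashMap::new();
    deps.insert("baz.so", vec!["foo.o", "bar.o"]);
    topo_build(&deps);
}

All of this counting and queueing is exactly what join_all and the runtime’s wake-ups were doing for us in the suspending version.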

This concludes the build system explanation. Next up we discuss what made the Rust example more complicated than something in, say, JavaScript.

Implementation details

Since Rust’s Futures do not run in a managed environment, we need to manage memory explicitly using the ownership system. In addition, due to static typing and the monomorphization vs. dynamic dispatch distinction, we sometimes need to do some “type unification” to make rustc happy.

Use of LocalSet

By default, async tasks are expected to be Send-able so they can run on thread-pools. Since we run everything on one thread and do not need this, we use LocalSet to allow spawning non-Send-able tasks.
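As a hypothetical standalone example: an Rc is not Send, so a task that holds one can only be spawned via a LocalSet, never on the default multi-threaded spawner.

use std::rc::Rc;

#[tokio::main]
async fn main() {
    let local = tokio::task::LocalSet::new();
    local.run_until(async {
        let shared = Rc::new(42); // Rc is !Send, making the task below !Send
        // tokio::spawn would reject this task; spawn_local accepts it.
        tokio::task::spawn_local(async move {
            println!("{}", shared);
        })
        .await
        .unwrap();
    })
    .await;
}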

join_all vs FuturesUnordered

I actually lied about join_all suspending the returned Future until the underlying Futures resolve. It actually polls all of the inner Futures every time it is itself polled, instead of using the Waker mechanism for fine-grained wake-ups. FuturesUnordered is a more sophisticated implementation that uses Wakers. Using it here would have further decreased the readability of the examples. A real implementation should use FuturesUnordered.

Sharing futures

The lack of GC means we have to be explicit when we start having multiple users of a Future or of the values returned by Futures. Calling shared() on a Future returns a Future that supports this. It requires that the Future’s result implement Clone. This is where we jump in, wrapping std::io::Error in Rc. Rc is OK because of LocalSet; otherwise we would use Arc.
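A minimal standalone illustration of shared() (hypothetical, separate from the build examples):

use futures::future::FutureExt;

#[tokio::main]
async fn main() {
    let once = async { 42 }.shared(); // requires the Output to implement Clone
    let again = once.clone();         // a second handle to the same underlying work
    let (a, b) = futures::join!(once, again);
    assert_eq!(a, b); // both handles see the same result; the body ran once
}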

Dynamic dispatch

The littering of boxed_local() calls throughout the code is required to deal with how async blocks are implemented. Each async block desugars to something that returns a Future with a unique type. Since the various functions we use, like join_all, require that all futures passed to them be of the same type, we need to reconcile this. The easiest way is to call boxed() on each Future, converting it to a heap-allocated dyn Future and leveraging dynamic dispatch. join_all no longer has to worry about sizing its own underlying structures based on different inputs; it is just dealing with a pointer to something that satisfies the Future contract. Think of this as going from “impl Future<...>” to “dyn Future<...>”. I use scare quotes because I myself don’t really understand all the type-erasure and pinning requirements that make something like this necessary. boxed_local() is just boxed() without the Send requirement.
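A small illustration of the problem and the fix (with assumed placeholder futures): the two async blocks below have distinct anonymous types, and only after boxing can they live in one Vec.

use futures::future::{FutureExt, LocalBoxFuture};

#[tokio::main]
async fn main() {
    // let futs = vec![async { 1 }, async { 2 }]; // ERROR: mismatched (anonymous) types
    let futs: Vec<LocalBoxFuture<'static, i32>> = vec![
        async { 1 }.boxed_local(), // erased to Pin<Box<dyn Future<Output = i32>>>
        async { 2 }.boxed_local(),
    ];
    println!("{:?}", futures::future::join_all(futs).await); // [1, 2]
}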

Structured concurrency

There is one final thing that we didn’t encounter here, but may affect the final design. In the suspending scheduler we always used async blocks in the block_on loop, but did not spawn independent Tokio tasks using something like local_set.spawn_local(). The topological scheduler does do that. Using spawn() creates tasks that have a 'static lifetime, meaning anything they refer to has to now be owned by the async block or have a 'static lifetime. This can cause us to jump through a lot of hoops (like the use of once_cell). I don’t think avoiding the use of tasks has any performance impact here, but in a Futures system where tasks are CPU-bound, you may need to use it. This 'static requirement is silly here because we can clearly see that the program ends as soon as block_on finishes, but there is no way to tell rustc about that. You may wish to follow the Structured Concurrency RFC to see how the story evolves there.
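To illustrate the 'static requirement with a contrived sketch of my own (mirroring the main() from earlier):

use tokio::runtime::Builder;
use tokio::sync::Semaphore;

fn main() {
    let mut runtime = Builder::new().basic_scheduler().enable_all().build().unwrap();
    let local_set = tokio::task::LocalSet::new();
    let sem = Semaphore::new(2);

    local_set.block_on(&mut runtime, async {
        // Fine: the async block merely borrows `sem`, and rustc can see that
        // the borrow ends when block_on returns.
        let _permit = sem.acquire().await;
    });

    // Rejected: a spawned task must be 'static, and rustc cannot see that the
    // program ends before `sem` is dropped.
    // local_set.spawn_local(async { sem.acquire().await; });
}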


  1. Under the hood, this is all going to desugar to roughly running epoll() on a set of process file descriptors. ↩︎

  2. You could output the current times as well, if you want to be absolutely sure the link waits until the compiles are done. ↩︎

  3. Analogous to calling fetch in the paper. ↩︎

  4. I was bitten hard by the Rust gotcha that using just _ as an unused variable name does not bind! You need to have some other character as well. ↩︎

  5. The macro has to be moved into the main async block due to macro hygiene rules. ↩︎