Understanding Scala Futures and Execution Contexts


Viktor Klang recently published a set of useful tips on Scala Futures. Although Futures are widespread and heavily used, people (especially newcomers) still run into problems when working with them.

In my opinion many of these problems come from a misunderstanding of how Futures work (e.g. the strict nature of Futures and the way they interact with execution contexts, to name a few).

That’s enough of an excuse to dive into the Scala Future and ExecutionContext implementation.

Basic example

Let’s take a basic example to support our investigation:

object Prog {
  def taskA(): Unit = {
    debug("Starting taskA")
    Thread.sleep(1000) // wait 1secs
    debug("Finished taskA")
  }

  def taskB(): Unit = {
    debug("Starting taskB")
    Thread.sleep(2000) // wait 2secs
    debug("Finished taskB")
  }

  def main(args: Array[String]): Unit = {
    debug("Starting Main")
    taskA()
    taskB()
    debug("Finished Main")
  }

  def debug(message: String): Unit = {
    val now = java.time.format.DateTimeFormatter.ISO_INSTANT
      .format(java.time.Instant.now)
      .substring(11, 23) // keep only time component
    val thread = Thread.currentThread.getName()
    println(s"$now [$thread] $message")
  }
}

This is very basic code. The main program executes 2 tasks. Each task does nothing but wait a few seconds to simulate some work (e.g. fetching something from a database or over the network, writing to a file, …).

Unsurprisingly the output is the following:

08:25:42.958 [run-main-0] Starting Main
08:25:42.958 [run-main-0] Starting taskA
08:25:43.960 [run-main-0] Finished taskA
08:25:43.960 [run-main-0] Starting taskB
08:25:45.961 [run-main-0] Finished taskB
08:25:45.961 [run-main-0] Finished Main

Both tasks are executed sequentially on the same thread as the main program. The first task takes 1 second to complete and the second task takes 2. We are clearly wasting resources, as the main thread should be able to continue while the tasks are running. Also, assuming there are no dependencies between the tasks, we should be able to run them concurrently.

Asynchronous computation

To do that we need an asynchronous computation, and in Scala this is typically done with a Future. So let’s just wrap our tasks in a Future.

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object Prog {
  def taskA(): Future[Unit] = Future {
    debug("Starting taskA")
    Thread.sleep(1000) // wait 1secs
    debug("Finished taskA")
  }

  def taskB(): Future[Unit] = Future {
    debug("Starting taskB")
    Thread.sleep(2000) // wait 2secs
    debug("Finished taskB")
  }

  def main(args: Array[String]): Unit = {
    debug("Starting Main")
    val futureA = taskA()
    val futureB = taskB()
    debug("Continuing Main")
    // wait for both futures to complete before exiting
    Await.result(futureA zip futureB, Duration.Inf)
  }
  // ...
}

Basically a Future is just a placeholder for something that doesn’t exist yet. Here our 2 tasks return a Future[Unit], that is a placeholder that will contain Unit when the task completes. The placeholder (the Future) is returned straight away, before the task is actually executed.

Setting up an execution context

However if we try to run this program we get a compilation error:

[error] Prog.scala:9: Cannot find an implicit ExecutionContext. You might pass
[error] an (implicit ec: ExecutionContext) parameter to your method
[error] or import scala.concurrent.ExecutionContext.Implicits.global.
[error]   def task1(): Future[Unit] = Future {
[error] 

As you can see we need an ExecutionContext in order to use a Future. Don’t worry, we’re going to see what an ExecutionContext is in detail, but for now let’s just use the global execution context as suggested by the compiler.

There are 3 ways to use the global execution context:

  • Importing the global execution context:
    import scala.concurrent.ExecutionContext.Implicits.global
  • Using an implicit value:
    implicit val executor = scala.concurrent.ExecutionContext.global
  • Passing the execution context explicitly:
    Future { /* do something */ }(executor)
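
To make the three options concrete, here is a minimal sketch that uses each of them once. The object and method names (ContextChoices, viaImport, …) are made up for this illustration; only the imports and the Future calls come from the standard library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical names, for illustration only.
object ContextChoices {
  // 1. Import the implicit global context into the enclosing scope.
  def viaImport(): Future[Int] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Future(1 + 1)
  }

  // 2. Declare an implicit value; the compiler picks it up just like the import.
  def viaImplicitVal(): Future[Int] = {
    implicit val executor: ExecutionContext = ExecutionContext.global
    Future(2 + 2)
  }

  // 3. Pass the context explicitly in the second parameter list.
  def viaExplicit(): Future[Int] =
    Future(3 + 3)(ExecutionContext.global)

  def main(args: Array[String]): Unit = {
    val all = Seq(viaImport(), viaImplicitVal(), viaExplicit())
    all.foreach(f => println(Await.result(f, 5.seconds)))
  }
}
```

All three end up on the same thread pool; the difference is only in how visible the choice is at the call site. The explicit form is the most verbose but makes it obvious where the work runs.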

Let’s import the global execution context

import scala.concurrent.ExecutionContext.Implicits.global

and run our program again:

08:58:09.800 [run-main-1] Starting Main
08:58:09.823 [scala-execution-context-global-121] Starting taskA
08:58:09.824 [run-main-1] Continuing Main
08:58:09.824 [scala-execution-context-global-122] Starting taskB
08:58:10.828 [scala-execution-context-global-121] Finished taskA
08:58:11.827 [scala-execution-context-global-122] Finished taskB

There are a few things worth noting here!

  • Each task runs on its own thread
  • Both tasks start at the same time and run concurrently
  • The main program continues before the tasks complete
  • The whole program took only 2s to execute (the duration of the longest task)

I guess there is nothing surprising here for people accustomed to concurrent programming.
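
If you want to see the “placeholder” behaviour in isolation, the following sketch may help. It assumes the global execution context, and the names Placeholder and observe are invented for this example. The Future is handed back before its body has finished, so isCompleted is expected to be false right after creation and true once we have awaited the result.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper, for illustration only.
object Placeholder {
  // Returns (completed right after creation?, completed after awaiting?).
  def observe(): (Boolean, Boolean) = {
    val future = Future { Thread.sleep(500); "done" }
    val completedRightAway = future.isCompleted // the body is still sleeping
    Await.result(future, 5.seconds)             // block until the value is there
    (completedRightAway, future.isCompleted)
  }
}
```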

Let’s stop for a while and try to understand what happens behind the scenes when we create a Future and why we need an ExecutionContext.

The implementation of Future

Future { /* do something */ }

is actually calling the method apply on the Future companion object:

object Future {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] =
    unit.map(_ => body)
}

So it takes a by-name parameter body and an implicit ExecutionContext. Then it calls map on Future.unit.

Already completed Futures

Future.unit is just a Future.successful(()) (a successful future containing Unit) and Future.successful is defined as

def successful[T](result: T): Future[T] = Promise.successful(result).future

It’s a Future of a successful Promise. But what is a Promise? Well it’s something that gives a Future, obviously! A Future is a placeholder for a result that can be read when available. A Promise is like a “writable-once” container that can be used to complete a Future with the written value (“writable-once” is the reason why DefaultPromise extends AtomicReference).
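
Here is a small sketch of the “writable-once” behaviour, using only the public Promise API (the WriteOnce object is just a wrapper invented for the example). The first write wins, the second one is rejected, and the Future side only ever sees the first value.

```scala
import scala.concurrent.Promise
import scala.util.Try

// Hypothetical wrapper, for illustration only.
object WriteOnce {
  def demo(): (Boolean, Boolean, Option[Try[Int]]) = {
    val promise = Promise[Int]()
    val future  = promise.future        // the read side of the same container
    val first   = promise.trySuccess(1) // wins: the promise was still empty
    val second  = promise.trySuccess(2) // rejected: already completed
    (first, second, future.value)
  }
}
```

Note that promise.success(2) (without the try prefix) would throw an IllegalStateException instead of returning false.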

In our case it’s an already kept promise (because we already know the value to use to complete the promise).

object Promise {
  def successful[T](result: T): Promise[T] = fromTry(Success(result))

  def fromTry[T](result: Try[T]): Promise[T] = impl.Promise.KeptPromise[T](result)
}

All futures constructed from an already known value (Futures constructed from a Try, including Future.successful(), Future.failed() and Future.fromTry()) are implemented as a kept promise (i.e. an already completed promise). It means that no computation is executed in these cases. The available value is just wrapped in the corresponding Future implementation using the current thread.

If you find yourself in such a situation remember to use one of these 3 constructors as it avoids unnecessary thread context switches.
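
As a quick illustration (the AlreadyCompleted object is a made-up name), none of these three constructors needs an ExecutionContext, and the resulting Futures are completed as soon as they are returned:

```scala
import scala.concurrent.Future
import scala.util.Try

// Hypothetical examples, for illustration only.
object AlreadyCompleted {
  def ok: Future[Int]     = Future.successful(42)
  def ko: Future[Int]     = Future.failed(new IllegalStateException("boom"))
  def parsed: Future[Int] = Future.fromTry(Try("12".toInt))
}
```

Notice that there is no import of an execution context anywhere: nothing is scheduled, so nothing needs a thread pool.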

We’re almost there. We just need to look at KeptPromise.apply.

object KeptPromise {
  def apply[T](result: Try[T]): scala.concurrent.Promise[T] =
    resolveTry(result) match {
      case s @ Success(_) => new Successful(s)
      case f @ Failure(_) => new Failed(f)
    }
}

You can have a look at the Successful class if you like, but the point is that we now have a completed promise which holds the value ().

As the value to complete the Future was already known from the start there is no computation to execute so no need for an execution context so far.

Now going back to Future.apply the next operation to look at is map.

Remember we’re mapping over Future.unit and map is defined as

trait Future[+T] extends Awaitable[T] {
  def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = transform(_ map f)

  def transform[S](f: Try[T] => Try[S])(implicit executor: ExecutionContext): Future[S]
}

So map is defined in terms of transform. And transform takes a function Try[T] => Try[S]. It can transform a successful future into a failed future and vice versa. It’s more general than map, whose function only ever sees a successful value. (In fact map, recover and failed.map are all implemented in terms of transform).
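
To show what “vice versa” means in practice, here is a minimal sketch built on the public transform method (available since Scala 2.12). The helper names rescue and rejectNegative are invented for this example, and it assumes the global execution context.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helpers, for illustration only.
object TransformDemo {
  // Failure => Success: replace any error with a fallback value.
  def rescue(f: Future[Int]): Future[Int] =
    f.transform {
      case Success(n) => Success(n)
      case Failure(_) => Success(-1)
    }

  // Success => Failure: reject negative results.
  def rejectNegative(f: Future[Int]): Future[Int] =
    f.transform {
      case Success(n) if n < 0 => Failure(new IllegalArgumentException(s"negative: $n"))
      case other               => other
    }
}
```

Neither of these can be written with map alone, because the function passed to map is never called for a failed future.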

However transform is not implemented directly on the Future trait. In our case we are calling transform on Future.unit which is actually a Kept Promise and transform is implemented in its superclass scala.concurrent.impl.Promise.

private[concurrent] trait Promise[T] 
  extends scala.concurrent.Promise[T] 
  with scala.concurrent.Future[T] {
  override def transform[S](
    f: Try[T] => Try[S]
  )(implicit executor: ExecutionContext): Future[S] = {
    val p = new DefaultPromise[S]()
    onComplete { result =>
      p.complete(try f(result) catch { case NonFatal(t) => Failure(t) })
    }
    p.future
  }
}

Here we create a new Promise. However this time it’s not a Kept Promise but a DefaultPromise, a promise that is not already fulfilled. Then onComplete is called with a callback that completes p, the DefaultPromise, by applying the transformation f to the result of the current future.

Remember that we are calling transform on Future.unit which is a Kept promise and this is where onComplete is defined.

private[this] sealed trait Kept[T] extends Promise[T] {
  override def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
    (new CallbackRunnable(executor.prepare(), func)).executeWithValue(result)
}

Since the promise is already completed we just execute func on a new CallbackRunnable which is defined as

private final class CallbackRunnable[T](
  val executor: ExecutionContext, 
  val onComplete: Try[T] => Any
) extends Runnable with OnCompleteRunnable {
  // must be filled in before running it
  var value: Try[T] = null

  override def run() = {
    require(value ne null) // must set value to non-null before running!
    try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
  }

  def executeWithValue(v: Try[T]): Unit = {
    require(value eq null) // can't complete it twice
    value = v
    // Note that we cannot prepare the ExecutionContext at this point, since we might
    // already be running on a different thread!
    try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t }
  }
}

The key here is to note that executor.execute(this) actually invokes run(), which in turn executes the onComplete callback.

As you can see ExecutionContext has an execute method which takes a Runnable.

trait ExecutionContext {
  def execute(runnable: Runnable): Unit
  // ...
}

In fact it’s no coincidence that it’s very similar to a Java Executor:

public interface Executor {
  void execute(Runnable command);
}

By now it should be clear how the callbacks end up being executed on a different thread (depending on the ExecutionContext passed in). That’s why using Future { } to construct a Future from a value that is already available is an anti-pattern.
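
To convince yourself that the thread really is decided by the ExecutionContext, here is a small experiment. The sameThread context is a toy implementation written for this illustration (it is not something the standard library provides under that name): its execute simply calls run() on the calling thread.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical names, for illustration only.
object CallerThreadDemo {
  // Toy context: runs every Runnable immediately on the calling thread.
  val sameThread: ExecutionContext = new ExecutionContext {
    def execute(runnable: Runnable): Unit = runnable.run()
    def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  // Name of the thread that evaluates a Future body on the given context.
  def bodyThread(ec: ExecutionContext): Future[String] =
    Future(Thread.currentThread.getName)(ec)
}
```

With sameThread the body runs on the caller’s thread and the Future is already completed when it is returned. With ExecutionContext.global the very same code reports one of the pool threads instead.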

But before we jump into the execution context details let’s stay focused on the implementation of Futures and Promises, as there are still a few important things to cover.

DefaultPromises as a state machine

DefaultPromise[T] extends AtomicReference[AnyRef](Nil) and implements both Promise[T] and Future[T].

Extending AtomicReference is a clever trick because a Promise can be seen as a state-machine moving from the incomplete state to the complete (or fulfilled) state. Using an AtomicReference makes sure there is only one possible transition from incomplete to complete.

@tailrec
private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = {
  get() match {
    case raw: List[_] =>
      val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
      if (compareAndSet(cur, v)) cur else tryCompleteAndGetListeners(v)
    case dp: DefaultPromise[_] => compressedRoot(dp).tryCompleteAndGetListeners(v)
    case _ => null
  }
}

Initially the atomic reference is the empty list Nil and tryCompleteAndGetListeners can change the atomic reference from List[_] to Try[T] only once.

If the atomic reference is a Try we know the promise is fulfilled, otherwise it is still incomplete. Moreover using an atomic reference allows more callbacks to be added in a thread-safe and non-blocking way.
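
The idea can be reproduced in a few lines. The OnceCell class is a deliberately simplified toy written for this post, not the library’s code: it has only two states (a list of pending callbacks, or Some(value)), no linking, and it runs callbacks directly on the completing thread instead of going through an ExecutionContext.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Simplified, hypothetical cell. State is either a List of pending
// callbacks (incomplete) or Some(value) (complete).
final class OnceCell[T] extends AtomicReference[AnyRef](Nil) {

  // Returns true only for the single call that wins the transition.
  @tailrec
  def tryComplete(value: T): Boolean = get() match {
    case callbacks: List[_] =>
      if (compareAndSet(callbacks, Some(value))) {
        // We won the race: notify everyone who registered before completion.
        callbacks.asInstanceOf[List[T => Unit]].foreach(_(value))
        true
      } else tryComplete(value) // state changed under us; look again
    case _ => false // already completed
  }

  // Registers a callback, or runs it right away if the value is present.
  @tailrec
  def onComplete(callback: T => Unit): Unit = get() match {
    case callbacks: List[_] =>
      if (!compareAndSet(callbacks, callback :: callbacks)) onComplete(callback)
    case Some(value) => callback(value.asInstanceOf[T])
    case _           => ()
  }
}
```

Both methods follow the same pattern: read the state, compute the next one, and retry if compareAndSet reports that another thread got there first. No lock is ever taken.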

There is a third state worth noting. This is when the promise is linked to another promise, which is particularly useful in the case of chained promises.

For instance we have seen how a call to transform creates a new Promise. By doing so we can end up with a chain of promises. This is not really a problem for transform because as soon as the root promise completes so do all of the promises down the chain.

However for flatMap (which is implemented using transformWith) things are more complicated. Like transform, transformWith creates a new Promise p. But this time the callback itself returns a Future, which might well be a DefaultPromise too. Since p must complete with exactly the result of that returned promise, it’s safe to “link” this promise to p and add all of its callbacks to the callbacks of p. It’s as if the whole promise chain gets “collapsed” onto the root promise. This is a very useful thing to do as it may avoid some memory leaks (more details here)

Execution contexts

Similarly to the Java Executor, the Scala ExecutionContext makes it possible to separate the business logic (i.e. what the code does) from the execution logic (i.e. how the code is executed).

As a consequence one cannot just import the global execution context and get away with that. Instead we need to understand which execution context is needed and why.

The global execution context

As the global execution context is the default one and the easiest one to set up, let’s have a look at what it actually is.

First thing to note is that the ExecutionContext.global and the ExecutionContext.Implicits.global are actually the exact same thing:

object ExecutionContext {
  def global: ExecutionContextExecutor = 
    Implicits.global.asInstanceOf[ExecutionContextExecutor]

  object Implicits {
    implicit lazy val global: ExecutionContext = 
      impl.ExecutionContextImpl.fromExecutor(null: Executor)
  }
}

It calls the fromExecutor constructor without providing any executor (note the null). The sibling constructor fromExecutorService shows the same fallback logic:

private[concurrent] object ExecutionContextImpl {
  def fromExecutorService(
    es: ExecutorService, 
    reporter: Throwable => Unit = ExecutionContext.defaultReporter
  ): ExecutionContextImpl with ExecutionContextExecutorService = {
    new ExecutionContextImpl(
      Option(es).getOrElse(createDefaultExecutorService(reporter)),
      reporter
    ) with ExecutionContextExecutorService {
      // ...
    }
  }
}

If the executor service is not specified it creates a default executor service that is backed by a Java ForkJoinPool.

If you look at the implementation of

def createDefaultExecutorService(reporter: Throwable => Unit): ExecutorService

you’ll notice that there are a number of properties used to configure the parallelism of the ForkJoinPool.

Let’s get back to our example to see how these parameters influence the program execution. But first let’s modify our program slightly so that it spawns more tasks:

import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}

object Prog {
  def startTask(number: Int): Future[Unit] = Future {
    debug(s"Starting task#$number")
    Thread.sleep(2000) // wait 2secs
    debug(s"Finished task#$number")
  }

  def main(args: Array[String]): Unit = {
    debug("Starting Main")
    val tasks = Future.traverse(1 to 20)(startTask)
    debug("Continuing Main")
    // waits for all tasks to complete before exiting
    Await.result(tasks, Duration.Inf)
  }

  // ...
}

So now we are starting a bunch of 20 tasks at once. When executed it produces the following output:

15:39:26.055 [run-main-2] Starting Main
15:39:26.062 [scala-execution-context-global-136] Starting task#1
15:39:26.065 [scala-execution-context-global-137] Starting task#2
15:39:26.065 [scala-execution-context-global-139] Starting task#4
15:39:26.066 [scala-execution-context-global-138] Starting task#3
15:39:26.066 [scala-execution-context-global-140] Starting task#5
15:39:26.066 [scala-execution-context-global-141] Starting task#6
15:39:26.066 [scala-execution-context-global-142] Starting task#7
15:39:26.067 [scala-execution-context-global-143] Starting task#8
15:39:26.067 [run-main-2] Continuing Main
15:39:28.065 [scala-execution-context-global-136] Finished task#1
15:39:28.066 [scala-execution-context-global-136] Starting task#9
15:39:28.070 [scala-execution-context-global-139] Finished task#4
15:39:28.070 [scala-execution-context-global-138] Finished task#3
15:39:28.070 [scala-execution-context-global-137] Finished task#2
15:39:28.070 [scala-execution-context-global-142] Finished task#7
15:39:28.070 [scala-execution-context-global-138] Starting task#11
15:39:28.070 [scala-execution-context-global-143] Finished task#8
15:39:28.070 [scala-execution-context-global-141] Finished task#6
15:39:28.070 [scala-execution-context-global-140] Finished task#5
15:39:28.070 [scala-execution-context-global-142] Starting task#12
15:39:28.070 [scala-execution-context-global-141] Starting task#14
15:39:28.070 [scala-execution-context-global-143] Starting task#13
15:39:28.070 [scala-execution-context-global-139] Starting task#10
15:39:28.070 [scala-execution-context-global-140] Starting task#15
15:39:28.071 [scala-execution-context-global-137] Starting task#16
15:39:30.070 [scala-execution-context-global-136] Finished task#9
15:39:30.070 [scala-execution-context-global-136] Starting task#17
15:39:30.073 [scala-execution-context-global-138] Finished task#11
15:39:30.073 [scala-execution-context-global-138] Starting task#18
15:39:30.073 [scala-execution-context-global-143] Finished task#13
15:39:30.073 [scala-execution-context-global-143] Starting task#19
15:39:30.073 [scala-execution-context-global-140] Finished task#15
15:39:30.074 [scala-execution-context-global-140] Starting task#20
15:39:30.073 [scala-execution-context-global-141] Finished task#14
15:39:30.074 [scala-execution-context-global-139] Finished task#10
15:39:30.073 [scala-execution-context-global-142] Finished task#12
15:39:30.075 [scala-execution-context-global-137] Finished task#16
15:39:32.073 [scala-execution-context-global-136] Finished task#17
15:39:32.075 [scala-execution-context-global-138] Finished task#18
15:39:32.076 [scala-execution-context-global-140] Finished task#20
15:39:32.074 [scala-execution-context-global-143] Finished task#19

It started 8 tasks at once, then started new tasks as the running ones finished. Overall it took 6 seconds to complete 20 tasks (each of them sleeping 2 seconds). That’s because we needed 3 rounds to complete all the tasks (8 tasks completed in 2s, then another 8 tasks completed in 2s, then the remaining 4 tasks completed in 2s).
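
The arithmetic behind this is simple enough to write down. The sketch below is a back-of-the-envelope estimate, not a measurement: it assumes every task takes exactly the same time and ignores scheduling overhead. Rounds and totalSeconds are names invented for the example.

```scala
// Hypothetical helper, for illustration only.
object Rounds {
  // Number of "rounds" is tasks / threads rounded up; each round lasts one task.
  def totalSeconds(tasks: Int, threads: Int, secondsPerTask: Int): Int = {
    val rounds = (tasks + threads - 1) / threads // integer ceiling division
    rounds * secondsPerTask
  }
}
```

For our run, Rounds.totalSeconds(20, 8, 2) gives 3 rounds of 2 seconds, i.e. the 6 seconds we observed.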

Looks OK, but why are 8 threads used and not 4, or 16, or anything else? Well, the answer is in the code of createDefaultExecutorService:

def createDefaultExecutorService(reporter: Throwable => Unit): ExecutorService = {
  def getInt(name: String, default: String) = (
    try System.getProperty(name, default) catch {
      case e: SecurityException => default
    }
  ) match {
    case s if s.charAt(0) == 'x' => 
      (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt
    case other => other.toInt
  }
  // ...
  val maxNoOfThreads = getInt("scala.concurrent.context.maxThreads", "x1")
  // ...
}

When the property scala.concurrent.context.maxThreads is not set it defaults to “x1”, i.e. 1 times the number of available cores on the system. In my case that’s 8. It might be different on your machine. More importantly it means that the settings are quite likely to differ between the development environment and the production environment. Something worth keeping in mind in case of performance issues …
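
The “x” prefix is easy to miss, so here is the same parsing rule extracted into a standalone function. ThreadSetting.resolve is a name made up for this sketch; it takes the number of cores as a parameter instead of asking the Runtime so that the result is predictable.

```scala
// Hypothetical helper mirroring the getInt logic, for illustration only.
object ThreadSetting {
  // "x<factor>" means factor * cores (rounded up); anything else is an absolute count.
  def resolve(setting: String, cores: Int): Int =
    if (setting.charAt(0) == 'x') (cores * setting.substring(1).toDouble).ceil.toInt
    else setting.toInt
}
```

So on an 8-core machine “x1” resolves to 8, “x0.5” to 4 and “16” to 16, while “x1.5” on a 3-core machine resolves to 5 because of the rounding up.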

Now let’s see what happens if we change the parallelism settings. For instance let’s set this property to 1 in the build.sbt:

fork in run := true

javaOptions += "-Dscala.concurrent.context.maxThreads=1"

Now the tasks are executed one by one:

[info] 15:54:55.445 [main] Starting Main
[info] 15:54:56.084 [scala-execution-context-global-10] Starting task#1
[info] 15:54:56.128 [main] Continuing Main
[info] 15:54:58.088 [scala-execution-context-global-10] Finished task#1
[info] 15:54:58.089 [scala-execution-context-global-10] Starting task#2
[info] 15:55:00.093 [scala-execution-context-global-10] Finished task#2
[info] 15:55:00.094 [scala-execution-context-global-10] Starting task#3
[info] 15:55:02.099 [scala-execution-context-global-10] Finished task#3
[info] 15:55:02.099 [scala-execution-context-global-10] Starting task#4
[info] 15:55:04.104 [scala-execution-context-global-10] Finished task#4
...

and it takes 40s to execute all of them.

Blocking

Let’s make one more modification to our program and see what happens:

def startTask(number: Int): Future[Unit] = Future {
  blocking {
    debug(s"Starting task#$number")
    Thread.sleep(2000) // wait 2secs
    debug(s"Finished task#$number")
  }
}

All we have done is wrap our computation into a scala.concurrent.blocking call. Now if we run our program again it starts all the tasks at once and completes in only 2s.

That’s because the global execution context is “blocking” aware and starts extra threads for blocking computations.
This setting is controlled by the property scala.concurrent.context.maxExtraThreads which defaults to 256.

However it’s only useful for IO-bound tasks, where the threads are left idle waiting for a resource to become available. It’s useless for CPU-bound tasks, where the threads are kept busy.

Note that not every ExecutionContext supports the BlockContext mechanism that blocking relies on. For such an ExecutionContext wrapping a computation in a blocking call has no effect.
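
Here is a small sketch of that last point. It builds an ExecutionContext on top of a plain Java single-thread executor (whose threads know nothing about BlockContext) and runs a few blocking tasks on it. BlockingOnFixedPool and threadsUsed are names invented for this example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

// Hypothetical experiment, for illustration only.
object BlockingOnFixedPool {
  // Runs taskCount blocking tasks and returns the names of the threads that ran them.
  def threadsUsed(taskCount: Int): Set[String] = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val tasks = Future.traverse((1 to taskCount).toList) { _ =>
        Future {
          blocking { // a hint only; this pool has no way to act on it
            Thread.sleep(50)
            Thread.currentThread.getName
          }
        }
      }
      Await.result(tasks, 10.seconds).toSet
    } finally pool.shutdown()
  }
}
```

However many tasks you start, the returned set contains a single thread name: the pool never grows, and the tasks still run one after the other.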

Finally, as the global ExecutionContext is backed by a ForkJoinPool, it is better suited to CPU-bound tasks.

Other types of ExecutionContexts

Having a fromExecutor constructor for the ExecutionContext means that it’s quite easy to create an ExecutionContext from any of the Java executor services (e.g. the ones created by Executors.newFixedThreadPool, Executors.newSingleThreadExecutor, Executors.newCachedThreadPool, …).
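
As a minimal sketch, this is what a dedicated pool could look like. The pool size and the names DedicatedPool and runOnPool are arbitrary choices for the illustration; ExecutionContext.fromExecutorService and Executors.newFixedThreadPool are the real library calls.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical example, for illustration only.
object DedicatedPool {
  // Runs taskCount short tasks on a fixed pool and returns the thread names used.
  def runOnPool(poolSize: Int, taskCount: Int): Set[String] = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))
    try {
      val tasks = (1 to taskCount).toList.map { _ =>
        Future { Thread.sleep(20); Thread.currentThread.getName }
      }
      Await.result(Future.sequence(tasks), 10.seconds).toSet
    } finally ec.shutdown() // unlike the global pool, this one is ours to stop
  }
}
```

One thing to keep in mind is that the threads of such a pool are usually non-daemon threads, so forgetting the shutdown can keep the JVM alive after main has returned.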

Execution contexts are not limited to the Java world. There are many more execution contexts available: Monix schedulers and Akka dispatchers also implement ExecutionContext.

Conclusion

This has been quite an involved journey (judging by the length of this post) but by now it should be clear what the implications of using Future and choosing an ExecutionContext are.

As a summary here are the main pitfalls to avoid:

  • Using the global ExecutionContext for non-CPU-bound tasks (e.g. prefer a dedicated thread pool for IO-bound tasks)
  • Not using the “blocking” construct (always useful, even just for “documenting” the code)
  • Not knowing the parallelism settings
  • Assuming blocking works with any ExecutionContext