Scala Futures vs Monix Tasks

In the previous post we saw how Scala Futures work and why they need an implicit ExecutionContext to run their computation. While there are tricks to pass the ExecutionContext around, it’s usually cumbersome and clutters the code.

So the real question is: do you need an ExecutionContext everywhere? Well, you do as long as you use Futures, but are there any alternatives?

Future’s limitations

Let’s take a classic yet simple example. An application usually needs to persist data into a database, a remote service, … and the API looks something like this:

def run(query: DBQuery)(implicit executor: ExecutionContext): Future[ResultSet]

which can be used like this:

implicit val executor = scala.concurrent.ExecutionContext.global

val result: Future[ResultSet] = db.run(query)

and then this code is used in a Repository which looks like this:

trait ProductRepository {
  def findProduct(
    productId: ProductId
  )(implicit executor: ExecutionContext): Future[Option[Product]]
  
  def saveProduct(
    product: Product
  )(implicit executor: ExecutionContext): Future[Unit]
  
  def incrementProductSells(
    productId: ProductId, 
    quantity: Int
  )(implicit executor: ExecutionContext): Future[Unit]
  
  // More methods ...
}

As you can see, passing the ExecutionContext around is quite cumbersome. Alternatively you can consider passing it through the repository’s constructor so that it’s implicitly available inside all the method bodies.
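
A minimal sketch of that alternative (Database, findProductQuery and extractProduct are made-up names for illustration; the trait’s methods would then drop their implicit parameter):

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical implementation: the ExecutionContext is injected once through the
// constructor and is implicitly available inside every method body.
class ProductRepositoryImpl(db: Database)(implicit executor: ExecutionContext) {

  def findProduct(productId: ProductId): Future[Option[Product]] =
    db.run(findProductQuery(productId)) // hypothetical query builder
      .map(extractProduct)              // hypothetical ResultSet => Option[Product]

  // More methods ...
}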

Side-effects with “non referentially transparent” Futures

Another problem is that Futures are (sometimes) hard to reason about because of their strict nature.
Let’s consider this use case to illustrate the issue: a customer makes a purchase and then the application increments the number of sales for each product they just bought.

val incrementResult: Future[Unit] = 
  Future.traverse(basket.content) { product =>
    productRepo.incrementProductSells(product.id, product.quantity)
  }.map(_ => ()) // traverse yields a collection of Unit, which we discard

Let’s assume that one customer bought 4 products and there is a failure when incrementing the second product. Obviously we get a failed Future back in our incrementResult. If we’ve done error reporting correctly we might even be able to know that it’s the increment for the 2nd product that failed. However we can’t decide what to do with this failure: Should we ignore it … or retry it?

Well, there is no good answer. The problem is that all 4 increment operations were started at the same time. We know that the 2nd product failed but we have no idea what happened to the other 3 requests.

This is because Scala Futures are strict, i.e. their computation is executed as soon as the future is created. Because of that, Futures are not “referentially transparent”, which might cause problems if it is not considered carefully when designing a program.
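
To make the lack of referential transparency concrete, here is a minimal sketch (plain Scala, made-up values): replacing a val by its definition changes the behaviour of the program, which is exactly what referential transparency would forbid.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Version 1: the side effect runs once, the already-started Future is shared
val shared = Future { println("side effect"); 40 }
val sum1 = for { a <- shared; b <- shared } yield a + b

// Version 2: inlining the definition of `shared` runs the side effect twice
val sum2 = for {
  a <- Future { println("side effect"); 40 }
  b <- Future { println("side effect"); 40 }
} yield a + b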

The same might happen when using a for-comprehension if you create the Futures beforehand:

val futureA = Future { ... }
val futureB = Future { ... }
// at this point both computations are already running
val result = for {
  a <- futureA
  b <- futureB
} yield ()

Now if futureA fails there is no way to know how futureB‘s computation ended (which might be a problem if it performed some side-effects).

On the other hand, creating the futures inside the for-comprehension solves this problem.

for {
  a <- Future { ... }
  // only a's computation is executed
  b <- Future { ... }
  // b's computation is executed only if a succeeds
} yield ()

This also means that the futures are now executed sequentially, so you have to think carefully about how you create your futures (and consider what actions need to be taken in case of failure).

Memoization

Another characteristic of Futures is that they remember their result. They run their computation only once and memoize the result, so that if they’re used multiple times they can hand out the result directly.

This is good for performance but this goes against “referential transparency”, which may make the code harder to reason about. Consider the following code:

val total = Future.successful {
  println("Computing 2 + 2")
  2 + 2
}
println(total)
println(total)

The output is

Computing 2 + 2
Future(Success(4))
Future(Success(4))

which shows that the computation is executed only once although the future is used twice. That’s another thing to keep in mind when working with Scala Futures.
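
As a side note (a minimal sketch, not from the original example): if you do want the computation to run again, you have to build a new Future each time, for example by hiding the construction behind a def.

// Each call to freshTotal() creates (and therefore runs) a brand new Future,
// assuming an implicit ExecutionContext is in scope.
def freshTotal(): Future[Int] = Future {
  println("Computing 2 + 2")
  2 + 2
}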

Moving away from Futures

Let’s contrast this with a structure that only represents a computation but doesn’t execute it. Something like a lazy Future: it takes a computation but doesn’t execute it until you tell it to.

Well, such structures already exist and are (often) called Task (or IO). The most popular implementations are Monix and Scalaz. In this post I’ll use the Monix implementation but Scalaz would work equally well.

Now if we go back to our basic example used in the previous post:

import monix.eval.Task

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object TaskInvestigation extends App {

  val taskA = Task {
    debug("Starting taskA")
    Thread.sleep(1000)
    debug("Finished taskA")
  }

  val taskB = Task {
    debug("Starting taskB")
    Thread.sleep(2000)
    debug("Finished taskB")
  }

  import monix.execution.Scheduler.Implicits.global

  debug("Starting Main")
  val futureA = taskA.runAsync
  val futureB = taskB.runAsync
  debug("Continuing Main")

  Await.result(futureA zip futureB, Duration.Inf)

  def debug(msg: String): Unit = {
    val now = java.time.format.DateTimeFormatter.ISO_INSTANT
      .format(java.time.Instant.now)
      .substring(11, 23)
    val thread = Thread.currentThread.getName()
    println(s"$now [$thread] $msg")
  }

}

It looks really similar to our Future-based examples: Future has been replaced with Task and the global ExecutionContext has been replaced with a global Scheduler (more on that later).

The output is also similar to the Future-based implementation:

10:22:17.727 [main] Starting Main
10:22:18.100 [scala-execution-context-global-10] Starting taskA
10:22:18.103 [scala-execution-context-global-11] Starting taskB
10:22:18.104 [main] Continuing Main
10:22:19.101 [scala-execution-context-global-10] Finished taskA
10:22:20.105 [scala-execution-context-global-11] Finished taskB

Both tasks are started on their own threads, which run concurrently with the main program, and the overall execution time is 2 seconds (the duration of the longest task).

However there are some differences! The main one is that the tasks are not started when created. Therefore no Scheduler is needed until we actually call runAsync, which triggers the computation.

Not needing a Scheduler until we call runAsync means less-cluttered code. It’s now possible to write the repository API without having to worry about the ExecutionContext:

trait ProductRepository {
  def findProduct(productId: ProductId): Task[Option[Product]]
  def saveProduct(product: Product): Task[Unit]
  def incrementProductSells(productId: ProductId, quantity: Int): Task[Unit]
  // More methods ...
}
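
At the call site (a minimal sketch, assuming a productRepo instance and a productId value are available) the Scheduler only shows up at the very end, when the program is actually run:

// Building the program requires no Scheduler / ExecutionContext
val program: Task[Option[Product]] = productRepo.findProduct(productId)

// A Scheduler is only needed when the Task is finally executed
import monix.execution.Scheduler.Implicits.global
program.runAsync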

That’s great, our code looks cleaner! Moreover it’s also easier to reason about. Remember our problem when incrementing the number of product sales:

val incrementResult: Task[Unit] = 
  Task.traverse(basket.content) { product =>
    productRepo.incrementProductSells(product.id, product.quantity)
  }.map(_ => ()) // traverse yields a collection of Unit, which we discard
// nothing is executed yet
// we need to explicitly call runAsync to trigger the execution
incrementResult.runAsync

Another good point is that the tasks are now executed sequentially, so if the update for the second product fails we know exactly where we are: product 1’s update was successful, product 2’s update failed, and the updates for products 3 and 4 didn’t run at all. We know exactly what we need to retry.

You can argue that sequential execution is much less efficient! Instead we could have performed all the updates at once and reduced the execution time. That’s correct; it’s just that going this way makes it more difficult to handle failures, as you then need to track the result of each operation.

But if you want to go this way, it’s possible too: this is what Task.gather does (see the sketch below). It’s always good to have a choice, and at least now you know the consequences of going one way or the other.
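
A minimal sketch of the parallel variant, assuming basket.content is a List of purchased products (Task.gather was renamed parSequence in later Monix versions):

// Build one Task per product, then run them all concurrently.
// On failure we are back to not knowing what happened to the other updates.
val increments: List[Task[Unit]] =
  basket.content.map(p => productRepo.incrementProductSells(p.id, p.quantity))

val incrementAll: Task[Unit] = Task.gather(increments).map(_ => ())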

Memoization

By default Tasks don’t remember their results: they execute their computation every time it is needed. Using the same example,

val total = Task {
  println("Computing 2 + 2")
  2 + 2
}
// run the task twice (Await, Duration and the implicit Scheduler imported earlier are in scope)
println(Await.result(total.runAsync, Duration.Inf))
println(Await.result(total.runAsync, Duration.Inf))

outputs

Computing 2 + 2
4
Computing 2 + 2
4

This clearly shows that the computation is run twice because the task is executed twice. But if you want your task to remember its result you have to state it explicitly with the memoize method:

val rememberTotal = total.memoize
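
Running the memoized version twice (same setup as above, a minimal sketch) now prints the “Computing 2 + 2” line only once:

println(Await.result(rememberTotal.runAsync, Duration.Inf)) // Computing 2 + 2, then 4
println(Await.result(rememberTotal.runAsync, Duration.Inf)) // 4 (the result is remembered)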

Running on the current thread

Scala’s Future exposes Future.successful, which evaluates its argument straight away on the current thread and wraps the result in an already-completed Future. Monix Tasks offer something similar in 2 different flavours:

  • Task.pure or Task.now takes a “strict” value and wraps it into a Task. If the value requires a computation it is executed straight away on the current thread (just like Future.successful does).
  • Task.eval is the “lazy” version of it. The computation is not evaluated straight away but wrapped into a Task that will be executed when you tell it to (e.g. when you call the runAsync method on it), as in the sketch below.
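
A minimal sketch of the difference (assuming the implicit global Scheduler imported earlier is still in scope):

val strict = Task.now { println("evaluating now"); 1 + 1 }     // prints "evaluating now" immediately
val lazyOne = Task.eval { println("evaluating later"); 1 + 1 } // prints nothing yet

lazyOne.runAsync // only now is "evaluating later" printed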

Going back to our basic example, if we wrap the tasks into Task.eval we can see that the program is now executed sequentially on the main thread:

13:40:04.018 [main] Starting Main
13:40:04.379 [main] Starting taskA
13:40:05.381 [main] Finished taskA
13:40:05.387 [main] Starting taskB
13:40:06.390 [main] Finished taskB
13:40:06.391 [main] Continuing Main

The Task implementation

Now let’s have a look at what a Task is under the hood. Well, it’s just one of the following case classes:

sealed abstract class Task[+A] extends Serializable
private[eval] final case class Now[A](value: A) extends Task[A]
private[eval] final case class Error[A](ex: Throwable) extends Task[A]
private[eval] final case class Eval[A](thunk: () => A) extends Task[A]
private[eval] final case class Suspend[+A](thunk: () => Task[A]) extends Task[A]
private[eval] final case class FlatMap[A, B](source: Task[A], f: A => Task[B]) extends Task[B]
private[eval] final case class Async[+A](onFinish: OnFinish[A]) extends Task[A]
private[eval] final class MemoizeSuspend[A](
  f: () => Task[A],
  private[eval] val cacheErrors: Boolean) extends Task[A]

  • Now holds a result value.
  • Error holds a resulting error (an exception).
  • Eval represents a synchronous computation that is evaluated on demand, on the current thread.
  • Suspend allows executing a computation in a stack-safe way by suspending the current computation and returning to the execution loop.
  • FlatMap allows composing Tasks together: it creates a task that depends on the result of a previous task. (Remember our Free monad implementation? Doesn’t it look familiar?)
  • Async represents a task that needs to be run asynchronously on a “different” thread (it actually depends on the underlying execution context).
  • Finally MemoizeSuspend is just an Async that remembers (memoizes) its value using an AtomicRef. (The Atomic implementation is quite interesting on its own: it’s a macro that generates the right CAS instance – AtomicInt, AtomicRef, … – depending on the type of the value. It also wraps the CAS calls into an appropriate while loop … More information here.)

The Task ADT allows us to precisely model the computation graph of a program, indicating the dependencies between computations, the async boundaries, … and it’s easy to compose.

For example, if we look at Task.apply we can see that it’s implemented as:

object Task {
  // ...
  def apply[A](f: => A): Task[A] = fork(eval(f))
  def eval[A](a: => A): Task[A] = Eval(a _)
  def fork[A](fa: Task[A]): Task[A] = Task.forkedUnit.flatMap(_ => fa)
  // ... 
  private final val forkedUnit = Async[Unit] { (context, cb) =>
    context.scheduler.executeAsync(() => cb.onSuccess(()))
  }
}

eval just wraps f into an Eval.
fork is more interesting: it takes forkedUnit, which is just an Async[Unit], and flatMaps over it with the Eval task.

Using an Async instance places an asynchronous boundary, indicating that this computation needs to be executed on a different thread.

Schedulers

Just like Futures need an ExecutionContext, Tasks need a Scheduler to run their computation. It’s not surprising that Scheduler actually extends ExecutionContext.

However having all computations wrapped into different Tasks instances allows for much finer control over the execution. We can choose which computations are executed on the current thread and which are on another thread (depending on the selected Scheduler).

The implementation is more complex than the one for Scala Futures. The main idea is a tail-recursive function called loop that pattern-matches on the Task case classes and triggers the associated computations, either on the local thread or, whenever an asynchronous boundary must be crossed, using a Scala Promise (via the goAsync method).

Even when an async boundary must be crossed it might still run on the current thread depending on the Scheduler and the ExecutionModel passed in.

The ExecutionModel maintains a counter of stack frames and forces an async boundary every time the selected stack size is exceeded (an async boundary is forced every time the counter rolls back to 0).

The main execution models are (a small configuration sketch follows the list):

  • SynchronousExecution: the counter is never incremented (always 1) so no async boundary is ever forced
  • AlwaysAsyncExecution: the counter is never incremented (always 0) so an async boundary is always forced
  • BatchedExecution: the counter is incremented; it takes a batchSize argument which can be configured with the property monix.environment.batchSize and defaults to 1024
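
Here is a minimal sketch of picking an execution model, assuming the Monix version used in this post (the names come from monix.execution.ExecutionModel):

import monix.execution.Scheduler
import monix.execution.ExecutionModel.{AlwaysAsyncExecution, BatchedExecution}

// A scheduler that forces an async boundary on every step
val alwaysAsync: Scheduler = Scheduler.global.withExecutionModel(AlwaysAsyncExecution)

// A scheduler that batches up to 256 synchronous steps before forcing a boundary
val batched: Scheduler = Scheduler.global.withExecutionModel(BatchedExecution(256))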

The “default” scheduler is an instance of AsyncScheduler, which provides trampolining capabilities (i.e. it can execute computations on the current thread in a stack-safe way). There are also a bunch of constructor methods to create your own Scheduler backed by a specific thread pool. These schedulers are instances of ExecutorScheduler and are backed by a Java ExecutorService.
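
A few of those constructors, as a sketch (the pool names and sizes below are made up):

import monix.execution.Scheduler

// An unbounded, cached thread pool meant for blocking I/O
val io = Scheduler.io(name = "my-io")

// A fixed-size thread pool
val fixed = Scheduler.fixedPool(name = "my-fixed", poolSize = 4)

// A thread pool sized for CPU-bound work
val cpu = Scheduler.computation(parallelism = 4)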

You can find all these constructors in the SchedulerCompanionImpl.

Bridge between Futures and Tasks

Tasks and Futures are quite close, and it’s easy to convert one into the other.

You can convert a Future to a Task by using some constructor methods:

val taskA = Task.fromFuture(myFuture)

// if you don't want to execute the future right now
val taskB = Task.deferFuture(Future { ... })
// which is the same as
val taskC = Task.defer(Task.fromFuture(Future { ... }))

To convert a Task into a Future you just need to run that task by calling runAsync and providing an implicit Scheduler.
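
As a minimal sketch (in the Monix version used here, runAsync returns a CancelableFuture, which is a regular Future with cancellation support):

import monix.eval.Task
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global

val task: Task[Int] = Task(1 + 1)
val future: CancelableFuture[Int] = task.runAsync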

Interestingly, it’s possible to use different Schedulers for different steps of a computation. In such cases the Scheduler must be passed in when building the computation, using the fork method.

val task = Task(
  println(s"Running on thread: ${Thread.currentThread.getName}")
)
val forked = Task.fork(task, Scheduler.io(name = "my-io"))
// runs on the global scheduler
task.runAsync 
// forked is started on the global scheduler
// but it runs `task` using a specific IO scheduler
forked.runAsync

This is especially useful if you want to use a dedicated thread-pool for your database calls, …

Conclusion

At first sight a Task might seem less powerful than a Future as it cannot execute a computation directly like a Future does. On the other hand it gives more control to the user, who is now able to choose precisely when to trigger the computation and what kind of execution context should be used.

This is called the principle of least power: “Always choose the least powerful feature” (that is able to do the job).
There is a great talk on this subject by Runar Bjarnason: Constraints liberate, liberties constrain.