In the previous post we saw how Scala Futures work and why they need an implicit ExecutionContext to run their computation. While there are some tricks to pass the ExecutionContext around, it’s usually cumbersome and clutters the code.
So the question really is: do you need an ExecutionContext everywhere? Well, you do as long as you use Futures, but are there any alternatives?
Future’s limitations
Let’s take a classic yet simple example. An application usually needs to persist data into a database or a remote service, … and the API looks something like this:
def run(query: DBQuery)(implicit executor: ExecutionContext): Future[ResultSet]
which can be used like this
implicit val executor = scala.concurrent.ExecutionContext.global

val result: Future[ResultSet] = db.run(query)
and then this code is used in a Repository which looks like this:
trait ProductRepository {

  def findProduct(
    productId: ProductId
  )(implicit executor: ExecutionContext): Future[Option[Product]]

  def saveProduct(
    product: Product
  )(implicit executor: ExecutionContext): Future[Unit]

  def incrementProductSells(
    productId: ProductId,
    quantity: Int
  )(implicit executor: ExecutionContext): Future[Unit]

  // More methods ...
}
As you can see, passing the ExecutionContext around is quite cumbersome. Alternatively you can consider passing it in the repository’s constructor so that it’s implicitly available inside all the method bodies.
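For instance, a hedged sketch of that constructor-based alternative could look like this (Database, findProductQuery and extractProduct are made-up names used only for illustration):

import scala.concurrent.{ExecutionContext, Future}

// The trait no longer mentions the ExecutionContext at all
trait ProductRepository {
  def findProduct(productId: ProductId): Future[Option[Product]]
  // ...
}

// The implementation receives the ExecutionContext once, in its constructor,
// and it is then implicitly available to every method body.
// (Database, findProductQuery and extractProduct are hypothetical.)
class DbProductRepository(db: Database)(implicit executor: ExecutionContext)
  extends ProductRepository {

  def findProduct(productId: ProductId): Future[Option[Product]] =
    db.run(findProductQuery(productId)).map(extractProduct)
}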
Side-effects with “non referentially transparent” Futures
Another problem is that Futures are (sometimes) hard to reason about because of their strict nature.
Let’s consider this use case to illustrate the issue: a customer makes a purchase and then the application increments the number of sales for each product they just bought.
// assuming basket.content is a List[Product]
val incrementResult: Future[List[Unit]] =
  Future.traverse(basket.content) { product =>
    productRepo.incrementProductSells(product.id, product.quantity)
  }
Let’s assume that one customer bought 4 products and there is a failure when incrementing the second product. Obviously we get a failed Future back in our incrementResult. If we’ve done error reporting correctly we might even be able to know that it’s the increment for the 2nd product that failed. However we can’t decide what to do with this failure: should we ignore it … or retry it?
Well, there is no good answer. The problem is that all 4 increment operations were started at the same time. We know that the 2nd product’s increment failed but we have no idea what happened with the other 3 requests.
This is because Scala Futures are strict, i.e. their computation is executed as soon as the future is created. Because of that, Futures are not “referentially transparent”, which might cause problems if this is not considered carefully when designing a program.
The same might happen when using a for-comprehension if you create the Futures beforehand:
val futureA = Future { ... }
val futureB = Future { ... }

// at this point both computations are executed

val result = for {
  a <- futureA
  b <- futureB
} yield ()
Now if futureA fails there is no way to know how futureB’s computation ended (which might be a problem if it performed some side-effects).
On the other hand creating the futures inside the for-comprehension solves this problem.
for {
  a <- Future { ... } // only a's computation is executed
  b <- Future { ... } // b's computation is executed only if a succeeds
} yield ()
This also means that the futures are now executed sequentially, so you have to think carefully about how you create your futures (and consider what actions need to be taken in case of failure).
Memoization
Another characteristic of Futures is that they remember their results. They run their computation only once and remember the result, so that if they’re used multiple times they can hand out the result directly.
This is good for performance but this goes against “referential transparency”, which may make the code harder to reason about. Consider the following code:
val total = Future.successful {
  println("Computing 2 + 2")
  2 + 2
}

println(total)
println(total)
The output is
Computing 2 + 2
Future(Success(4))
Future(Success(4))
which shows that the computation is executed only once although the future is used twice. That’s another thing to keep in mind when working with Scala Futures.
Moving away from Futures
Let’s contrast this with a structure that only represents a computation but doesn’t execute it. Something like a lazy Future which holds a computation but doesn’t run it (until you tell it to).
Well, such structures already exist and are (often) called Task (or IO). The most popular implementations are Monix and Scalaz. In this post I’ll use the Monix implementation but Scalaz would work equally well.
Now if we go back to our basic example used in the previous post:
import monix.eval.Task
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object TaskInvestigation extends App {

  val taskA = Task {
    debug("Starting taskA")
    Thread.sleep(1000)
    debug("Finished taskA")
  }

  val taskB = Task {
    debug("Starting taskB")
    Thread.sleep(2000)
    debug("Finished taskB")
  }

  import monix.execution.Scheduler.Implicits.global

  debug("Starting Main")
  val futureA = taskA.runAsync
  val futureB = taskB.runAsync
  debug("Continuing Main")
  Await.result(futureA zip futureB, Duration.Inf)

  def debug(msg: String): Unit = {
    val now = java.time.format.DateTimeFormatter.ISO_INSTANT
      .format(java.time.Instant.now)
      .substring(11, 23)
    val thread = Thread.currentThread.getName()
    println(s"$now [$thread] $msg")
  }
}
It really looks similar to our examples with Futures. Future has been replaced with Task and the global ExecutionContext has been replaced with a global Scheduler (more on that later).
The output is also similar to the implementation based on Futures
10:22:17.727 [main] Starting Main
10:22:18.100 [scala-execution-context-global-10] Starting taskA
10:22:18.103 [scala-execution-context-global-11] Starting taskB
10:22:18.104 [main] Continuing Main
10:22:19.101 [scala-execution-context-global-10] Finished taskA
10:22:20.105 [scala-execution-context-global-11] Finished taskB
Both tasks are started on their own threads, which run concurrently with the main program, and the overall execution time is 2 seconds (the duration of the longest task).
However there are some differences! The main one is that the tasks are not started when they are created. Therefore no Scheduler is needed before we actually call runAsync, which triggers the computation.
Not needing a Scheduler before we call runAsync means less-cluttered code. It’s now possible to write the repository API without having to worry about the ExecutionContext.
trait ProductRepository {
  def findProduct(productId: ProductId): Task[Option[Product]]
  def saveProduct(product: Product): Task[Unit]
  def incrementProductSells(productId: ProductId, quantity: Int): Task[Unit]
  // More methods ...
}
That’s great, our code looks cleaner! Moreover it’s also easier to reason about. Remember our problem when incrementing the number of product sales:
// assuming basket.content is a List[Product]
val incrementResult: Task[List[Unit]] =
  Task.traverse(basket.content) { product =>
    productRepo.incrementProductSells(product.id, product.quantity)
  }

// nothing is executed yet
// we need to explicitly call runAsync to trigger the execution
incrementResult.runAsync
Another good point is that the tasks are now executed sequentially, so if the update for the second product fails we know exactly where we are: product 1’s update was successful, product 2’s update failed, and product 3 and 4’s updates didn’t run at all. We know exactly what we need to retry.
You can argue that sequential execution is much less efficient! Instead we could have performed all the updates at once and reduced the execution time. That’s correct; it’s just that going this way makes it more difficult to combine the results in case of failures, as you need to track the result of each operation.
But if you want to go this way, it is possible too: this operation is called Task.gather. It’s always good to have a choice, and at least now you know what the consequences are of going one way or the other.
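If we wanted the parallel behaviour, a hedged sketch might look like this (again assuming basket.content is a List[Product]):

// Build one Task per product, then run them all in parallel with Task.gather.
// Nothing is executed until runAsync is called, and the results keep their order.
val parallelIncrements: Task[List[Unit]] =
  Task.gather(
    basket.content.map(product =>
      productRepo.incrementProductSells(product.id, product.quantity)
    )
  )

If any of the increments fails, the combined task fails and we are back to the problem described above: we don’t know how the other updates ended.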
Memoization
By default Tasks don’t remember their results. They execute their computation every time it is needed. Using the same example
val total = Task {
  println("Computing 2 + 2")
  2 + 2
}

// a Task needs to be run to produce its value (using the Scheduler imported above)
println(Await.result(total.runAsync, Duration.Inf))
println(Await.result(total.runAsync, Duration.Inf))
outputs
Computing 2 + 2
4
Computing 2 + 2
4
This clearly shows that the computation is run twice because the task is used twice. But if you want your task to remember its result you have to state it explicitly with the memoize method.
val rememberTotal = total.memoize
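Running the memoized task twice (with the same Await pattern as above) should then print the computation message only once:

println(Await.result(rememberTotal.runAsync, Duration.Inf))
println(Await.result(rememberTotal.runAsync, Duration.Inf))

// Computing 2 + 2
// 4
// 4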
Running on the current thread
Scala’s Future exposes Future.successful to execute a computation on the current thread. Monix Tasks offer something similar in 2 different flavours:
- Task.pure or Task.now takes a “strict” value and wraps it into a Task. If the value requires a computation it is executed straight away on the current thread (just like Future.successful does).
- Task.eval is the “lazy” version of it. The computation is not evaluated straight away but wrapped into a Task that will be executed when you tell it to, e.g. when you call the runAsync method on it (see the sketch after this list).
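Here is a minimal sketch of the difference between the two flavours (reusing the global Scheduler from the earlier example):

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

// Task.now evaluates its argument strictly, right here on the current thread:
val strict = Task.now { println("evaluated now"); 41 } // prints "evaluated now" immediately

// Task.eval only wraps the computation, nothing is printed yet:
val lazyTask = Task.eval { println("evaluated later"); 42 }

lazyTask.runAsync // only now is "evaluated later" printed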
Going back to our basic example, if we wrap the tasks into Task.eval we can see that the program is now executed sequentially on the main thread
13:40:04.018 [main] Starting Main
13:40:04.379 [main] Starting taskA
13:40:05.381 [main] Finished taskA
13:40:05.387 [main] Starting taskB
13:40:06.390 [main] Finished taskB
13:40:06.391 [main] Continuing Main
The Tasks implementation
Now let’s have a look at what a Task is under the hood. Well, it’s just one of the following case classes:
sealed abstract class Task[+A] extends Serializable

private[eval] final case class Now[A](value: A) extends Task[A]

private[eval] final case class Error[A](ex: Throwable) extends Task[A]

private[eval] final case class Eval[A](thunk: () => A) extends Task[A]

private[eval] final case class Suspend[+A](thunk: () => Task[A]) extends Task[A]

private[eval] final case class FlatMap[A, B](source: Task[A], f: A => Task[B]) extends Task[B]

private[eval] final case class Async[+A](onFinish: OnFinish[A]) extends Task[A]

private[eval] final class MemoizeSuspend[A](
  f: () => Task[A],
  private[eval] val cacheErrors: Boolean
) extends Task[A]
- Now holds a result value.
- Error holds a result error (an exception).
- Eval represents an immediate synchronous computation (a thunk).
- Suspend allows executing a computation in a stack-safe way by suspending the current computation and returning to the execution loop.
- FlatMap allows composing Tasks together: it creates a task that depends on the result of a previous task. (Remember our Free monad implementation? Doesn’t it look familiar?)
- Async represents a task that needs to be run asynchronously on a “different” thread (it actually depends on the underlying execution context).
- Finally, MemoizeSuspend is just an Async that remembers (memoizes) its value using an AtomicRef. (The Atomic implementation is quite interesting on its own. It’s a macro that generates the right CAS instance (AtomicInt, AtomicRef, …) depending on the type of the value. It also wraps the CAS calls into an appropriate while loop … More information here.)
The Task ADT allows us to precisely model the computation graph of a program, indicating the dependencies between computations, the async boundaries, … plus it’s easy to compose.
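To make this concrete, here is a small illustration of how a composed task maps onto this ADT (conceptual only, since these case classes are private to monix.eval):

import monix.eval.Task

// Composing tasks only builds a description of the computation graph:
val composed: Task[Int] =
  Task.eval(1).flatMap(x => Task.now(x + 1))

// Conceptually this is a tree along the lines of FlatMap(Eval(() => 1), x => Now(x + 1));
// nothing is executed until runAsync is called with a Scheduler.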
For example, if we look at Task.apply we can see that it’s implemented as
object Task {
  // ...
  def apply[A](f: => A): Task[A] =
    fork(eval(f))

  def eval[A](a: => A): Task[A] =
    Eval(a _)

  def fork[A](fa: Task[A]): Task[A] =
    Task.forkedUnit.flatMap(_ => fa)
  // ...

  private final val forkedUnit = Async[Unit] { (context, cb) =>
    context.scheduler.executeAsync(() => cb.onSuccess(()))
  }
}
eval just wraps f into an Eval.
fork is quite interesting. It takes forkedUnit, which is just an Async[Unit], and flatMaps over it with the Eval task.
Using an Async instance places an asynchronous boundary, indicating that this computation needs to be executed on a different thread.
Schedulers
Just like Futures need an ExecutionContext, Tasks need a Scheduler to run their computation. It’s not surprising that Scheduler actually extends ExecutionContext.
However, having all computations wrapped into different Task instances allows for much finer control over the execution. We can choose which computations are executed on the current thread and which run on another thread (depending on the selected Scheduler).
The implementation is more complex than the Scala Future execution. The main idea is that we have a tail-recursive function called loop that pattern-matches on the Task case classes and triggers the associated computations either on the local thread or using a Scala Promise whenever an asynchronous boundary must be crossed (with the goAsync method).
Even when an async boundary must be crossed, it might still run on the current thread depending on the Scheduler and the ExecutionModel passed in.
The ExecutionModel maintains a counter of stack frames so that it forces an async boundary every time the selected stack size is exceeded (an async boundary is forced every time the counter rolls back to 0).
The main execution models are:
- SynchronousExecution: the counter is never incremented (always 1) so no async boundary is crossed.
- AlwaysAsyncExecution: the counter is never incremented (always 0) so an async boundary is always forced.
- BatchedExecution: the counter is incremented and takes a batchSize argument which can be configured with the property monix.environment.batchSize or defaults to 1024 (see the sketch after this list).
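As a hedged sketch (the exact import path of ExecutionModel may differ between Monix versions), an execution model can be selected when picking a Scheduler:

import monix.execution.Scheduler
import monix.execution.ExecutionModel.AlwaysAsyncExecution

// Derive a scheduler from the global one that forces an async boundary on every step
val alwaysAsync: Scheduler =
  Scheduler.global.withExecutionModel(AlwaysAsyncExecution)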
The “default” scheduler is an instance of AsyncScheduler, which provides trampolining capabilities (i.e. executing computations on the current thread in a stack-safe way). There is also a bunch of constructor methods to create your own Scheduler backed by specific thread pools. These schedulers are instances of ExecutorScheduler and are backed by a Java ExecutorService.
You can find all these constructors in the SchedulerCompanionImpl.
Bridge between Futures and Tasks
Tasks and Futures are quite close, and it’s easy to convert from one to the other.
You can convert a Future to a Task by using some constructor methods:
val taskA = Task.fromFuture(myFuture)

// if you don't want to execute the future right now
val taskB = Task.deferFuture(Future { ... })

// which is the same as
val taskC = Task.defer(Task.fromFuture(Future { ... }))
To convert a Task into a Future you just need to run that task by calling runAsync and providing an implicit scheduler.
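For example, reusing the global Scheduler imported earlier:

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Future

val task: Task[Int] = Task.eval(2 + 2)

// runAsync returns a CancelableFuture, which is a regular scala.concurrent.Future
val future: Future[Int] = task.runAsync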
Interestingly, it’s possible to use different Schedulers for different steps of a computation. In such cases the Scheduler must be passed in when building the computation using the fork method.
val task = Task(
  println(s"Running on thread: ${Thread.currentThread.getName}")
)

val forked = Task.fork(task, Scheduler.io(name = "my-io"))

// runs on the global scheduler
task.runAsync

// forked is started on the global scheduler
// but it runs `task` using a specific IO scheduler
forked.runAsync
This is especially useful if you want to use a dedicated thread-pool for your database calls, …
Conclusion
At first sight a Task might seem less powerful than a Future as it cannot execute a computation directly like Futures do. On the other hand it gives more control to the user, who is now able to choose precisely when to trigger the computation and what kind of execution context should be used.
This is called the principle of least power: “Always choose the least powerful feature” (that is able to do the job).
There is a great talk on this subject by Runar Bjarnason: Constraints liberate, liberties constrain.