After an overview of cats-effect, which deals with a single effect, it feels natural to have a look at fs2 and see how multiple effects can be combined into a stream.
As we covered some common streaming patterns with Akka Streams a while ago, it'll be interesting to see how the two compare.
Overview
First things first, let's create a Stream using fs2. Unlike Akka Streams, which always requires an actor system and a materializer, an fs2 stream can be created straight away:
val stream = Stream(1, 2, 3, 4)
An interesting thing to note is the type of the stream:
fs2.Stream[fs2.Pure, Int]
So an fs2 stream has 2 type parameters. The second one is the one you expect as it represents the type of the elements of the stream – e.g. Int.
The first type is a type constructor which corresponds to the effect type. Here Pure means that the stream doesn't require any effect to be evaluated.
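A pure stream can later be lifted into an effectful one, for example with covary (which this post uses again further down). A minimal sketch:

val pure: fs2.Stream[fs2.Pure, Int] = fs2.Stream(1, 2, 3, 4)
val effectful: fs2.Stream[IO, Int] = pure.covary[IO] // lifts the Pure stream into IO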
If you were to integrate with cats-effect (let’s say to use cats IO – but fs2 can use any effect) you would create a stream of type
fs2.Stream[IO, Int]
Of course you can no longer use fs2.Stream.apply to build the stream, as the IO effect needs to be evaluated to produce an element in the stream. This is simply done with the eval method:
fs2.Stream.eval(IO(2))
Like cats-effect, creating a Stream doesn't run the stream. To "run" the stream we need to "compile" it and then use one of the cats-effect run methods:
fs2.Stream.eval(IO(2))
  .compile
  .toList
  .unsafeRunSync
Now that we know how to create a stream let’s jump straight into some usage patterns and see how it compares to Akka streams.
Patterns
Flattening a stream
Unlike Akka Streams, fs2 doesn't provide a mapConcat method but it's still pretty easy to do. We use emits to create a stream from a collection which we can then just flatMap:
fs2.Stream.emits('A' to 'E')
  .map(letter => (1 to 3).map(index => s"$letter$index"))
  .flatMap(fs2.Stream.emits) // this flattens the stream
  .compile
  .toList

// or

fs2.Stream.emits('A' to 'E')
  .map(letter => fs2.Stream.emits(1 to 3).map(index => s"$letter$index"))
  .flatten
  .compile
  .toList
This generates the elements in sequence and gives a List(A1, A2, A3, B1, B2, B3, C1, ...). It processes all the elements of the first stream before moving to the second stream. Now let's imagine that each stream is infinite (e.g. A1, A2, A3, A4, A5, ...). In this case the B elements (B1, B2, B3, B4, ...) are never reached.
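A minimal sketch of that behaviour, using Stream.iterate to build the infinite per-letter streams (the take(10) is only there so the example terminates):

fs2.Stream.emits('A' to 'E')
  .flatMap(letter => fs2.Stream.iterate(1)(_ + 1).map(index => s"$letter$index")) // each inner stream is infinite
  .take(10)
  .toList // List(A1, A2, ..., A10), no B element is ever produced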
If we want to process the streams in parallel we can use parJoin
fs2.Stream.emits[IO, Char]('A' to 'E')
  .map(letter => fs2.Stream.emits[IO, Int](1 to 3).map(index => s"$letter$index"))
  .parJoin(5)
Now the streams are processed in parallel in a non-deterministic way. One possible outcome is D1, D2, D3, A1, A2, ...
Note that we're using IO here because parJoin requires a concurrent effect.
Alternatively, if you want to consume the streams in a breadth-first-like fashion you have to do a little more work yourself (I'm not aware of anything usable out of the box):
def breadthFirst[F[_]: Sync, E](streams: Stream[F, Stream[F, E]]): Stream[F, Stream[F, E]] =
  Stream.unfoldEval(streams) { streams =>
    val values = streams.flatMap(_.head) // get the head of each stream
    val next = streams.map(_.tail)       // continue with the tails
    values.compile.toList.map(_.headOption.map(_ => values -> next)) // stop when there are no more values
  }
Batching
As with Akka Streams, batching is straightforward with baked-in methods:
Stream.emits(1 to 100).chunkN(10).map(println).compile.drain
A Chunk is a finite sequence of values that is used by fs2 streams internally:
val s = Stream(1, 2) ++ Stream(3) ++ Stream(4, 5, 6)
val chunks = s.chunks.toList // List(Chunk(1, 2), Chunk(3), Chunk(4, 5, 6))
And if you want to batch within a specific time window, groupWithin is what you need:
Stream.awakeEvery[IO](10.millis)
  .groupWithin(100, 100.millis)
  .evalTap(chunk => IO(println(s"Processing batch of ${chunk.size} elements")))
  .compile
  .drain
  .unsafeRunTimed(1.second)
Asynchronous computation
Here fs2 clearly has the advantage, as asynchronicity depends directly on the effect type F, which must have an Async[F] instance in scope.
It offers all the eval methods: evalMap, evalTap, evalScan, evalMapAccumulate.
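As an illustration, a minimal evalMap sketch (the doubling logic is only there to show that the effect's result flows downstream):

Stream.emits(1 to 3)
  .covary[IO]
  .evalMap(n => IO { println(s"Processing $n"); n * 2 }) // run an effect per element and keep its result
  .compile
  .toList // IO(List(2, 4, 6))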
If you want to run asynchronous effects in parallel the effect type must have an instance of Concurrent[F] in scope. If that's the case the parEval methods are available: parEvalMap and parEvalMapAccumulate.
Let's keep the same example where a program writes asynchronously to a database:
def writeToDatabase[F[_]: Async](chunk: Chunk[Int]): F[Unit] =
  Async[F].async { callback =>
    println(s"Writing batch of $chunk to database by ${Thread.currentThread().getName}")
    callback(Right(()))
  }
We can then write batches to the database in parallel:
fs2.Stream.emits(1 to 10000)
  .chunkN(10)
  .covary[IO]
  .parEvalMap(10)(writeToDatabase[IO])
  .compile
  .drain
  .unsafeRunSync()
Note that parEvalMap preserves the stream ordering. If this is not required there is a parEvalMapUnordered method.
If you'd like some consistency with the Akka Streams API, you'll be glad to know that there are mapAsync (and mapAsyncUnordered) methods that are just aliases for parEvalMap (and parEvalMapUnordered respectively).
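For example, the parEvalMap snippet above could equally be written with the alias (this sketch reuses the writeToDatabase helper defined earlier):

fs2.Stream.emits(1 to 10000)
  .chunkN(10)
  .covary[IO]
  .mapAsync(10)(writeToDatabase[IO]) // alias for parEvalMap
  .compile
  .drain
  .unsafeRunSync()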
Concurrency
In fs2 the async boundaries can be controlled directly by the effect computations. Let's consider a similar example as with Akka Streams where a stream runs through a series of stages (or pipes):
def pipe[F[_]: Sync](name: String): Stream[F, Int] => Stream[F, Int] =
  _.evalTap { index =>
    Sync[F].delay(
      println(s"Stage $name processing $index by ${Thread.currentThread().getName}")
    )
  }

Stream.emits(1 to 10000)
  .covary[IO]
  .through(pipe("A"))
  .through(pipe("B"))
  .through(pipe("C"))
  .compile
  .drain
  .unsafeRunSync()
As expected this program uses a single thread and each element is processed sequentially through the pipes.
Now if we change our pipe definition to
def pipe[F[_]: Sync: LiftIO](name: String): Stream[F, Int] => Stream[F, Int] =
  _.evalTap { index =>
    (IO.shift *> IO(println(s"Stage $name processing $index by ${Thread.currentThread().getName}")))
      .runAsync(_ => IO.unit)
      .to[F]
  }
IO.shift places an async boundary, giving a chance to use another thread for execution. The runAsync method runs the computation without waiting for its result.
If applied to the same stream as before, the elements are still processed sequentially (1 processing starts before 2, which starts before 3, ...) and the stages A, B and C are also started in order (A starts, then B, then C).
However we no longer wait for each stage to finish and as we use different threads the execution becomes non-deterministic.
Throttling
Fs2 provides a mechanism to create a stream that emits an element at a fixed interval. If zipped with another stream it limits the rate of the second stream:
Stream.awakeEvery[IO](1.second) zipRight Stream.emits(1 to 100)
As it's a very common pattern, fs2 provides us with a method metered that does just that.
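A minimal sketch of metered (this assumes an implicit Timer[IO] is in scope, e.g. the one provided by IOApp):

Stream.emits(1 to 100)
  .covary[IO]
  .metered(1.second) // emits at most one element per second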
If, instead of limiting the rate of the stream, you prefer to discard some elements you can use debounce:
val ints = Stream.constant[IO, Int](1).scan1(_ + _) // 1, 2, 3, ...

ints.debounce(1.second)
This emits an element at a fixed rate discarding every element produced in between.
Alternatively, if you want to accept the first X elements emitted during a time interval and discard any other element until the end of the interval, things are a little more involved as there is nothing that comes out of the box.
E.g. if we want 100 elements per second, we keep the first 100 elements then discard any other element until a new second starts.
val ints = Stream.constant[IO, Int](1).scan1(_ + _) // 1, 2, 3, ...
val ticks = fs2.Stream.every[IO](1.second)          // emits true every second
val rate = 100                                      // 100 elements per second

val throttledInts = ints.zip(ticks)
  .scan((0, rate + 1)) {
    case (_, (n, true))       => (n, 0)         // a new second starts, emit element and reset counter
    case ((_, count), (n, _)) => (n, count + 1) // emit element and increment counter
  }
  .filter(_._2 < rate) // keep only the elements where the counter is less than the rate
  .map(_._1)           // remove the counter
Idle timeout
Akka Streams has an idleTimeout method that fails a stream if no element is emitted within a given timeout.
Fs2 doesn't provide something similar but this is trivial to implement:
def idleTimeout[F[_], A](
  s: fs2.Stream[F, A],
  timeout: FiniteDuration
)(
  implicit F: Concurrent[F],
  timer: Timer[F]
): fs2.Stream[F, A] =
  s.groupWithin(1, timeout).evalMap(
    _.head.fold[F[A]](F.raiseError(new Exception("timeout")))(F.pure)
  )
The idea is to use groupWithin then check the Chunk. If it contains an element, that's good and we emit it; otherwise the chunk is empty so we raise the error.
Error Handling
Any exception raised during stream processing (or raised explicitly by calling Stream.raiseError) ends the stream in error. Error handling is similar to the cats way of doing things:
Stream.raiseError(new Exception("Oops"))
  .handleErrorWith { error =>
    Stream(error.getMessage)
  }
Fs2 provides a flexible way to deal with retries, be it retrying a fixed number of times, with or without backoff, ... You can retry a single effect (or a whole stream if it is compiled into a single effect):
val ints = (
  Stream.emits(1 to 10) ++ Stream.raiseError(new Exception("the end"))
).covary[IO].evalTap(n => IO(println(n))).compile.drain

Stream.retry(
  ints,
  delay = 1.second,  // delay before first retry
  nextDelay = _ * 2, // doubles the delay for every retry
  maxAttempts = 5,
  _ => true          // retry on any error
).compile.drain.unsafeRunSync()
Handling resources
As clearly mentioned in the fs2 documentation, error handling is not meant for freeing up resources. For that matter fs2 provides a much safer way of doing things via the concept of bracket.
That probably sounds familiar as it’s the same concept as provided by cats-effect.
Using bracket makes sure that the resources are always freed whatever the outcome of the computation:
val acquire = IO(println("Acquiring resource")) *> IO(new Random())
val release = (_: Random) => IO(println("Releasing resource"))

Stream.bracket(acquire)(release).flatMap { rand =>
  fs2.Stream.emits(1 to 10).map(_ => rand.nextInt())
}.evalTap(n => IO(println(n))).compile.drain.unsafeRunSync()
There are other bracket-like methods such as resource (which takes a resource directly) and bracketCase (which lets you know how the computation ended in the release phase).
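For instance, a minimal sketch using resource, assuming cats.effect.Resource is imported (the Random resource mirrors the bracket example above):

val randomResource: Resource[IO, Random] =
  Resource.make(IO(println("Acquiring resource")) *> IO(new Random()))(
    _ => IO(println("Releasing resource"))
  )

Stream.resource(randomResource).flatMap { rand =>
  fs2.Stream.emits(1 to 10).map(_ => rand.nextInt())
}.evalTap(n => IO(println(n))).compile.drain.unsafeRunSync()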
Conclusion
I found the fs2 API both simple and complete enough to write powerful applications. It might not cover all cases but the building blocks are powerful enough to let you write code to suit your own needs.
Embedding the effect inside the stream is a powerful concept as it allows you to express all your computation as a stream (including the side effects) and then just use compile.drain to run it.
Fs2 has been a pleasure to work with so far and it has my preference over Akka Streams (it doesn't require an ActorSystem nor does it rely on Scala Futures). It is flexible enough to work with any effect and the building blocks are easy to combine. Plus a thorough documentation, what's left to ask?