Grpc-akkastream, the akka-stream implementation built on top of GRPC, looks good on the surface, but if you look under the hood there is one problem: it doesn’t provide any support for back-pressure.
Akka-streams (like any reactive-streams implementation) provides back-pressure, and GRPC (at least the Java version) also has an API that supports back-pressure. Let’s see how we can wire everything together and provide a truly reactive akka-stream implementation for GRPC!
Before we start, let’s briefly recap why we need back-pressure. Imagine you have one producer and one consumer. If the consumer consumes messages faster than the producer emits them, everything’s fine: whatever the producer sends, the consumer is able to cope with it. Now imagine that the consumer is slower than the producer. When a new message comes in, the consumer might not be able to handle it straight away, so it stores it in a buffer. But as more and more messages arrive, the buffer grows bigger and bigger until the consumer runs out of memory.
The only solution is for the consumer to tell the producer to slow down so that it can cope with the flow. That means there must be a communication channel going back from the consumer to the producer. This channel is used only for signalling, in order to adjust the throughput of messages.
Akka-stream back-pressure
All the building blocks of an Akka-stream (Source, Flow and Sink) are actually graph stages.
A graph stage is made of ports and some logic to route/create/consume messages.
There are 2 types of ports:
- Input port: a port that can receive messages
- Output port: a port that can emit messages
Using these 2 types of ports it’s possible to build any graph stage:
- A Source is a graph stage with only one output port (it only emits messages)
- A Sink is a graph stage with only one input port (it only receives messages)
- A Flow is a graph stage with one input port and one output port
- More complex graph stages can have multiple ports and provide routing capabilities, …
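In code, these port combinations are what akka-stream calls shapes; a minimal illustration (the port names here are arbitrary):

```scala
import akka.stream.{FlowShape, Inlet, Outlet, SinkShape, SourceShape}

val in  = Inlet[Int]("example.in")
val out = Outlet[Int]("example.out")

val sourceShape = SourceShape.of(out)   // one output port: only emits
val sinkShape   = SinkShape.of(in)      // one input port: only receives
val flowShape   = FlowShape.of(in, out) // one of each: transforms messages
```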
Now, to implement back-pressure, each port provides 2 operations: pull and push.
An output port can only push an element out if it has been pulled by downstream.
An input port has to pull the upstream to receive an element.
In order to know when a port has been pulled or pushed, Akka-stream provides the associated handlers.
```scala
trait OutHandler {
  /** Called when the output port is pulled */
  def onPull(): Unit

  /** Called when the output port will no longer accept any new elements */
  def onDownstreamFinish(): Unit
}
```
```scala
trait InHandler {
  /** Called when the input port has a new element available */
  def onPush(): Unit

  /** Called when the input port is finished */
  def onUpstreamFinish(): Unit

  /** Called when the input port has failed */
  def onUpstreamFailure(ex: Throwable): Unit
}
```
The principle is simple: every time an output port is pulled (i.e. its onPull callback is invoked), it is allowed to push an element out with the push method. On the input side, every time a port is ready to receive an element it calls the pull method, and it can then retrieve the element inside the onPush callback (using the grab method).
There is a catch though. The push method can’t be called outside of the onPull callback because everything actually happens sequentially:
- The consumer calls the pull method.
- This triggers the onPull callback on the producer.
- The producer produces an element with the push method (the push method is called inside the onPull callback).
- This triggers the onPush callback on the consumer.
- The consumer retrieves the element by calling the grab method.
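To make the handshake concrete, here is a minimal, illustrative Source stage that emits increasing integers; each downstream pull triggers onPull, which answers with exactly one push:

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

// Illustrative only: a Source that emits 0, 1, 2, ... one element per pull.
class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("numbers.out")
  override val shape: SourceShape[Int] = SourceShape.of(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      private var counter = 0

      // Downstream pulled: we are now allowed exactly one push.
      override def onPull(): Unit = {
        push(out, counter)
        counter += 1
      }

      setHandler(out, this)
    }
}
```

Running Source.fromGraph(new NumbersSource).take(3).runForeach(println) prints 0, 1 and 2; each element is produced on demand, right inside the onPull callback.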
That’s quite a problem if you’re not in control of when the elements are generated. Let’s imagine that you want to create a Source of elements coming from HTTP requests, or elements published on a Kafka stream, … Obviously you can’t control when the elements are generated, so you can’t guarantee that an element will be available when the downstream stages trigger the onPull callback on your Source.
Fortunately there is a solution to this problem: async callbacks. Whenever there is an element available you wrap the push call into an async callback.
```scala
// whenever an element becomes available
val callback = getAsyncCallback((value: T) => push(out, value))
callback.invoke(element)
```
Then, when the downstream pulls the source for a new element, the callback kicks in and makes that element available to the downstream.
Now that we know how to implement back-pressure inside Akka-stream let’s see how it’s done in GRPC.
GRPC back-pressure
At first sight GRPC-java doesn’t seem to provide any back-pressure mechanism. The messages are passed using StreamObservers:
```scala
trait StreamObserver[T] {
  def onNext(element: T): Unit
  def onCompleted(): Unit
  def onError(t: Throwable): Unit
}
```
Everything seems to go forward: whenever an element is available the onNext callback is invoked, and there is no way for the consumer to slow down the producer.
If you scratch the surface a bit you’ll see that the StreamObserver instances actually implement CallStreamObserver:
```scala
abstract class CallStreamObserver[T] extends StreamObserver[T] {
  def isReady(): Boolean
  def setOnReadyHandler(handler: Runnable): Unit
  def disableAutoInboundFlowControl(): Unit
  def request(count: Int): Unit
  def setMessageCompression(enable: Boolean): Unit
}
```
There are a few interesting things to note here:
- The request method enables a signalling flow from the consumer to the producer, supporting back-pressure.
- isReady can be checked before calling onNext in order to avoid flooding the consumer.
- The onReadyHandler lets us register a callback that is invoked when the consumer becomes ready.
- Finally, we need to disable the automatic flow control in order to control the flow ourselves with the request method.
And that’s it: this is all we need to implement back-pressure in a GRPC service.
This is how it’s done on the client side:
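Roughly along these lines (a sketch, with HelloRequest and HelloReply standing in for the proto-generated message types, and stub.sayHelloStreaming a hypothetical bidi endpoint): implement ClientResponseObserver to get hold of the request stream before the call starts, disable the automatic flow control, and request one more response each time one has been processed.

```scala
import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver}

// Placeholder message types standing in for the proto-generated classes.
case class HelloRequest(name: String)
case class HelloReply(message: String)

val requests: Iterator[HelloRequest] = Iterator("Alice", "Bob").map(HelloRequest(_))

val responseObserver = new ClientResponseObserver[HelloRequest, HelloReply] {
  private var requestStream: ClientCallStreamObserver[HelloRequest] = _
  private var requestsDone = false

  // Invoked by grpc-java before the call starts: the only safe moment
  // to switch to manual flow control.
  override def beforeStart(stream: ClientCallStreamObserver[HelloRequest]): Unit = {
    requestStream = stream
    requestStream.disableAutoInboundFlowControl()
    // Producer side: only send requests while the transport can accept them.
    requestStream.setOnReadyHandler(new Runnable {
      override def run(): Unit = {
        while (requestStream.isReady && requests.hasNext)
          requestStream.onNext(requests.next())
        if (!requests.hasNext && !requestsDone) {
          requestsDone = true
          requestStream.onCompleted()
        }
      }
    })
  }

  override def onNext(reply: HelloReply): Unit = {
    println(s"received: ${reply.message}")
    // Consumer side: ask for exactly one more response once this one is handled.
    requestStream.request(1)
  }

  override def onError(t: Throwable): Unit = t.printStackTrace()
  override def onCompleted(): Unit = println("stream completed")
}

// stub.sayHelloStreaming(responseObserver) // hypothetical async bidi endpoint
```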
And somewhat similar on the server side:
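Again as a sketch, reusing the placeholder message types from above: the response observer handed to a streaming endpoint can be cast down to ServerCallStreamObserver, which exposes the same flow-control methods.

```scala
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}

// Hypothetical bidi endpoint of a generated service implementation.
def sayHelloStreaming(
    responseObserver: StreamObserver[HelloReply]
): StreamObserver[HelloRequest] = {
  val serverObserver =
    responseObserver.asInstanceOf[ServerCallStreamObserver[HelloReply]]

  serverObserver.disableAutoInboundFlowControl()
  // Ask for a request whenever the transport is ready to take a response out.
  serverObserver.setOnReadyHandler(new Runnable {
    override def run(): Unit = serverObserver.request(1)
  })

  new StreamObserver[HelloRequest] {
    override def onNext(request: HelloRequest): Unit = {
      // Placeholder business logic: echo a greeting back.
      serverObserver.onNext(HelloReply(s"Hello, ${request.name}"))
      // Only pull the next request if the client can still keep up.
      if (serverObserver.isReady) serverObserver.request(1)
    }
    override def onError(t: Throwable): Unit = t.printStackTrace()
    override def onCompleted(): Unit = serverObserver.onCompleted()
  }
}
```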
It works great for bidirectional and client streaming calls. It’s not completely the case for server streaming calls. Why? Because when you call a server streaming endpoint you only provide a StreamObserver for the responses, and the GRPC framework doesn’t hand anything back.
Therefore it’s not possible to disable the automatic flow control nor to request more elements. Depending on your implementation it might not be too much of a problem, as the request call is made after the onNext callback has been invoked. As long as you completely process the message inside the onNext callback, your service shouldn’t be overflowed. If not… well, you have to dig deeper.
And if you look at the CallStreamObserver implementations you’ll see that they actually use a ClientCall or ServerCall, both of which provide a request method along with a Listener interface.
```scala
abstract class ClientCall[I, O] {
  abstract class Listener[T] {
    def onHeaders(headers: Metadata): Unit
    def onMessage(message: T): Unit
    def onClose(status: Status, trailers: Metadata): Unit
    def onReady(): Unit
  }

  def start(listener: Listener[O], headers: Metadata): Unit
  def request(count: Int): Unit
  def cancel(message: String, cause: Throwable): Unit
  def halfClose(): Unit
  def sendMessage(message: I): Unit
  def isReady(): Boolean
  def setMessageCompression(enable: Boolean): Unit
}
```
It’s always possible to use these lower-level implementations to provide back-pressure whatever the type of call you’re making, the drawback being that you have to re-implement the logic present in the CallStreamObservers.
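For instance, here is a hedged sketch of what that looks like for a server streaming call driven directly through ClientCall (in real code the method descriptor would come from the generated service class):

```scala
import io.grpc.{CallOptions, Channel, ClientCall, Metadata, MethodDescriptor, Status}

// A sketch of back-pressure at the ClientCall level: one explicit request(1)
// per processed message, instead of the automatic flow control.
def serverStreamingWithBackPressure[I, O](
    channel: Channel,
    method: MethodDescriptor[I, O],
    request: I
): Unit = {
  val call: ClientCall[I, O] = channel.newCall(method, CallOptions.DEFAULT)
  call.start(
    new ClientCall.Listener[O] {
      override def onMessage(message: O): Unit = {
        // ... process the response, then explicitly ask for the next one ...
        call.request(1)
      }
      override def onClose(status: Status, trailers: Metadata): Unit =
        println(s"call closed with $status")
    },
    new Metadata()
  )
  call.request(1)      // demand the first response
  call.sendMessage(request)
  call.halfClose()     // we won't send any more requests
}
```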
GRPC-Akkastream: Putting it all together
Now that we’ve got a good idea of how back-pressure is implemented inside both GRPC-java and Akka-stream, let’s see how we can put everything together and provide a fully reactive akka-stream interface for GRPC.
We need 3 pieces:
- A reactive Source
- A reactive Flow
- A reactive Sink
On the server side, the Source is going to provide incoming messages to the service Flow (implemented by the user). The output of the Flow is connected to the Sink that sends the responses back to the client (this wiring is sketched after the stage implementations below).
On the Source stage, every time an element is pulled by the downstream we need to request one more element from the GRPC observer using the request method.
```scala
class GrpcSourceStage[I, O](requestStream: CallStreamObserver[O])
    extends GraphStageWithMaterializedValue[SourceShape[I], Future[StreamObserver[I]]] {

  val out = Outlet[I]("grpc.out")
  override val shape: SourceShape[I] = SourceShape.of(out)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes
  ): (GraphStageLogic, Future[StreamObserver[I]]) = {
    val promise: Promise[StreamObserver[I]] = Promise()

    val logic = new GraphStageLogic(shape) with OutHandler {
      val inObs = new StreamObserver[I] {
        override def onError(t: Throwable) =
          getAsyncCallback((t: Throwable) => fail(out, t)).invoke(t)

        override def onCompleted() =
          getAsyncCallback((_: Unit) => complete(out)).invoke(())

        override def onNext(value: I) =
          getAsyncCallback((value: I) => push(out, value)).invoke(value)
      }

      override def onPull(): Unit = requestStream.request(1)

      override def preStart(): Unit = {
        requestStream.disableAutoInboundFlowControl()
        promise.success(inObs)
      }

      setHandler(out, this)
    }

    (logic, promise.future)
  }
}
```
On the Sink stage, every time we receive an element from upstream we check if the GRPC observer is ready. If it is, we call its onNext method. If not, we buffer the element and onNext will be called by the onReadyHandler when the observer becomes ready.
```scala
class GrpcSinkStage[I](observer: CallStreamObserver[I]) extends GraphStage[SinkShape[I]] {

  val in = Inlet[I]("grpc.in")
  override val shape: SinkShape[I] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with Runnable {
      var element: Option[I] = None

      // Invoked by the onReadyHandler: flush the buffered element, if any.
      override def run(): Unit = element match {
        case Some(value) if observer.isReady =>
          element = None // clear the buffer so the element isn't sent twice
          observer.onNext(value)
          tryPull(in)
        case _ => ()
      }

      override def onPush(): Unit = {
        val value = grab(in)
        if (observer.isReady) {
          observer.onNext(value)
          tryPull(in)
        } else element = Some(value)
      }

      override def onUpstreamFinish(): Unit = observer.onCompleted()

      override def onUpstreamFailure(t: Throwable): Unit = observer.onError(t)

      override def preStart(): Unit = pull(in)

      observer.setOnReadyHandler(this)
      setHandler(in, this)
    }
}
```
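Putting the two stages together, the server-side wiring described earlier might look like this sketch (the handle method and its signature are illustrative, not the exact code from the pull request):

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import io.grpc.stub.{CallStreamObserver, StreamObserver}

import scala.concurrent.Future

// responseObserver is the observer gRPC hands to the endpoint; serviceFlow is
// the user-provided Flow. The materialized Future[StreamObserver[Req]] is what
// the endpoint returns to gRPC so it can feed requests into the stream.
def handle[Req, Resp](
    responseObserver: CallStreamObserver[Resp],
    serviceFlow: Flow[Req, Resp, NotUsed]
)(implicit mat: Materializer): Future[StreamObserver[Req]] =
  Source
    .fromGraph(new GrpcSourceStage[Req, Resp](responseObserver))
    .via(serviceFlow)
    .to(Sink.fromGraph(new GrpcSinkStage[Resp](responseObserver)))
    .run()
```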
On the client side, the source of requests is provided by the user. We then need to connect it to a Flow that sends the requests to the server and emits the server responses into a Sink provided by the user.
The Flow stage is simply a combination of the Source and Sink above (a sketch follows below).
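Here is a minimal sketch of such a combined stage, not the exact code from the pull request: the request side mirrors GrpcSinkStage, the response side mirrors GrpcSourceStage, and ClientResponseObserver is used to get hold of the request stream before the call starts. The startCall parameter stands in for the generated async stub method.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver, StreamObserver}

class GrpcFlowStage[I, O](startCall: StreamObserver[O] => StreamObserver[I])
    extends GraphStage[FlowShape[I, O]] {

  val in  = Inlet[I]("grpc.flow.in")
  val out = Outlet[O]("grpc.flow.out")
  override val shape: FlowShape[I, O] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler with Runnable { self =>
      var requestStream: ClientCallStreamObserver[I] = _
      var buffered: Option[I] = None

      // Response side: gRPC threads hand us responses; async callbacks bring
      // them back onto the stage's own execution context.
      private val onResponse = getAsyncCallback[O](push(out, _))
      private val onFailure  = getAsyncCallback[Throwable](failStage)
      private val onDone     = getAsyncCallback[Unit](_ => completeStage())

      // ClientResponseObserver lets us grab the request stream before the call
      // starts, which is the only moment auto flow control can be disabled.
      private val responseObserver = new ClientResponseObserver[I, O] {
        override def beforeStart(stream: ClientCallStreamObserver[I]): Unit = {
          requestStream = stream
          stream.disableAutoInboundFlowControl()
          stream.setOnReadyHandler(self)
        }
        override def onNext(value: O): Unit      = onResponse.invoke(value)
        override def onError(t: Throwable): Unit = onFailure.invoke(t)
        override def onCompleted(): Unit         = onDone.invoke(())
      }

      // Request side, as in GrpcSinkStage: send when ready, otherwise buffer.
      override def onPush(): Unit = {
        val value = grab(in)
        if (requestStream.isReady) { requestStream.onNext(value); tryPull(in) }
        else buffered = Some(value)
      }

      // onReady handler: flush the buffered request once the transport is ready.
      override def run(): Unit = buffered match {
        case Some(value) if requestStream.isReady =>
          buffered = None
          requestStream.onNext(value)
          tryPull(in)
        case _ => ()
      }

      override def onUpstreamFinish(): Unit = requestStream.onCompleted()
      override def onUpstreamFailure(t: Throwable): Unit = requestStream.onError(t)

      // One downstream pull = one more response requested from the server.
      override def onPull(): Unit = requestStream.request(1)

      override def preStart(): Unit = {
        startCall(responseObserver) // beforeStart runs during this call
        pull(in)
      }

      setHandlers(in, out, this)
    }
}
```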
Overall the most confusing part is that you need to call request on the opposite observer: e.g. calling request on the CallStreamObserver[I] actually asks GRPC to send more responses (of type O) to the StreamObserver[O].
Conclusion
All the implementation details are available inside this Github pull request. There are of course some possible optimisations, like buffering more elements and requesting batches of elements at once …
For those interested in a Java reactive-streams implementation, Salesforce provides one with back-pressure support called reactive-grpc.