Akka stream interface for gRPC

Back from holidays, let's continue with some of my favourite topics: Akka Streams and gRPC.

We’ve already seen how we can take advantage of the ScalaPB code generation tool to generate new interfaces (GRPCMonix) on top of the grpc-java implementation, or to create new tools to integrate gRPC with other services (GRPCGateway).

Similarly to GRPCMonix, which provides a Monix interface – Task, Observable – on top of gRPC, it’s possible to develop an Akka Streams interface on top of gRPC.

Under the hood it’s still the grpc-java implementation that is running (using Netty). The gRPC Akka Stream library is mainly a bridge between gRPC StreamObservers and Akka Streams Flows.
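
For reference, grpc-java’s StreamObserver is a plain callback interface; seen from Scala it looks roughly like this:

// io.grpc.stub.StreamObserver, shown here for reference only
trait StreamObserver[V] {
  def onNext(value: V): Unit      // called once per message
  def onError(t: Throwable): Unit // terminates the stream with an error
  def onCompleted(): Unit         // signals that no more messages will follow
}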

The interface

gRPC defines 4 different use cases:

  • Unary calls: a classic request/response interaction
  • Server streaming calls: a single client request triggers multiple responses from the server
  • Client streaming calls: a client sends multiple requests to the server, which replies with a single response
  • Bidirectional streaming calls: a client sends multiple requests to the server, which replies with multiple responses. Client requests and server responses can be interleaved.

The good thing with Akka Streams is that we can model all these use cases using Flow. All the different call types can be modelled as a Flow[Request, Response, NotUsed] – a flow of client requests to server responses.
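
For instance, for a hypothetical EchoService defining one method per call type, the generated service trait would look roughly like the sketch below (the names are illustrative, not the exact generated code):

import akka.NotUsed
import akka.stream.scaladsl.Flow

// Illustrative sketch only: EchoRequest/EchoResponse stand for the messages
// defined in the .proto file and the method names depend on your service definition.
trait EchoServiceAkkaStream {
  def unaryEcho: Flow[EchoRequest, EchoResponse, NotUsed]           // unary
  def serverStreamingEcho: Flow[EchoRequest, EchoResponse, NotUsed] // server streaming
  def clientStreamingEcho: Flow[EchoRequest, EchoResponse, NotUsed] // client streaming
  def bidiStreamingEcho: Flow[EchoRequest, EchoResponse, NotUsed]   // bidirectional streaming
}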

Installation

gRPC Akka Stream is composed of 2 parts:

  • a generator that generates the client stubs and the server traits to be implemented
  • a runtime library (used by the generated code) to convert between Akka Streams Flows and gRPC StreamObservers

In order to use the generator you need to add the ScalaPB plugin and a dependency on the generator to your build. Add a protoc.sbt file in your project folder:

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.9")

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

libraryDependencies ++= Seq(
  "com.trueaccord.scalapb" %% "compilerplugin"          % "0.6.0-pre5",
  "beyondthelines"         %% "grpcakkastreamgenerator" % "0.0.1"
)

Then in your build.sbt add the following

resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  grpc.akkastreams.generators.GrpcAkkaStreamGenerator -> (sourceManaged in Compile).value
)

libraryDependencies ++= Seq(
  "com.trueaccord.scalapb" %% "scalapb-runtime"       % com.trueaccord.scalapb.compiler.Version.scalapbVersion % "protobuf",
  // for gRPC
  "io.grpc"                %  "grpc-netty"            % "1.4.0",
  "com.trueaccord.scalapb" %% "scalapb-runtime-grpc"  % com.trueaccord.scalapb.compiler.Version.scalapbVersion,
  // for GRPC Akkastream
  "beyondthelines"         %% "grpcakkastreamruntime" % "0.0.1"
)

From now on you’re ready to go: just place a .proto file with a gRPC service definition in src/main/protobuf and it should automatically generate the corresponding Akka Streams stubs and traits.

Usage

On the server side

On the server side we just have to provide a Flow implementation that turns the client requests into server responses. It can be something as simple as:

// Unary case
Flow[Request].map(computeResponse)

// Server streaming
Flow[Request].flatMapConcat(computeResponses)

// Client streaming
Flow[Request].fold(defaultResponse)(computeResponse)

// Bidirectional streaming
Flow[Request].flatMapConcat(computeResponses)

But of course you can use any of the Akka Streams operators available. For instance, we can easily add throttling with the throttle stage:

Flow[Request]
  .throttle(1, 10.millis, 1, ThrottleMode.Shaping)
  .map(computeResponse)
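
To actually serve such a Flow-based implementation, the service still needs to be bound to a grpc-java server. Here is a minimal sketch, assuming the generated code provides a bindService helper analogous to ScalaPB’s, and using route guide names purely for illustration:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.ServerBuilder

implicit val system       = ActorSystem("grpc-server")
implicit val materializer = ActorMaterializer()

// `routeGuideImpl` implements the generated Flow-based service trait;
// RouteGuideGrpcAkkaStream.bindService is an assumed name for the generated binding helper
val server = ServerBuilder
  .forPort(8980)
  .addService(RouteGuideGrpcAkkaStream.bindService(routeGuideImpl))
  .build()
  .start()

server.awaitTermination()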

On the client side

On the client side we just have to use the Flow provided by the stub.

Source
  .single(request)
  .via(stub.doSomething)
  .runForeach(println)
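
For completeness, the channel and stub can be created like this; the ManagedChannelBuilder part is plain grpc-java, while the stub factory name is an assumption modelled on how ScalaPB names its generated objects:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.ManagedChannelBuilder

implicit val system       = ActorSystem("grpc-client")
implicit val materializer = ActorMaterializer() // needed to run the stub's Flows

val channel = ManagedChannelBuilder
  .forAddress("localhost", 8980)
  .usePlaintext(true)
  .build()

// assumed name: the generated companion object provides a stub factory
val stub = RouteGuideGrpcAkkaStream.stub(channel)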

You can find more details with a complete example on GitHub (this example is based on the route guide example from grpc-java).

That’s basically all you need to know to start enjoying working with Akka Streams and gRPC.

Implementation

Now a few words on the implementation details. You don’t need to know anything about it to use the library, but I think it might be useful information to share as it uses some more advanced Akka Streams concepts (e.g. custom graph stages).

The gRPC Akka Stream library provides a bridge between an Akka Streams Flow and the corresponding StreamObservers used by the underlying grpc-java implementation.

The client side

On the client side the user provides the input (source) and output (sink) and we need to implement a flow connecting the input to the output by invoking the gRPC service.

Unary Calls

Let’s start with the easiest part on the client side, the simple unary call: the classic request/response pattern.
This call is performed by

public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
      ClientCall<ReqT, RespT> call,
      ReqT param
)

This method sends the request ReqT to the server and returns a ListenableFuture with the response. Fortunately ScalaPB provides us with a method that converts a ListenableFuture into a Scala Future.
Once we have a Scala Future we can easily turn it into a Source and flatMapConcat the initial flow with it to produce the response.

Flow[ReqT].flatMapConcat(request =>
  Source.fromFuture(
    Grpc.guavaFuture2ScalaFuture(
       ClientCalls.futureUnaryCall(channel.newCall(METHOD_NAME, options), request)
    )
  )
)

Server streaming

The server streaming call is slightly more complicated as we have to deal with multiple responses from the server. grpc-java provides a StreamObserver to deal with it. We can take advantage of the Akka Streams integration with the reactive-streams API to solve this problem (a reactive Subscriber can easily be turned into a StreamObserver).

Flow[ReqT].flatMapConcat(request =>
  Source.fromPublisher(
    new Publisher[RespT] {
      override def subscribe(subscriber: Subscriber[_ >: RespT]): Unit =
        ClientCalls.asyncServerStreamingCall(
          channel.newCall(METHOD_NAME, options),
          request,
          reactiveSubscriberToGrpcObserver[RespT](subscriber)
        )
    }
  )
)

We create a Source from a reactive Publisher which, when subscribed to, triggers the gRPC call by transforming the subscriber into a StreamObserver.

Bidi and client streaming

These 2 cases can be solved the same way (they’re both implemented using StreamObservers to provide the requests and consume the responses) – the only difference is that client streaming calls produce only one response from the server.

This use case is solved with a custom GraphStage.
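
The GrpcOperator passed to this stage is essentially a function that, given the observer the responses are pushed to, returns the observer the requests should be fed into. A sketch of its assumed shape (the actual definition lives in the runtime library):

import io.grpc.stub.StreamObserver

object GrpcAkkaStreamTypes { // placeholder container, not the library's actual object
  // from the observer receiving the responses (O)
  // to the observer the requests (I) are pushed into
  type GrpcOperator[I, O] = StreamObserver[O] => StreamObserver[I]
}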

class GrpcGraphStage[I, O](operator: GrpcOperator[I, O]) extends GraphStage[FlowShape[I, O]] {
  val in = Inlet[I]("grpc.in")
  val out = Outlet[O]("grpc.out")
  override val shape: FlowShape[I, O] = FlowShape.of(in, out)
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      val outObs = new StreamObserver[O] {
        override def onError(t: Throwable) = fail(out, t)
        override def onCompleted() =
          getAsyncCallback((_: Unit) => complete(out)).invoke(())
        override def onNext(value: O) =
          getAsyncCallback((value: O) => emit(out, value)).invoke(value)
      }
      val inObs = operator(outObs)
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val input = grab(in)
          inObs.onNext(input)
          pull(in)
        }
        override def onUpstreamFinish(): Unit = inObs.onCompleted()
        override def onUpstreamFailure(t: Throwable): Unit = inObs.onError(t)
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = ()
      })
      override def preStart(): Unit = pull(in)
    }
}

Here we define a GraphStage with a classic FlowShape: one input and one output. We then need to consume events from the input port and feed them into the input StreamObserver so that they are sent to the server.
The second part is to consume events from the output StreamObserver and feed them to the GraphStage output port.
The problem is that it’s fully asynchronous and we have no control over when the server will provide a response. That’s why we need to use getAsyncCallback to handle the responses.

Moreover a Flow should provide a back-pressure mechanism: produce its output only when asked by the downstream component. As we have no control over when the server produces a response, the only way to deal with back-pressure is to buffer the responses so that they are consumed downstream only when needed.

As you’ve probably noticed there is no buffer in this implementation. The buffering is in fact hidden by the call to emit: emit stacks new output handlers on top of each other and removes them once their elements have been consumed. This is how the buffering is performed here.

Using this custom GraphStage the implementation is quite straightforward:

Flow.fromGraph(new GrpcGraphStage[ReqT, RespT](outputObserver =>
  ClientCalls.asyncClientStreamingCall(
    channel.newCall(METHOD_NAME, options),
    outputObserver
  )
))

Given an output StreamObserver we create an input StreamObserver by performing a server call (both asyncClientStreamingCall and asyncBidiStreamingCall return a StreamObserver for the input). The output StreamObserver is created in our custom GraphStage.

The server side

The server side wiring is the opposite of the client side. Here the user provides the flow (the service implementation) and we need to wire the input observer to the input port of the flow and the output port of the flow to the output observer.

Unary and server streaming calls

This is the easy part. We feed a Source of a single request into the server Flow and connect it to a Sink which feeds every output event to the response StreamObserver:

Source
  .single(request)
  .via(serviceImpl.grpcMethod)
  .runForeach(responseObserver.onNext)
  .onComplete {
    case Success(_) => responseObserver.onCompleted()
    case Failure(t) => responseObserver.onError(t)
  }(mat.executionContext)

Bidi and client streaming calls

This is definitely the tricky part: we need to turn a StreamObserver into a Source.
I initially thought of leveraging the reactive streams API to do it:

reactiveSubscriberToGrpcObserver(
  serviceImpl
    .grpcMethod
    .to(Sink.fromSubscriber(grpcObserverToReactiveSubscriber(responseObserver)))
    .runWith(Source.asSubscriber[ReqT])
)

It looks simple and compiles like a charm. However it fails at runtime with:

java.lang.IllegalStateException: spec violation: onNext was signaled from upstream without demand

and again it’s because we have no control over when the events occur, which is at odds with the back-pressure mechanism provided by Akka Streams.

To solve this issue we use the same GraphStage strategy that we used on the client side, but this time with a source shape (only one output port).

class GrpcSourceStage[O] extends GraphStageWithMaterializedValue[SourceShape[O], Future[StreamObserver[O]]] {
  val out = Outlet[O]("grpc.out")
  override val shape: SourceShape[O] = SourceShape.of(out)
  override def createLogicAndMaterializedValue(
    inheritedAttributes: Attributes
  ): (GraphStageLogic, Future[StreamObserver[O]]) = {
    val promise: Promise[StreamObserver[O]] = Promise()
    val logic = new GraphStageLogic(shape) {
      val observer = new StreamObserver[O] {
        override def onError(t: Throwable) = fail(out, t)
        override def onCompleted() = getAsyncCallback((_: Unit) => complete(out)).invoke(())
        override def onNext(value: O) = getAsyncCallback((value: O) => emit(out, value)).invoke(value)
      }
      setHandler(out, new OutHandler {
        override def onPull(): Unit = ()
      })
      override def preStart(): Unit = promise.success(observer)
    }
    (logic, promise.future)
  }
}

It’s very similar to the GraphStage we used on the client side (emit and async callbacks).
That’s great but we need to return a StreamObserver linked to this source so that the gRPC framework can provide us with the input events. Well, the only way to do it is to use the materialised value of the source to return the StreamObserver (that’s why it extends GraphStageWithMaterializedValue).

Notice that the materialised value is a Future[StreamObserver[O]] (where O is the incoming request type). We use a Future here because we don’t want incoming events to flow before the Source is fully initialised. As a consequence we have a blocking call to wait for the source initialisation in the generated code:

Await.result(
  Source
    .fromGraph(new GrpcSourceStage[io.grpc.routeguide.RouteNote])
    .via(serviceImpl.grpcMethod)
    .to(Sink.fromSubscriber(grpcObserverToReactiveSubscriber(responseObserver)))
    .run(),
  5.seconds
)

Conclusion

Using reactive streams to make gRPC calls feels really good. However gRPC’s StreamObserver doesn’t play nicely with reactive streams because of its lack of back-pressure support.
It’s probably worth investigating how to replace the Netty implementation with Akka HTTP and looking for a nicer integration.
As always you can find the complete source code on GitHub, so don’t hesitate to have a look and contribute!