Building anti-corruption layers with Akka

Tweet about this on TwitterShare on LinkedInShare on FacebookShare on Google+Share on Reddit

Akka actors fits nicely with DDD (Domain Driven Design) to design an application. E.g. It’s quite natural to model entities as individual actors who can be persisted, …

One of the key aspect in DDD is the notion of bounded context. A bounded context is simply a “self-content” component. It can interact with other components but it is coherent on its own. Each bounded context has its own domain model which belongs only to itself and should not leaked or be influenced by other bounded context.

Anti-corruption layers (aka translation layers or adapter layers) are used to enforce this principle. Basically their role is to translate the core domain objects into/from another domain that is used for communication or persistence.

In this blog post we’re going to try to follow the DDD principles to build a small (contrived) application using Akka and try to figure out the best way to build efficient anticorruption layers.

The context

We’re going to build a tiny application with 2 services.

  • Auth: The authentication service in charge of creating sessions and verifying tokens
  • Blog: The blogging service which stores a list of posts

Then we have the App which simulates a scenario involving these 2 services.

Each of these services is persistent in order to recover from failure and doesn’t leak its domain model into the persistence layer or with the outside world (e.g. the App client).

Implementation

Both services follow the same implementation model and relies on 3 different domains:

  • Model.scala defines the core domain. This is the model used by the entities to implement their business logic.
  • Persistence.scala defines the model used to persist the events representing the actor state.
  • Protocol.scala defines the model (or the protocol) used by the other components to communicate with the service

The domain model

Given that this is just a proof of concept focusing on the anti-corruption layers, there is little to say about the core services.

They are implemented as Akka actors and are the only place where the core model is used to implement the business logic.

What they do is not really important but it’s worth noting that they call persist with domain objects. They also receive domain objects directly inside the receiveCommand method. Finally sending messages to the outside world requires a call to translate to convert domain objects into protocol objects.

The persistence layer

The persistence layer is implemented in 2 steps:

  1. An Akka EventAdapter ensures the translation between the domain model and the persistence model
  2. An Akka Serializer ensures the serialisation from the persistence model into the protobuf binary format

Fluent

The translation between the domain model and the persistence model is done using fluent. It makes the translation super-simple as we just have to specify which class gets transformed into which one.

import akka.persistence.journal.{EventAdapter, EventSeq}
import cats.instances.option._
import fluent._

class AuthEventAdapter extends EventAdapter {
  override def manifest(event: Any): String = event.getClass.getName
  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case e: Persistence.Authenticated => EventSeq(e.transformTo[Model.Authenticated])
    case e: Persistence.Terminated    => EventSeq(e.transformTo[Model.Invalidated])
  }

  override def toJournal(event: Any): Any = event match {
    case e: Model.Authenticated => e.transformTo[Persistence.Authenticated]
    case e: Model.Invalidated   => e.transformTo[Persistence.Terminated]
  }
}

PBDirect

The serialisation into protobuf is assured by pbdirect. It allows to transform the persistent model case classes directly into protobuf without having to declare the same data structure into a .proto file.

import antikkor.example.auth.Persistence.{Authenticated, Terminated}
import antikkor.serialization.PBAkkaSerializer
import cats.instances.option._
import pbdirect._

class PBAuthSerializer extends PBAkkaSerializer {

  override def identifier: Int = 1340982252

  override def serialize: PartialFunction[AnyRef, Array[Byte]] = {
    case event: Authenticated => event.toPB
    case event: Terminated    => event.toPB
  }

  override def unserializeTo: PartialFunction[(Class[_], Array[Byte]), AnyRef] = {
    case (claSS, bytes) if claSS == classOf[Authenticated] => bytes.pbTo[Authenticated]
    case (claSS, bytes) if claSS == classOf[Terminated]    => bytes.pbTo[Terminated]
  }
}

As you can see PBAuthSerializer doesn’t extend the Akka Serializer directly. Instead it inherits from PBAkkaSerializer which checks if the implementation has a serialisation/deserialization defined for a given class.

import akka.serialization.Serializer

trait PBAkkaSerializer extends Serializer {
  def serialize: PartialFunction[AnyRef, Array[Byte]]
  def unserializeTo: PartialFunction[(Class[_], Array[Byte]), AnyRef]

  override def toBinary(o: AnyRef): Array[Byte] =
    if (serialize.isDefinedAt(o)) serialize.apply(o)
    else throw new IllegalArgumentException(s"Can't serialize ${o.getClass.getName}")

  override def includeManifest: Boolean = true

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
    manifest match {
      case Some(claSS) if (unserializeTo.isDefinedAt((claSS, bytes))) => unserializeTo.apply(claSS, bytes)
      case Some(claSS) => throw new IllegalArgumentException(s"Don't know how to deserialize to ${claSS.getName} in ${this.getClass.getName}")
      case None        => throw new IllegalArgumentException("Need a protobuf serializable class")
    }
}

Note that if you use several serializers (typically one per service) you need to make sure their ids are unique otherwise you might be asked to deserialise unexpected messages.

The translation layer

Similarly the translation layer is in charge of the translation between the domain model and the protocol model used to communicate with the outside world.

The implementation relies on fluent as well and look similar to the persistent’s layer EventAdapter.

import akka.AdapterActor
import cats.instances.option._
import fluent._

trait AuthAdapterActor extends AdapterActor {
  override protected def translate: PartialFunction[Any, Any] = {
    case message: Protocol.StartSession => message.transformTo[Model.Authenticate]
    case message: Protocol.Verify       => message.transformTo[Model.Verify]
    case message: Protocol.EndSession   => message.transformTo[Model.Invalidate]

    case message: Model.Authenticated   => message.transformTo[Protocol.SessionStarted]
    case message: Model.Verified        => message.transformTo[Protocol.SessionVerified]
    case message: Model.Invalidated     => message.transformTo[Protocol.SessionEnded]
    case message: Model.InvalidUser     => message.transformTo[Protocol.InvalidUser]
    case message: Model.InvalidToken    => message.transformTo[Protocol.InvalidSession]
  }
}

However this translation doesn’t fit into any of the Akka components.

One possible solution might be to implement a proxy actor in front of the business domain actor. The proxy actor would be in charge of translating messages into the appropriate model and forward it to the intended actor.

Here I tried another approach: define a wrapper using the aroundReceive method to intercept the message and translate them before they are delivered to the actor’s receive method.

This is exactly what the AdapterActor does.

trait AdapterActor extends Actor {
  protected def translate: PartialFunction[Any, Any]

  override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = {
    val adaptedReceive =
      if (translate.isDefinedAt(msg)) translate andThen receive
      else receive
    super.aroundReceive(adaptedReceive, msg)
  }
}

It works just fine for the incoming messages. On the other hand sending messages is performed directly by the ActorRef representing the destination actor. As it’s not possible to override the ! (tell) method on the ActorRef, there is no easy way to automate the translation for outgoing messages.

It’s possible to define another operator for “translate and tell”, but I found that actorRef ! translate(message) is clear and explicit enough (compared to another “obscure” operator).

Conclusion

The bright side

The core business model is contained inside the core actor and doesn’t leak into the persistence layer or to the outside world.

The translation mechanic is hidden from the core and lies in specific classes (or trait) which makes the code well organised and easy to navigate. PBDirect and fluent libraries makes it ease to weave things together.

The pitfalls

Implicit resolution occurs at compile time which is good because it minimises the runtime penalty but it doesn’t play well with Akka where the configuration is resolved at compile time (by reading the akka.conf file). Moreover the Akka API are defined as Any => Any or Any => Unit methods which doesn’t play well with implicit resolution which relies on type parameters.

I might be worth to have a look at Akka-typed to see how the “translation” behaviour can be implemented “around” another behaviour.

As usual you can find the code on github and let me know what you think or how do you implement your anti-corruption layers in Akka using the comments below.