Querying DynamoDB from Scala

As we’ve seen in this comparison with Apache Cassandra, DynamoDB may be a valuable choice for storing data. It’s fully managed by AWS: you just have to configure your instance and you can start using it straight away.

Being one of the Amazon Web Services, DynamoDB offers a JSON/HTTP interface, and Amazon provides a set of clients for a variety of languages. Unfortunately Scala is not one of them, but there is a Java client available. Sadly it suffers from big performance issues due to poor design choices.

So what are the alternatives? This is what we’re going to see in this post: how can we get the most out of the official driver, and are there better alternatives in terms of both performance and integration with the Scala ecosystem?

The AWS Java client

At the time of writing, the official AWS Java client for DynamoDB is based on the Apache HTTP client and uses blocking calls. This is very unfortunate, as an HTTP call takes a very long time relative to CPU cycles and the calling thread sits idle for all of it. It might be useful for testing purposes but it’s definitely not something I’d like to use in production code.

There are actually two clients:

  • AmazonDynamoDB is the blocking client I just mentioned
  • AmazonDynamoDBAsync is an “async” client that doesn’t block the calling thread

AmazonDynamoDBAsync is clearly a better option but it’s still not truly async. It still relies on Java Future (which blocks as soon as you try to access the value inside the future) and uses a thread pool (a FixedThreadPool to be exact) to perform the blocking call when sending the HTTP request. That’s clearly a limiting factor in a high-throughput application.

In fact, it might happen that your application cannot reach your DynamoDB provisioned throughput just because there are no more threads available to perform more requests. So what can you do?

The obvious answer is to use bigger thread pools. This is easily configurable, but you’re going to reach a point where there is too much contention on the thread pool’s job queue. You might then prefer to use 2 (or more) thread pools and dispatch requests across them. But clearly, using 1 thread per request doesn’t scale well.
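To make the thread-pool ceiling concrete, here is a minimal sketch that uses only the standard library. The blockingCall function is a hypothetical stand-in for a blocking HTTP request (it just sleeps), and the pool size and request count are arbitrary. It only illustrates that a fixed pool of N threads can never have more than N requests in flight, however many are submitted.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntBinaryOperator
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val poolSize = 4
val pool = Executors.newFixedThreadPool(poolSize)
implicit val blockingEc: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val inFlight = new AtomicInteger(0)
val maxInFlight = new AtomicInteger(0)
val keepMax = new IntBinaryOperator {
  override def applyAsInt(a: Int, b: Int): Int = math.max(a, b)
}

// Hypothetical stand-in for a blocking HTTP call: the thread is held while "waiting".
def blockingCall(id: Int): Int = {
  val now = inFlight.incrementAndGet()
  maxInFlight.accumulateAndGet(now, keepMax)
  try { Thread.sleep(50); id }
  finally { inFlight.decrementAndGet(); () }
}

// Submit 16 requests at once; they queue up behind the 4 pool threads.
val requests: Future[List[Int]] =
  Future.sequence((1 to 16).toList.map(i => Future(blockingCall(i))))

val results: List[Int] =
  try Await.result(requests, 10.seconds)
  finally pool.shutdown()
```

After the run, maxInFlight never exceeds poolSize: the remaining requests simply wait in the queue, which is the throughput limit described above.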

Amazon is very aware of this limitation and is currently working on a new version of its Java client. This new client is based on Netty, one of the fastest HTTP libraries on the JVM, and offers a truly async interface based on Java 8 CompletableFuture.

This new client is certainly an option worth considering. However, at the time of writing it is still in “developer preview” and might change according to feedback from the community.
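A CompletableFuture-based API is easy to consume from Scala because completion is signalled through a callback rather than a blocking get. The sketch below does not use the AWS SDK at all: toScalaFuture is a hypothetical helper name and the CompletableFuture is created by hand. It only shows how such a future can be bridged to a Scala Future without tying up a thread. (The scala-java8-compat library, and scala.jdk.FutureConverters in Scala 2.13, provide ready-made converters for the same purpose.)

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.{BiConsumer, Supplier}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper: completes a Scala Promise from the Java callback.
def toScalaFuture[A](cf: CompletableFuture[A]): Future[A] = {
  val promise = Promise[A]()
  cf.whenComplete(new BiConsumer[A, Throwable] {
    override def accept(value: A, error: Throwable): Unit = {
      if (error != null) promise.failure(error) else promise.success(value)
      ()
    }
  })
  promise.future
}

// Stand-in for a response produced asynchronously by a client.
val pending: CompletableFuture[String] =
  CompletableFuture.supplyAsync(new Supplier[String] {
    override def get(): String = "Call Me Today"
  })

// Await is used here only to observe the result in a small example.
val item: String = Await.result(toScalaFuture(pending), 5.seconds)
```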

The API

The client is just a wrapper around HTTP calls. It is supposed to make the developer’s life easier, but I cannot say that I particularly appreciate the Java interface. It relies heavily on the “builder” pattern in order to provide a “fluent” interface.

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import com.amazonaws.services.dynamodbv2.model.GetItemRequest
import scala.collection.JavaConverters._

val client = AmazonDynamoDBClientBuilder.standard().build()
val key = Map(
  "Artist"    -> new AttributeValue().withS("No One You Know"),
  "SongTitle" -> new AttributeValue().withS("Call Me Today")
).asJava
val request = new GetItemRequest().withTableName("Music").withKey(key)
val response = client.getItem(request)

Basically the client uses a Map[String, AttributeValue] to represent the DynamoDB-specific JSON syntax:

{
  "Artist": {"S": "No One You Know"},
  "SongTitle": {"S": "Call Me Today"}
}

Alternatively there is a higher-level API that uses POJO-annotations and a DynamoDBMapper to generate the JSON corresponding to your data model.

Given all the JSON libraries available in Scala there is certainly a better option where we would define our data model as an ADT (a set of sealed traits and case classes) and have a library handle the JSON transformation for us: This is Scanamo.

Scanamo

Scanamo is a Scala library for DynamoDB developed by the Guardian.

It provides 2 components:

  • DynamoFormat provides a mapping from your ADT case classes to the AWS AttributeValue
  • ScanamoOps provides the DynamoDB operations modelled as a Free monad

Of course it glues everything together in a simple syntax to query DynamoDB:

// The data model
case class Farm(animals: List[String])
case class Farmer(name: String, age: Long, farm: Farm)
// DynamoDB ops
val farmTable = Table[Farmer]("farmer")
val ops = for {
  _ <- farmTable.putAll(Set(
    Farmer("Boggis", 43L, Farm(List("chicken"))),
    Farmer("Bunce", 52L, Farm(List("goose"))),
    Farmer("Bean", 55L, Farm(List("turkey")))
  ))
  bunce <- farmTable.get('name -> "Bunce")
} yield bunce
// Execute the ops
Await.result(ScanamoAsync.exec(client)(ops), 5.seconds)

DynamoFormat

DynamoFormat is pretty interesting in itself. DynamoFormat is actually a type-class with methods to transform to/from AttributeValue (which in turn is transformed into a JSON string to be sent over the wire).

@typeclass trait DynamoFormat[T] {
  def read(av: AttributeValue): Either[DynamoReadError, T]
  def write(t: T): AttributeValue
  def default: Option[T] = None
}

The interesting bit is that it relies on Shapeless to automatically derive type class instances for your ADT.

It’s also easy to define additional formats for specific classes (e.g. for java.time.Instant):

import java.time.Instant

implicit val dateFormat: DynamoFormat[Instant] =
  coercedXmap[Instant, String, IllegalArgumentException](Instant.parse)(_.toString)

Like most Scala JSON libraries, it relies on implicit resolution to find the right format.
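To illustrate the type-class mechanism, here is a heavily simplified sketch. AttrValue, Format and the string-based errors are hypothetical stand-ins for the AWS AttributeValue, DynamoFormat and DynamoReadError, not Scanamo’s actual definitions. The Farmer instance is written by hand, which is the kind of boilerplate the Shapeless derivation generates for you.

```scala
import scala.util.Try

// Simplified stand-in for the AWS AttributeValue (hypothetical, not the real class).
sealed trait AttrValue
final case class S(value: String) extends AttrValue
final case class N(value: String) extends AttrValue // DynamoDB sends numbers as strings
final case class M(fields: Map[String, AttrValue]) extends AttrValue

// Simplified stand-in for DynamoFormat; read errors are plain strings here.
trait Format[T] {
  def write(t: T): AttrValue
  def read(av: AttrValue): Either[String, T]
}

object Format {
  // Summoner: Format[T] resolves the implicit instance for T.
  def apply[T](implicit f: Format[T]): Format[T] = f

  implicit val stringFormat: Format[String] = new Format[String] {
    def write(t: String): AttrValue = S(t)
    def read(av: AttrValue): Either[String, String] = av match {
      case S(str) => Right(str)
      case other  => Left(s"expected S, got $other")
    }
  }

  implicit val longFormat: Format[Long] = new Format[Long] {
    def write(t: Long): AttrValue = N(t.toString)
    def read(av: AttrValue): Either[String, Long] = av match {
      case N(num) => Try(num.toLong).toEither.left.map(_.getMessage)
      case other  => Left(s"expected N, got $other")
    }
  }
}

final case class Farmer(name: String, age: Long)

// Hand-written instance built from the field instances found implicitly.
implicit val farmerFormat: Format[Farmer] = new Format[Farmer] {
  def write(f: Farmer): AttrValue =
    M(Map("name" -> Format[String].write(f.name), "age" -> Format[Long].write(f.age)))
  def read(av: AttrValue): Either[String, Farmer] = av match {
    case M(fields) =>
      for {
        name <- fields.get("name").toRight("missing name").flatMap(v => Format[String].read(v))
        age  <- fields.get("age").toRight("missing age").flatMap(v => Format[Long].read(v))
      } yield Farmer(name, age)
    case other => Left(s"expected M, got $other")
  }
}

val written: AttrValue = Format[Farmer].write(Farmer("Bunce", 52L))
val roundTrip: Either[String, Farmer] = Format[Farmer].read(written)
```

Writing a Farmer and reading it back yields the original value wrapped in a Right, while a malformed value surfaces as a Left instead of an exception.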

This is a pretty nice library all by itself and I wish the Guardian had released it on its own (much like play-json is released outside of Play) but more on that later.

Scanamo Operations

ScanamoOpsA represents all the DynamoDB operations along with their return type.

sealed trait ScanamoOpsA[A] extends Product with Serializable
final case class Put(req: ScanamoPutRequest) extends ScanamoOpsA[PutItemResult]
final case class Get(req: GetItemRequest) extends ScanamoOpsA[GetItemResult]
final case class Delete(req: ScanamoDeleteRequest) extends ScanamoOpsA[DeleteItemResult]
final case class Update(req: ScanamoUpdateRequest) extends ScanamoOpsA[UpdateItemResult]
// ...

These operations are then lifted into a Free monad so that they can be easily combined.

object ScanamoOps {
  import cats.free.Free.liftF

  def put(req: ScanamoPutRequest): ScanamoOps[PutItemResult] = 
    liftF[ScanamoOpsA, PutItemResult](Put(req))

  def get(req: GetItemRequest): ScanamoOps[GetItemResult] = 
    liftF[ScanamoOpsA, GetItemResult](Get(req))
 
  // ...
}

Here ScanamoOps is just a Free monad:

type ScanamoOps[A] = Free[ScanamoOpsA, A]

Because ScanamoOps is a (Free) monad, we can combine operations in for-comprehensions. This is what was done in the example above, where we performed a put followed by a get request:

val ops = for {
  _ <- farmTable.putAll(Set(
    Farmer("Boggis", 43L, Farm(List("chicken"))),
    Farmer("Bunce", 52L, Farm(List("goose"))),
    Farmer("Bean", 55L, Farm(List("turkey")))
  ))
  bunce <- farmTable.get('name -> "Bunce")
} yield bunce

Note that nothing is executed here. It’s just a description (or blueprint) of the operations to be executed on DynamoDB. The farmTable.get will return an Option[Either[DynamoReadError, Farmer]] but in order to get it we need an Interpreter that can turn a Free[ScanamoOpsA, Option[Either[DynamoReadError, Farmer]]] into a Future[Option[Either[DynamoReadError, Farmer]]] (or directly into an Option[Either[DynamoReadError, Farmer]] if we’re using the blocking sync client).

That’s a bit of a convoluted return type but every type is there for a reason:

  • The Either[DynamoReadError, Farmer] is here in case the returned JSON can’t be deserialised into a Farmer
  • The surrounding Option indicates the possibility that there is no matching record found in DynamoDB.
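In practice, a caller handles the three possible outcomes with a single pattern match. In this sketch, ReadError is a hypothetical stand-in for Scanamo’s DynamoReadError, and Farmer is reduced to two fields to keep it short.

```scala
final case class Farmer(name: String, age: Long)
final case class ReadError(message: String) // hypothetical stand-in for DynamoReadError

def describe(result: Option[Either[ReadError, Farmer]]): String = result match {
  case None              => "no matching record"
  case Some(Left(error)) => s"record found but unreadable: ${error.message}"
  case Some(Right(f))    => s"found ${f.name}, aged ${f.age}"
}
```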

Scanamo provides 2 interpreters (both of them rely on the AWS Java client):

  • ScanamoInterpreters.id that relies on the AWS sync client
  • ScanamoInterpreters.future that relies on the AWS async client

To execute the above operations we need to foldMap over them using one of these 2 interpreters.

val interpreter = ScanamoInterpreters.future(asyncClient)
ops.foldMap(interpreter)

Alternatively Scanamo provides some nicer syntax to do just that:

val res = Scanamo.exec(client)(ops)
// or
val futureRes = ScanamoAsync.exec(asyncClient)(ops)
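To see why the same description can be run by different interpreters, here is a self-contained toy version of the idea. It deliberately does not use cats or Scanamo: Program is a hand-rolled, non-stack-safe stand-in for Free, KvOp is a hypothetical two-operation algebra, and both interpreters write to an in-memory map instead of DynamoDB. Treat it as a sketch of the mechanism, not of Scanamo’s implementation.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Toy algebra (hypothetical): each operation carries its result type.
sealed trait KvOp[A]
final case class PutOp(key: String, value: String) extends KvOp[Unit]
final case class GetOp(key: String) extends KvOp[Option[String]]

// Two kinds of interpreter: one returns plain values, the other Futures.
trait SyncInterpreter[F[_]]  { def apply[A](op: F[A]): A }
trait AsyncInterpreter[F[_]] { def apply[A](op: F[A]): Future[A] }

// A program is plain data until one of the run methods is called.
sealed trait Program[F[_], A] {
  def flatMap[B](f: A => Program[F, B]): Program[F, B] = Bind[F, A, B](this, f)
  def map[B](f: A => B): Program[F, B] = flatMap(a => Done[F, B](f(a)))
  def run(i: SyncInterpreter[F]): A
  def runAsync(i: AsyncInterpreter[F])(implicit ec: ExecutionContext): Future[A]
}
final case class Done[F[_], A](value: A) extends Program[F, A] {
  def run(i: SyncInterpreter[F]): A = value
  def runAsync(i: AsyncInterpreter[F])(implicit ec: ExecutionContext): Future[A] =
    Future.successful(value)
}
final case class Lift[F[_], A](op: F[A]) extends Program[F, A] {
  def run(i: SyncInterpreter[F]): A = i(op)
  def runAsync(i: AsyncInterpreter[F])(implicit ec: ExecutionContext): Future[A] = i(op)
}
final case class Bind[F[_], A, B](prev: Program[F, A], next: A => Program[F, B])
    extends Program[F, B] {
  def run(i: SyncInterpreter[F]): B = next(prev.run(i)).run(i)
  def runAsync(i: AsyncInterpreter[F])(implicit ec: ExecutionContext): Future[B] =
    prev.runAsync(i).flatMap(a => next(a).runAsync(i))
}

def put(k: String, v: String): Program[KvOp, Unit] = Lift[KvOp, Unit](PutOp(k, v))
def get(k: String): Program[KvOp, Option[String]] = Lift[KvOp, Option[String]](GetOp(k))

// The description: nothing touches a store at this point.
val program: Program[KvOp, Option[String]] = for {
  _     <- put("Bunce", "goose")
  bunce <- get("Bunce")
} yield bunce

def inMemory(store: TrieMap[String, String]): SyncInterpreter[KvOp] =
  new SyncInterpreter[KvOp] {
    def apply[A](op: KvOp[A]): A = op match {
      case PutOp(k, v) => store.update(k, v)
      case GetOp(k)    => store.get(k)
    }
  }

def inMemoryAsync(store: TrieMap[String, String])(implicit ec: ExecutionContext): AsyncInterpreter[KvOp] =
  new AsyncInterpreter[KvOp] {
    def apply[A](op: KvOp[A]): Future[A] = op match {
      case PutOp(k, v) => Future(store.update(k, v))
      case GetOp(k)    => Future(store.get(k))
    }
  }

// The same program, executed by two different interpreters.
val syncResult: Option[String] = program.run(inMemory(TrieMap.empty[String, String]))
val asyncResult: Option[String] = {
  import ExecutionContext.Implicits.global
  Await.result(program.runAsync(inMemoryAsync(TrieMap.empty[String, String])), 5.seconds)
}
```

The program value is built once and never changes. Only the interpreter passed at the very end decides whether it runs synchronously or inside a Future.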

Note that DynamoDB imposes limits on the size of batch requests, but Scanamo provides a nice way to get around them:

def putAll[T](tableName: String)(items: Set[T])(implicit f: DynamoFormat[T]): ScanamoOps[List[BatchWriteItemResult]] =
    items.grouped(batchSize).toList.traverseU(batch =>
      ScanamoOps.batchWrite(
        new BatchWriteItemRequest().withRequestItems(Map(tableName -> batch.toList.map(i =>
          new WriteRequest().withPutRequest(new PutRequest().withItem(f.write(i).getM))
        ).asJava).asJava)
      )
    )
// deleteAll and getAll work similarly

Here it creates as many BatchWriteItemRequest as needed to handle all the items passed in.
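The batching itself comes down to grouped from the standard collections. The sketch below assumes a batch size of 25, which is the documented maximum number of put or delete requests in a single BatchWriteItem call. The excerpt above does not show the value Scanamo actually uses for batchSize, so treat the number as an assumption.

```scala
// Assumed value: BatchWriteItem accepts at most 25 put or delete requests per call.
val batchSize = 25

val items: Set[Int] = (1 to 60).toSet

// grouped splits the collection into chunks of at most batchSize elements.
val batches: List[Set[Int]] = items.grouped(batchSize).toList
val sizes: List[Int] = batches.map(_.size)
```

With 60 items this produces three batches of 25, 25 and 10 elements, each small enough to fit into one request.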

To conclude, Scanamo is a very handy library for working with DynamoDB as it provides tools that integrate nicely with Scala. The use of the Free monad makes it possible to compose multiple requests together, but it’s well hidden behind simple APIs so that it’s possible to use it without being aware of such implementation details.

The interpreters provided by Scanamo are based on the AWS Java client and as such suffer from the same performance issues.

The good news is that providing a new interpreter based on a different library is easy to do and, above all, it runs all of your operations without any code changes. So the question is: are there any other DynamoDB clients on the JVM?

Alpakka

It turns out there is another DynamoDB client on the JVM, in the form of an Akka Streams connector that is part of the Alpakka project.

Alpakka aims at providing all sorts of connectors for Akka Streams (Kafka, DynamoDB, SQS, Kinesis, Cassandra, AMQP, JMS, Elasticsearch, …).

If you look at the connector for DynamoDB you’ll find a rather simple implementation and at its heart is this method:

override protected val connection: AwsConnect =
  if (settings.port == 443)
    Http().cachedHostConnectionPoolHttps[AwsRequestMetadata](settings.host)(materializer)
  else
    Http().cachedHostConnectionPool[AwsRequestMetadata](settings.host, settings.port)(materializer)

What it means is that it doesn’t use the AWS Java client to send requests but Akka HTTP (a truly non-blocking HTTP client based on Akka Streams!).

If we look closer at the Alpakka connector, we see that it interfaces with the request/response entities provided by the AWS Java client and transforms them into Akka HTTP requests/responses.

It makes the client not very pleasant to use (Java builders, no-automatic conversion from the data model ADT, …) but having a non-blocking client is a huge win!

The best of both worlds

Knowing this, you’ve probably guessed where this is going by now: Can we use the Alpakka client in Scanamo?

Well, it turns out that yes, we can. If you think about it, the Scanamo interpreters are using the AWS Java client so they must turn the Scanamo operations into AmazonWebServiceRequests at some point.

And there is an object that does just that (although it is package private):

private[ops] object JavaRequests {
  def scan(req: ScanamoScanRequest): ScanRequest = ???
  def query(req: ScanamoQueryRequest): QueryRequest = ???
  def put(req: ScanamoPutRequest): PutItemRequest = ???
  def delete(req: ScanamoDeleteRequest): DeleteItemRequest = ???
  def update(req: ScanamoUpdateRequest): UpdateItemRequest = ???
}

Then the job of the interpreter is simple: turn the Scanamo operation into an AWS request (using JavaRequests) and use the AWS client to execute this request.

def id(client: AmazonDynamoDB) = new (ScanamoOpsA ~> Id) {
  def apply[A](op: ScanamoOpsA[A]): Id[A] = op match {
    case Put(req) =>
      client.putItem(JavaRequests.put(req))
    case Get(req) =>
      client.getItem(req)
    case Delete(req) =>
       client.deleteItem(JavaRequests.delete(req))
    // ...
  }
}

To implement a new interpreter, the idea is to replace the AWS client with the Alpakka connector:

def future(client: DynamoClient)(implicit executor: ExecutionContext): ScanamoOpsA ~> Future =
  new (ScanamoOpsA ~> Future) {
      import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._
      override def apply[A](ops: ScanamoOpsA[A]): Future[A] =
        ops match {
          case Put(req) => client.single(JavaRequests.put(req))
          case Get(req) => client.single(req)
          case Delete(req) => client.single(JavaRequests.delete(req))
          // ...
        }
  }

And there you have it! A truly non-blocking interpreter for Scanamo. The best of both worlds: an easy and nice syntax with a performant client!

Conclusion

When it comes to accessing DynamoDB, there are not a lot of options on the JVM (to my knowledge). However, it’s still a web service, so any HTTP client can do the job. The second aspect is how easy it is to build a request, and this is where Scanamo really helps.

It would have made sense to split Scanamo into 2 artifacts:

  • One library to build AWS requests including DynamoFormats and a way to build the requests outside of the Free monad
  • One library based on the other one that uses the Free monad to compose Dynamo operations and execute them

As always you can find (and try) these ideas in this GitHub repo.