Querying Cassandra from Scala


When it comes to accessing Cassandra from Scala, there are two possible approaches:

  • The official Java driver
  • A custom DSL like Quill or Phantom

Custom DSLs are nice because they provide all the type safety you need against your data schema. However, in this post I will focus only on the Java driver. Why? Because, in my opinion, it’s a simple and decent solution.

The downside is that you lose all type safety, as queries are just plain strings. On the other hand, you don’t have to learn a new DSL because your queries are just CQL. Add thorough test coverage and you have a viable solution.

Moreover, the Java driver provides an async API backed by Guava’s futures, and it’s not that difficult to turn these into Scala futures, which gives a fairly natural API in Scala.

There are still some shortcomings you’d better be aware of when consuming a result set, but overall I think it’s a simple solution worth considering.

Scala integration of the Cassandra Java driver

Writing CQL statements

Our goal here is to be able to write CQL statements like this:

val query = cql"SELECT * FROM my_table WHERE my_key = ?"

For that we’ll define our own string interpolator. Looks scary? No worries, it’s pretty easy to do in Scala:

import com.datastax.driver.core._
import com.google.common.util.concurrent.ListenableFuture

implicit class CqlStrings(val context: StringContext) extends AnyVal {
  def cql(args: Any*)(implicit session: Session): ListenableFuture[PreparedStatement] = {
    val statement = new SimpleStatement(context.raw(args: _*))
    session.prepareAsync(statement)
  }
}
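The interpolator relies on StringContext.raw, which splices every `$expression` directly into the query text. The sketch below uses only the standard library (cqlText is a hypothetical name and skips the session entirely) to show what raw produces, and why values should go through `?` placeholders that the driver binds later rather than through `$` interpolation:

```scala
// Hypothetical interpolator that only builds the CQL text, to show what
// StringContext.raw produces (no Cassandra session involved).
implicit class CqlText(val context: StringContext) {
  def cqlText(args: Any*): String = context.raw(args: _*)
}

val table = "my_keyspace.my_table"

// `$table` is spliced into the text as-is; `?` is left for the driver to bind
val text = cqlText"SELECT * FROM $table WHERE my_key = ?"
```

Anything interpolated with `$` becomes part of the statement that gets prepared, so it is best kept for trusted identifiers such as table names.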

And that’s it. Now let’s see how we can use it. First we need a Cassandra session in the implicit scope to be able to use our CQL strings.

implicit val session = new Cluster.Builder()
  .addContactPoints("localhost")
  .withPort(9142)
  .build()
  .connect()

And then we’re ready to go (provided a Cassandra instance is listening on localhost:9142, the port used by cassandra-unit; a standalone Cassandra listens on 9042 by default):

val statement = cql"SELECT * FROM my_keyspace.my_table WHERE my_key = ?"

Nice, exactly what we hoped for! But as a Scala developer you’d rather deal with Scala Futures than with Guava’s ListenableFuture.

Integration with Scala Future

We can convert a ListenableFuture into a Future by means of a Promise. The idea is to complete the promise from the callback of the ListenableFuture and return the Future of the Promise.

import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture }

import scala.concurrent.{ Future, Promise }
import scala.language.implicitConversions

implicit def listenableFutureToFuture[T](
  listenableFuture: ListenableFuture[T]
): Future[T] = {
  val promise = Promise[T]()
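  // Complete the promise from Guava's callback. Recent Guava releases only offer the
  // three-argument Futures.addCallback, which also takes an Executor
  // (for instance MoreExecutors.directExecutor()).
  Futures.addCallback(listenableFuture, new FutureCallback[T] {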
    def onFailure(error: Throwable): Unit = {
      promise.failure(error)
      ()
    }
    def onSuccess(result: T): Unit = {
      promise.success(result)
      ()
    }
  })
  promise.future
}
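The same Promise-based bridge can be tried out without Guava. The sketch below applies it to the JDK’s CompletableFuture, used here only as a stand-in for ListenableFuture (completableToScala is a hypothetical name). In Scala 2.13+, scala.jdk.FutureConverters already provides `.asScala` for JDK CompletionStages; Guava’s ListenableFuture has no such built-in, hence the hand-written conversion above.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Same Promise-based bridge, applied to the JDK's CompletableFuture
// (a stand-in for Guava's ListenableFuture, so this runs without Guava).
def completableToScala[T](cf: CompletableFuture[T]): Future[T] = {
  val promise = Promise[T]()
  cf.whenComplete(new BiConsumer[T, Throwable] {
    // Called once with either a result or an error (the other one is null)
    def accept(result: T, error: Throwable): Unit = {
      if (error != null) promise.failure(error)
      else promise.success(result)
      ()
    }
  })
  promise.future
}

// Usage: complete the Java future, then read the value from the Scala one
val javaFuture = new CompletableFuture[Int]()
val scalaFuture = completableToScala(javaFuture)
javaFuture.complete(42)
val value = Await.result(scalaFuture, 1.second)
```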

We declare the method implicit so that every ListenableFuture is automatically converted into a Scala Future wherever one is expected, with nothing else to do on our side.

Then we can change the signature of our cql string interpolation to return a Future[PreparedStatement]:

implicit class CqlStrings(val context: StringContext) extends AnyVal {
  def cql(args: Any*)(implicit session: Session): Future[PreparedStatement] = {
    val statement = new SimpleStatement(context.raw(args: _*))
    session.prepareAsync(statement)
  }
}

Now that we have a PreparedStatement ready, we need to execute it somehow. So let’s create a method that binds the PreparedStatement and executes it:

import scala.concurrent.{ ExecutionContext, Future, Promise }

def execute(statement: Future[PreparedStatement], params: Any*)(
  implicit executionContext: ExecutionContext, session: Session
): Future[ResultSet] = 
  statement
    // bind is a Java varargs method (Object...), so the Seq must be expanded with `: _*`
    .map(_.bind(params.map(_.asInstanceOf[Object]): _*))
    .flatMap(session.executeAsync(_))
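PreparedStatement.bind is declared in Java as bind(Object... values), so a Scala Seq has to be expanded with `: _*` to bind its elements as separate values; passed as-is, the whole Seq is treated as one single value. The JDK’s java.util.Arrays.asList is another Java varargs method, so it shows the difference without a Cassandra cluster (the parameter values are placeholders):

```scala
// java.util.Arrays.asList(T... a) behaves like bind(Object... values) here
val params: Seq[Any] = Seq(3, "some value")
val boxed: Seq[Object] = params.map(_.asInstanceOf[Object])

// Without `: _*` the whole Seq is passed as ONE vararg element
val asOneValue = java.util.Arrays.asList(boxed)

// With `: _*` each element becomes its own vararg
val asSeparateValues = java.util.Arrays.asList(boxed: _*)
```

Both calls compile, which is what makes the missing `: _*` easy to overlook: the mistake only shows up at runtime, when the driver complains about the number or type of bound values.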

To use it, we can write something as simple as this (assuming everything is in scope):

val myKey = 3
val resultSet = execute(
   cql"SELECT * FROM my_keyspace.my_table WHERE my_key = ?",
   myKey
)

Pretty neat, isn’t it? It feels very Scala-ish, which is not bad given the small amount of code we just wrote to improve the Java driver’s integration with Scala.

Of course, the CQL statements are just strings, so there is no schema validation whatsoever at compile time and a query can always fail at runtime. That’s why you need proper test coverage (or a third-party library that provides this kind of type safety).

Consuming a Cassandra ResultSet

Now that we can get a ResultSet from Cassandra, let’s see how to extract the results from it.

The naive way to extract the rows from the result set would be something like this:

import scala.collection.JavaConverters._

val resultSet = execute(cql"SELECT * FROM my_keyspace.my_table")
val rows: Future[Iterable[Row]] = resultSet.map(_.asScala)

This code simply converts the result set into an Iterable[Row]. That’s perfectly fine as long as your query returns only a few rows.

If the result set contains thousands of rows, you have to be careful how you consume it. For instance, a common thing to do is to turn each Cassandra Row into a domain object:

val entities = 
  execute(cql"SELECT * FROM my_keyspace.my_table")
    .map(_.asScala.map(parseEntity))

Here parseEntity is assumed to be a function Row => Entity. What is not obvious is that the map operation turning each Row into an Entity consumes the whole dataset. Yes, it loads everything into memory. Why? Because the Iterable returned by asScala is strict: map eagerly builds a new collection. However, there is an easy remedy: call the view method on this Iterable to make it non-strict.

val entities =
  execute(cql"SELECT * FROM my_keyspace.my_table")
    .map(_.asScala.view.map(parseEntity))

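Alternatively, you can turn it into a Stream to achieve the same result (keep in mind that a Stream memoizes the elements it has evaluated, and that from Scala 2.13 on it is deprecated in favor of LazyList):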

val entities =
  execute(cql"SELECT * FROM my_keyspace.my_table")
    .map(_.asScala.toStream.map(parseEntity))
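The difference between the strict and the view-based versions can be observed without Cassandra. In this sketch, CountingIterable is a hypothetical stand-in for a ResultSet (a java.lang.Iterable that counts how many elements were pulled from it) and parseEntity is a placeholder; it uses scala.jdk.CollectionConverters, so it assumes Scala 2.13 or later:

```scala
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for a ResultSet: a java.lang.Iterable that counts
// how many elements have been pulled from its iterators.
final class CountingIterable(size: Int) extends java.lang.Iterable[Int] {
  var pulled = 0
  def iterator(): java.util.Iterator[Int] = new java.util.Iterator[Int] {
    private var produced = 0
    def hasNext(): Boolean = produced < size
    def next(): Int = {
      pulled += 1
      produced += 1
      produced
    }
  }
}

// Placeholder for the Row => Entity mapping
def parseEntity(row: Int): String = s"entity-$row"

// Strict: map walks the whole iterable and builds a new collection
val strictSource = new CountingIterable(10000)
val strictEntities = strictSource.asScala.map(parseEntity)

// Non-strict: view.map only pulls the elements that are actually consumed
val lazySource = new CountingIterable(10000)
val firstThree = lazySource.asScala.view.map(parseEntity).take(3).toList
```

With the strict version all 10000 elements are pulled before anything else happens, while the view only pulls the three elements that are consumed.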

That’s much better, but there is still something I don’t quite like: paging. What do I mean? When a result set contains many rows (more than 5000 with the default fetch size), the driver doesn’t fetch all of them at once. Instead it uses paging and returns only the first page of data (i.e. the first 5000 rows).

Iterating over these rows is fast, as everything is already in memory... until you try to read the 5001st row. At this point the driver needs to fetch another page of data (i.e. the next 5000 rows) from the database, and this time the call is blocking. (There is no way to get a Future while we are iterating over the rows.)

In my application it takes about 100 ms to fetch an additional page of data, and I’d rather not block my application threads while fetching database results.
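The blocking step can be modelled without a cluster. In this sketch, PagedRows is a hypothetical, simplified stand-in for the driver’s paging (not its actual implementation): rows are fetched one page at a time, and the fetch happens synchronously inside hasNext/next when the current page runs out, which is where a real driver would wait on the network:

```scala
// Hypothetical model of a paged result set: rows are fetched one page at a time,
// and crossing a page boundary triggers a synchronous fetch inside hasNext/next.
final class PagedRows(totalRows: Int, pageSize: Int) extends Iterator[Int] {
  var pagesFetched = 0
  private var page: List[Int] = Nil
  private var nextRow = 0 // index of the first row not fetched yet

  // In the real driver this would be a blocking network round trip
  private def fetchPage(): Unit = {
    val end = math.min(nextRow + pageSize, totalRows)
    page = (nextRow until end).toList
    nextRow = end
    pagesFetched += 1
  }

  def hasNext: Boolean = {
    if (page.isEmpty && nextRow < totalRows) fetchPage()
    page.nonEmpty
  }

  def next(): Int = {
    if (!hasNext) throw new NoSuchElementException("no more rows")
    val row = page.head
    page = page.tail
    row
  }
}

val rows = new PagedRows(totalRows = 12, pageSize = 5)
(1 to 5).foreach(_ => rows.next()) // all served from the first page
val pagesAfterFirstPage = rows.pagesFetched
val sixthRow = rows.next()         // crosses the page boundary: fetches page 2
val pagesAfterSixthRow = rows.pagesFetched
```

From the caller’s point of view, reading the sixth row looks exactly like reading the fifth; only its latency reveals the extra fetch.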

Note that the page size is configurable with Statement.setFetchSize. In our implementation, it fits naturally into the execute method:

def execute(statement: Future[PreparedStatement], pageSize: Int, params: Any*)(
  implicit executionContext: ExecutionContext, session: Session
): Future[ResultSet] = 
  for {
    ps <- statement
    bs = ps.bind(params.map(_.asInstanceOf[Object]): _*) // expand the Seq into Java varargs
    rs <- session.executeAsync(bs.setFetchSize(pageSize))
  } yield rs

That gives us some control over when paging happens, but it’s not a proper solution: we want the Cassandra session, not our application threads, to fetch more data.

The ResultSet API has an async method to fetch more results, simply called fetchMoreResults, plus methods to check whether the result set is exhausted or fully fetched, and to get the number of rows available without fetching:

ListenableFuture<ResultSet> fetchMoreResults();
boolean isExhausted();
boolean isFullyFetched();
int getAvailableWithoutFetching();

With these, we can write a function that takes a ResultSet and returns a ResultSet with more results:

def fetchMoreResults(resultSet: ResultSet)(
  implicit executionContext: ExecutionContext, session: Session
): Future[ResultSet] = 
  if (resultSet.isFullyFetched) {
    Future.failed(new NoSuchElementException("No more results to fetch"))
  } else {
    resultSet.fetchMoreResults()
  }
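The chaining that fetchMoreResults enables can be sketched without Cassandra. The model below is hypothetical (Page, fetchPage and foldPages are illustrative names, not driver API): each page carries its rows and, if more data remains, a function returning the next page as a Future, so the whole result can be folded by chaining Futures instead of blocking inside an iterator:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical page: the rows already fetched, plus an async way to get the
// next page (None once everything has been fetched, like isFullyFetched).
final case class Page(rows: Vector[Int], next: Option[() => Future[Page]])

// Hypothetical "server" serving rows 0 until total, pageSize rows at a time
def fetchPage(from: Int, total: Int, pageSize: Int): Future[Page] = Future {
  val end = math.min(from + pageSize, total)
  val next = if (end < total) Some(() => fetchPage(end, total, pageSize)) else None
  Page((from until end).toVector, next)
}

// Folds over all rows without blocking between pages: each page is processed
// once its Future has completed, then the next fetch is chained with flatMap.
def foldPages[B](page: Future[Page], acc: B)(f: (B, Int) => B): Future[B] =
  page.flatMap { p =>
    val updated = p.rows.foldLeft(acc)(f)
    p.next match {
      case Some(fetchNext) => foldPages(fetchNext(), updated)(f)
      case None            => Future.successful(updated)
    }
  }

// Only this final Await (for the demo) blocks; the paging itself never does
val sum = Await.result(foldPages(fetchPage(0, 12, 5), 0)(_ + _), 5.seconds)
```

A fold like this works, but it only yields a final value; what we are after is a stream of rows that can be processed as they arrive.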

So now what? Can we get something like an Iterable[Future[ResultSet]]? Well, not quite!

It is certainly possible to create such an Iterable, but there is no way to end the iteration: we would have to wait for the future to complete to know whether there is a next element. As the Iterable doesn’t wait for the future to complete (it has no knowledge of the type of its elements), it returns an “infinite” number of elements.
Not quite what we want!

               Single                 Multiple
Synchronous    A                      Iterable[A]
Asynchronous   Future[A] / Task[A]    Observable[A]

The observable pattern is exactly what we are after.

The Monix library provides a pretty good Observable implementation and is part of the Typelevel project (of course there are other implementations, like RxScala or even Reactive Streams, …).

Looking at the Observable API there is a function of particular interest in our case:

def fromAsyncStateAction[S, A](f: S => Task[(A, S)])(initialState: => S): Observable[A]

This function lets us generate the elements of an Observable. It takes a function that, given a state S, returns a Task containing a pair of an element A and the next state S. We start the generation by providing an initial state.
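For intuition, the Scala 2.13+ standard library has a synchronous cousin of this function, LazyList.unfold, whose generator returns an Option so that it can signal the end of the data. The sketch below (pages is a hypothetical helper, with the offset of the next page as its state) shows the same state-driven generation pattern:

```scala
// State = offset of the next page; element = the rows of one page.
// Returning None stops the generation, Some((page, nextState)) continues it.
def pages(total: Int, pageSize: Int): LazyList[Vector[Int]] =
  LazyList.unfold(0) { offset =>
    if (offset >= total) None
    else {
      val end = math.min(offset + pageSize, total)
      Some(((offset until end).toVector, end))
    }
  }

val pageSizes = pages(total = 12, pageSize = 5).map(_.size).toList
```

Note that the generator passed to fromAsyncStateAction returns a plain (A, S) pair with no Option, so it cannot signal the end of the stream by itself.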

I haven’t said anything so far about what a Task is. You can think of it as a Future that doesn’t execute automatically when created, but only when it is told to. Task is also part of the Monix library.

In our case, the initial state will be the Future[ResultSet] returned by execute(), and we want to return an Observable[ResultSet]. That means S is Future[ResultSet] and A is simply ResultSet.

Everything seems to fit in place quite nicely so let’s write a query function that returns an Observable[ResultSet].

import monix.eval.Task
import monix.reactive.Observable

def query(cql: Future[PreparedStatement], parameters: Any*)(
  implicit executionContext: ExecutionContext, cassandraSession: Session
): Observable[ResultSet] =
  Observable.fromAsyncStateAction[Future[ResultSet], ResultSet] { nextResultSet =>
    Task.fromFuture(nextResultSet).flatMap { 
       case resultSet if resultSet.isExhausted => Task.never // stops emitting, but the Observable never completes either
       case resultSet => 
         // consume the fetched rows in order to trigger isExhausted
         (1 to resultSet.getAvailableWithoutFetching) foreach (_ => resultSet.one)
         Task((resultSet, resultSet.fetchMoreResults))
    }
  }(execute(cql, parameters: _*))

Not bad, but we’re not really interested in the ResultSet itself, only in the fetched rows. So let’s change our method to return an Observable of Rows instead.

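def query(cql: Future[PreparedStatement], parameters: Any*)(
  implicit executionContext: ExecutionContext, cassandraSession: Session
): Observable[Row] = {

  // Emits Some(rows) for each fetched page, and None once the result set is exhausted
  def fetchRows(nextResultSet: Future[ResultSet]): Task[(Option[Iterable[Row]], Future[ResultSet])] =
    Task.fromFuture(nextResultSet).flatMap {
      case resultSet if resultSet.isExhausted =>
        // signal the end of the data (Task.never would leave the stream hanging forever)
        Task((Option.empty[Iterable[Row]], nextResultSet))
      case resultSet =>
        // strictly consume the rows already fetched, then ask for the next page asynchronously
        val rows: Option[Iterable[Row]] =
          Some((1 to resultSet.getAvailableWithoutFetching) map (_ => resultSet.one))
        val next: Future[ResultSet] = resultSet.fetchMoreResults
        Task((rows, next))
    }

  val observable = Observable.fromAsyncStateAction[Future[ResultSet], Option[Iterable[Row]]](
    fetchRows
  )(execute(cql, parameters: _*))

  observable
    .takeWhile(_.isDefined) // completes the stream at the first None
    .flatMap(rows => Observable.fromIterable(rows.getOrElse(Iterable.empty[Row])))
}

Here we changed our generation function to extract the fetched rows from the result set, and to emit None instead of waiting forever on Task.never once the result set is exhausted. That gives us an Observable[Option[Iterable[Row]]]: takeWhile completes the stream at the first None, and flatMap turns the remaining pages into an Observable[Row].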

What does this get us? Let’s be honest, probably not much in terms of performance. It’s still going to need the same amount of time to fetch data from Cassandra. The main advantage now is that we’re no longer blocking our application threads to fetch the data.

On the client side, it becomes quite easy to query a Cassandra table:

import monix.execution.Ack
import monix.execution.Scheduler.Implicits.global

// creates an observable of row
val observable = query(cql"SELECT * FROM my_keyspace.my_table")

// nothing happens until we subscribe to this observable
observable.subscribe { row =>
  // do something useful with the row here
  println(s"Fetched row id=${row.getObject("my_key")}")
  Ack.Continue
}

Parsing a row

Parsing a row is pretty straightforward. However, there is one pitfall you need to be aware of: handling null values. In Scala we don’t like null; instead we use Option to indicate the absence of a value. A natural thing to do when parsing a row might be something like:

val maybeName = Option(row.getString("name"))
val maybeAge = Option(row.getInt("age"))

If the name is not set, you expect to get None here, which is what happens: if no value is set in Cassandra, the Java driver returns null, which Option’s apply method turns into None.

We might expect the same thing to happen on the second line for the age. But no: if there is no value set for the age in Cassandra, the driver doesn’t return null but 0, because getInt returns a primitive int. So in this case you get Some(0); in fact you never get a None here. The correct implementation is:

val maybeAge =
  if (row.isNull("age")) None
  else Some(row.getInt("age"))
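The isNull check can be factored into a small helper. To keep the sketch runnable without the driver, RowLike and MapRow are hypothetical stand-ins that mimic two accessors of the driver’s Row (a primitive getInt that yields 0 for NULL, and isNull); with the real driver, the same helper could take a Row directly:

```scala
// Hypothetical minimal view of a row, mirroring two accessors of the driver's Row
trait RowLike {
  def isNull(name: String): Boolean
  def getInt(name: String): Int
}

// Hypothetical in-memory row: a missing key plays the role of a NULL column
final case class MapRow(values: Map[String, Int]) extends RowLike {
  def isNull(name: String): Boolean = !values.contains(name)
  def getInt(name: String): Int = values.getOrElse(name, 0) // 0 for NULL, like a primitive getter
}

// Checks isNull first, so NULL becomes None instead of the primitive default
def optional[A](row: RowLike, name: String)(get: String => A): Option[A] =
  if (row.isNull(name)) None else Some(get(name))

val withAge = MapRow(Map("age" -> 42))
val withoutAge = MapRow(Map.empty[String, Int])

val presentAge = optional(withAge, "age")(withAge.getInt(_))
val missingAge = optional(withoutAge, "age")(withoutAge.getInt(_))
val naiveAge = Option(withoutAge.getInt("age")) // the pitfall: Some(0)
```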

Testing

We are now approaching the end of this blog post, so it’s a good time for a few words on testing. The good news is that cassandra-unit lets you run your tests against an embedded Cassandra instance.

It’s pretty easy to set up:

import java.net.InetAddress
import com.datastax.driver.core.Cluster
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import scala.concurrent.duration._

EmbeddedCassandraServerHelper.startEmbeddedCassandra(60.seconds.toMillis)
val cluster = new Cluster
  .Builder()
  .addContactPoints(InetAddress.getByName("127.0.0.1"))
  .withPort(9142)
  .build()
implicit val session = cluster.connect()

However, it’s damn slow, so be careful not to spin up a Cassandra instance for every single test. Instead, you can share the same session among tests. This requires cleaning up the data after each test; running TRUNCATE on the tables seems to do a decent job.

Keep the tests that use Cassandra to a minimum: only test your queries and the mapping to/from domain objects. Test these functions extensively, though, as we don’t have any type safety here (everything is stringly typed).

I think that’s all you need to test against Cassandra. You should be able to test the rest of your application without starting up a Cassandra instance.

  • Luis Dipotet

    Hi, I have done something that isn’t finished yet. I don’t like your implementation of the future from the Google callback; the other things I think are reasonably fine.

    https://github.com/ldipotetjob/scalacassandra.

    • If I am not mistaken, your project builds a DSL on top of Cassandra.
      Here the idea was not to build an extra layer but just to use the existing Java driver for Cassandra and give it a Scala look and feel.
      As the Java driver relies on Google’s futures, I find it more convenient to turn them into Scala Futures when used in a Scala project.

  • Shailesh Tikhe

    Thank you for amazing blog.
    After subscribing to the observable, how can we know when we have iterated over all the rows?

    • When all rows are consumed the observable completes, and in order to be notified of the completion you need an onComplete callback.

      You can pass a whole Subscriber (or Observer) to the subscribe method. Subscriber (and Observer) implement an onComplete method.

      It’s also possible to pass all the callback functions (onNext, onError and onComplete) directly into the subscribe call.