Asynchronous programming with Play framework

Background
Some services provided by VigLink handle 7K requests per second. Our current architecture mostly runs on a synchronous Tomcat 7/Spring 4 stack, and our services are mostly powered by MySQL, Elasticsearch, and Cassandra. As traffic has grown over the past few years, we’ve added machines to keep up with throughput and response time.

Goal
In anticipation of growing traffic, we want to experiment with different frameworks and approaches to make our service more scalable and efficient.

  1. Increase throughput
    We aim to handle more traffic with the same hardware. This can reduce our engineering costs and ease the maintenance burden of having too many servers. The CPU cycles wasted on thread churn are one key thing we want to address in this experiment.
  2. Improve response time
    The vision of VigLink is to make the web better, so we strive to maintain and improve our request latency. We want to provide low and, most importantly, consistent latency, because we’ve observed occasional latency spikes on the current application.
  3. Java codebase compatibility
    Since our code base is mostly Java-based, we hope to reuse our existing code while enjoying the benefits of a new framework.

Choosing Play framework
We started to learn and code with Play framework on our company’s internal hackathon day. We found it extremely easy to set up and run. Additionally, its async controller is well defined. As a result, we chose Play framework for this experiment.

Code change
Future interface
To make our Java code’s computation and DAO layer asynchronous, we need a more capable future interface than the plain java.util.concurrent.Future to represent in-progress computation, so that we can code end to end in an async manner. Options include Java 8 CompletableFuture, Akka futures, and Guava ListenableFuture. We decided to use Guava ListenableFuture because we found it easy and powerful for transformation and chaining. The changes required to our existing code were also minimal with ListenableFuture.
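
As a rough illustration of what "transformation and chaining" means here, the sketch below uses the JDK's CompletableFuture as a stand-in, only because it runs without extra dependencies. It is not the Guava code from our services; in Guava, Futures.transform plays the role of thenApply. The lookupOfferId and loadOfferTitle methods are hypothetical placeholders for async DAO calls.

```java
import java.util.concurrent.CompletableFuture;

class FutureChainingSketch {

    // Hypothetical async DAO call: resolves a URL to an offer id.
    static CompletableFuture<Integer> lookupOfferId(String url) {
        return CompletableFuture.supplyAsync(() -> url.length());
    }

    // Hypothetical second async call that depends on the first result.
    static CompletableFuture<String> loadOfferTitle(int offerId) {
        return CompletableFuture.supplyAsync(() -> "offer-" + offerId);
    }

    // Chains both calls and transforms the result; no thread blocks in between.
    static CompletableFuture<String> describeOffer(String url) {
        return lookupOfferId(url)
                .thenCompose(FutureChainingSketch::loadOfferTitle) // chain a dependent async call
                .thenApply(String::toUpperCase);                   // transform the value
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the program, to print the result.
        System.out.println(describeOffer("http://example.com").join());
    }
}
```

The point of the design is that every layer returns a future, so the only place that ever waits is the outermost caller.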

Migrating existing Java code to asynchronous computation
Cassandra client library
We are using Hector as our Java client library for Cassandra, but it does not provide an async interface. As a result, we decided to try Astyanax for its powerful async interface. The code change was straightforward since the two libraries' interfaces are similar.

Hector – Sync

public UrlValidity getUrlValidity(String url) {
   SliceQuery<String, String, ByteBuffer> query = HFactory.createSliceQuery(linkSwapKeyspace, StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get());

   // Read up to 100 columns of the row, in forward order.
   // execute() blocks the calling thread until Cassandra responds.
   ColumnSlice<String, ByteBuffer> queryResult = query.setKey(getKey(url))
           .setRange(null, null, false, 100)
           .setColumnFamily(CassandraColumnFamily.URL_VALIDITY)
           .execute()
           .get();

   return extractValidity(queryResult.getColumns());
}

Elasticsearch
We are using the standard Java client library for Elasticsearch 1.2, which provides async execution out of the box. Because its async execute() method returns a ListenableActionFuture, we convert the result into a Guava ListenableFuture by using SettableFuture.

public <T> ListenableFuture<T> asyncSearch(final Class<T> clazz, SearchQuery query, String... facetNames) {
    SearchRequestBuilder searchRequestBuilder = createSearchContent(query, facetNames);

    // execute() returns immediately; the response arrives through the listener below.
    ListenableActionFuture<SearchResponse> searchResponse = searchRequestBuilder.execute();

    final SettableFuture<T> searchResult = SettableFuture.create();

    searchResponse.addListener(new ActionListener<SearchResponse>() {
        @Override
        public void onResponse(SearchResponse searchResponse) {
            String responseBody = toJson(searchResponse, true, false);
            T result = unmarshalResponse(responseBody, clazz);
            searchResult.set(result);
        }

        @Override
        public void onFailure(Throwable throwable) {
            searchResult.setException(throwable);
        }
    });

    return searchResult;
}
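
The listener-to-future bridge can be tried in isolation. The following is a minimal sketch in plain Java, with CompletableFuture standing in for SettableFuture; the Listener and CallbackClient interfaces are hypothetical and only mimic the shape of a callback-style client.

```java
import java.util.concurrent.CompletableFuture;

class ListenerBridgeSketch {

    // Hypothetical callback interface with a success and a failure method.
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Throwable error);
    }

    // Hypothetical callback-style client; a real one would call back from an I/O thread.
    interface CallbackClient<R> {
        void execute(Listener<R> listener);
    }

    // Bridges the callback API to a future: whichever callback fires completes the future.
    static <R> CompletableFuture<R> toFuture(CallbackClient<R> client) {
        final CompletableFuture<R> result = new CompletableFuture<>();
        client.execute(new Listener<R>() {
            @Override
            public void onResponse(R response) {
                result.complete(response);
            }

            @Override
            public void onFailure(Throwable error) {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok =
                ListenerBridgeSketch.<String>toFuture(listener -> listener.onResponse("42 hits"));
        System.out.println(ok.join());
    }
}
```

Forwarding the failure callback matters as much as the success one: a future that is never completed on error would leave the request hanging.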

Play Framework Async Controller
We are using Play 2.2.x for this experiment. An async controller can be easily implemented with Action.async {}. We implemented AsyncUtil.asScalaFuture to convert a Guava ListenableFuture into the Scala Future that Action.async {} expects. The utility method is inspired by sphere.util.Async.

// map and recover need an implicit ExecutionContext
import play.api.libs.concurrent.Execution.Implicits.defaultContext

@Component
class LinkSwapController extends Controller {

  @Resource var offerSearchService: OfferSearchService = _

  def swap(url: Option[String], keyword: Option[String]) = Action.async { request =>
    AsyncUtil.asScalaFuture(offerSearchService.search(url.orNull, keyword.orNull)).map {
      case result: OfferSearchResult => Ok(...)
    }.recover {
      // Recover last, so a failed search becomes a result and is not passed through the map above.
      case t =>
        Logger.error("Exception caught", t)
        BadRequest(...)
    }
  }
}

// AsyncUtil (Java)
public static <T> Future<T> asScalaFuture(ListenableFuture<T> future) {
    final Promise<T> promise = new Promise<>();
    Futures.addCallback(future, new FutureCallback<T>() {
        @Override
        public void onSuccess(T result) {
            promise.success(result);
        }

        @Override
        public void onFailure(Throwable t) {
            promise.failure(t);
        }
    });
    return promise.future();
}
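
One detail worth spelling out is the order of the success and failure handlers in the controller. The sketch below shows the idea in plain Java, assuming a hypothetical Response type and using CompletableFuture in place of a Scala Future: mapping first and handling the failure last means both paths end in a response.

```java
import java.util.concurrent.CompletableFuture;

class AsyncResponseSketch {

    // Hypothetical minimal HTTP response: just a status code and a body.
    static final class Response {
        final int status;
        final String body;

        Response(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    // Maps a successful search to 200 and any failure to 400, without blocking.
    static CompletableFuture<Response> respond(CompletableFuture<String> search) {
        return search
                .thenApply(result -> new Response(200, result))              // success path
                .exceptionally(error -> new Response(400, "search failed")); // failure path, handled last
    }

    public static void main(String[] args) {
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("backend down"));

        System.out.println(respond(CompletableFuture.completedFuture("3 offers")).join().status);
        System.out.println(respond(failing).join().status);
    }
}
```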

Metrics
We use Graphite to measure and monitor performance, and the Yammer Metrics library to publish metrics to our Graphite server. To measure the response time of requests, we implemented a metrics filter.

class MetricsFilter extends EssentialFilter {

  def namePrefix = "responseCodes."

  // vals rather than defs: look each metric up once, not on every request
  val statusCodes: Map[Int, Meter] = Map(
    Status.OK -> Metrics.newMeter(classOf[MetricsFilter], namePrefix + "ok", "responses", TimeUnit.SECONDS),
    Status.CREATED -> Metrics.newMeter(classOf[MetricsFilter], namePrefix + "created", "responses", TimeUnit.SECONDS),
    Status.NO_CONTENT -> Metrics.newMeter(classOf[MetricsFilter], namePrefix + "noContent", "responses", TimeUnit.SECONDS),
    Status.BAD_REQUEST -> Metrics.newMeter(classOf[MetricsFilter], namePrefix + "badRequest", "responses", TimeUnit.SECONDS),
    Status.NOT_FOUND -> Metrics.newMeter(classOf[MetricsFilter], namePrefix + "notFound", "responses", TimeUnit.SECONDS),
    Status.INTERNAL_SERVER_ERROR -> Metrics.newMeter(classOf[MetricsFilter], namePrefix + "serverError", "responses", TimeUnit.SECONDS)
  )

  val requestsTimer: Timer = Metrics.newTimer(classOf[MetricsFilter], "requests", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)

  val activeRequests: Counter = Metrics.newCounter(classOf[MetricsFilter], "activeRequests")

  val otherStatuses: Meter = Metrics.newMeter(classOf[MetricsFilter], namePrefix + "other", "responses", TimeUnit.SECONDS)

  override def apply(next: EssentialAction) = new EssentialAction {
    override def apply(rh: RequestHeader) = {
      // Start timing when the request header arrives.
      val context = requestsTimer.time()

      // Runs once the result is available, which may be long after apply() returns.
      def logCompleted(result: SimpleResult): SimpleResult = {
        activeRequests.dec()
        context.stop()
        statusCodes.getOrElse(result.header.status, otherStatuses).mark()
        result
      }

      activeRequests.inc()
      next(rh).map(logCompleted)
    }
  }
}
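
The core idea of the filter, timing a request until its future completes and tracking how many requests are in flight, can be sketched without Play or the Yammer library. The class below is a hypothetical stand-in that uses plain JDK counters and CompletableFuture; it assumes the supplied request function itself does not throw.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class AsyncMetricsSketch {

    final AtomicLong activeRequests = new AtomicLong();
    final AtomicLong completedRequests = new AtomicLong();
    final AtomicLong totalNanos = new AtomicLong();

    // Wraps an async request so that timing stops when the future completes,
    // not when the handler method returns.
    <T> CompletableFuture<T> measure(Supplier<CompletableFuture<T>> request) {
        final long start = System.nanoTime();
        activeRequests.incrementAndGet();
        return request.get().whenComplete((result, error) -> {
            // whenComplete runs on success and on failure alike.
            activeRequests.decrementAndGet();
            completedRequests.incrementAndGet();
            totalNanos.addAndGet(System.nanoTime() - start);
        });
    }

    public static void main(String[] args) {
        AsyncMetricsSketch metrics = new AsyncMetricsSketch();
        metrics.measure(() -> CompletableFuture.supplyAsync(() -> "ok")).join();
        System.out.println("completed=" + metrics.completedRequests.get()
                + " active=" + metrics.activeRequests.get());
    }
}
```

With a synchronous handler a simple try/finally around the call would do; with futures, the bookkeeping has to be attached to the future itself.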

Result
Response time

[Figure "engineering-1": response time]

The average latency of the Play framework service is lower in this experiment. Most importantly, the async service shows no latency spikes. This is likely the combined result of async computation, Play framework, and Astyanax.

Thread churn

[Figure "engineering-thread": thread count]

The thread count in Play framework (async) is lower and more consistent than in Spring framework (sync), which suggests it has the potential to handle more traffic on the same hardware.
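
To see why async code can get by with fewer threads: in a thread-per-request model, every in-flight request waiting on a backend holds a thread, whereas a non-blocking wait holds none. The toy model below is not a measurement of Play or Spring; the request count and the 50 ms delay are arbitrary assumptions. It starts many simulated backend waits at once and records which threads complete them.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ThreadCountSketch {

    // Starts `requests` simulated I/O waits at once and returns the names of
    // the threads that ended up completing them.
    static Set<String> run(int requests) {
        ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        Set<String> threads = ConcurrentHashMap.newKeySet();
        CompletableFuture<?>[] pending = new CompletableFuture<?>[requests];
        for (int i = 0; i < requests; i++) {
            CompletableFuture<Void> done = new CompletableFuture<>();
            // Simulated 50 ms backend call: no thread is parked while waiting.
            io.schedule(() -> {
                threads.add(Thread.currentThread().getName());
                done.complete(null);
            }, 50, TimeUnit.MILLISECONDS);
            pending[i] = done;
        }
        // Wait for all simulated requests, then release the scheduler thread.
        CompletableFuture.allOf(pending).join();
        io.shutdown();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + run(500).size());
    }
}
```

All 500 waits are serviced by the single scheduler thread; a blocking version of the same workload would need one thread per concurrent wait to finish in comparable time.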

Conclusion
We are pleased with the results of using async computation in this experiment. Both latency and thread churn improved dramatically, and we found Play framework very easy to set up and use. However, the comparison is unfair to Spring framework, because our Spring service uses sync computation. We plan to run another experiment with async computation in Spring framework so we can compare the two fairly. We also plan to run a stress test to understand the real potential of async programming with Play framework.

Written by Edward Chu, Senior Software Engineer.

Follow Ed on Twitter @edwardchu521