At Ideas2IT’s ML services practice, we often encounter real-time stream processing use cases with stringent performance requirements. In such situations, we look for tool sets that can meet them. This blog post describes one such use case and a comparison we ran between solutions based on Go, Spark and plain vanilla Python. Here are our findings:

Context

Big data processing, as a field, has been receiving a lot of attention from developers in the past few years, and there are a number of great open source projects to show for it. Most of these projects expect the problem to be expressed as a Directed Acyclic Graph (DAG), with each vertex being a processing task. This works great for many types of data processing requirements, but increasingly we have been seeing use cases where this is not enough. One common scenario is where recent historical data of a certain user (from just a few seconds back) could affect the prediction for the current row.

We took one such use case, created separate implementations using Python, Spark and Go, and compared them on performance, monitoring, ease of development, scalability and maintainability.

The Problem

We receive JSON access logs for many users, and the aim is to predict whether each event is an anomaly. To do this, we need to store some metadata about each user: aggregate information for all the locations that the user has logged in from, the most recent cities, recent login times, etc. This data is needed to process every incoming row, and is also modified by every incoming row. Once we have retrieved the most recent user aggregates and updated them with the new data, we run the actual prediction, which is fairly straightforward and has no other dependencies.
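To make the per-user state concrete, here is a minimal sketch in Go of parsing one log line and folding it into a user aggregate. The field names (`user`, `city`, `time`), the `UserAggregate` layout and the cap of three recent cities are assumptions for illustration; the actual log schema and aggregate fields are not described in this post.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// AccessLog is a hypothetical shape for one incoming JSON log line.
type AccessLog struct {
	User string    `json:"user"`
	City string    `json:"city"`
	Time time.Time `json:"time"` // RFC 3339 timestamp
}

// UserAggregate is the per-user state that every row reads and modifies.
type UserAggregate struct {
	LoginsByCity map[string]int
	RecentCities []string // oldest first, capped at maxRecent
	LastLogin    time.Time
}

// maxRecent is an assumed cap on how many recent cities are kept.
const maxRecent = 3

// Update folds one log into the aggregate.
func (a *UserAggregate) Update(l AccessLog) {
	if a.LoginsByCity == nil {
		a.LoginsByCity = make(map[string]int)
	}
	a.LoginsByCity[l.City]++
	a.RecentCities = append(a.RecentCities, l.City)
	if len(a.RecentCities) > maxRecent {
		a.RecentCities = a.RecentCities[len(a.RecentCities)-maxRecent:]
	}
	if l.Time.After(a.LastLogin) {
		a.LastLogin = l.Time
	}
}

// ParseLog decodes one JSON log line.
func ParseLog(line []byte) (AccessLog, error) {
	var l AccessLog
	err := json.Unmarshal(line, &l)
	return l, err
}

func main() {
	aggregates := make(map[string]*UserAggregate)
	lines := []string{
		`{"user":"alice","city":"Chennai","time":"2018-01-01T10:00:00Z"}`,
		`{"user":"alice","city":"Austin","time":"2018-01-01T10:00:05Z"}`,
	}
	for _, line := range lines {
		l, err := ParseLog([]byte(line))
		if err != nil {
			fmt.Println("skipping bad line:", err)
			continue
		}
		agg, ok := aggregates[l.User]
		if !ok {
			agg = &UserAggregate{}
			aggregates[l.User] = agg
		}
		agg.Update(l)
		// The prediction step would read agg at this point.
	}
	fmt.Println(aggregates["alice"].LoginsByCity, aggregates["alice"].RecentCities)
}
```

The read-then-modify dependency on `aggregates` is what makes the problem hard to parallelize: two rows for the same user cannot safely be processed at the same time without coordination.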

Design possibilities

1. We could run the user aggregate retrieval and update process in a single thread. This would make sure user aggregates are always completely up to date, but would also prevent the program from scaling beyond a single core.

2. We could use a Redis cluster/instance as storage for the user aggregate map, and send every update for each row to Redis separately. Because Redis has atomic increment and getset, multiple processes can update the aggregates without any data races. But as the number of messages per second increases to the tens of thousands, the aggregate map will start to lag behind the incoming logs by several seconds, because of network delays, all the copying required, and the Redis cluster not being able to keep up with all the atomic update requests.

3. The third approach is to group incoming messages into per-user 1-second batches, and send those batches to individual workers. Because each batch contains logs from a single user, a worker has to retrieve the aggregate data for that user only once per batch. The worker can then accumulate the aggregate delta for the batch and send just a single update to Redis.
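The third approach can be sketched roughly as follows. This is an illustrative sketch, not our production code: the `Event` type, the `batchKey` struct and the idea of a delta as "logins per city" are assumptions chosen to keep the example small, and the Redis write is left out.

```go
package main

import "fmt"

// Event is a hypothetical, already-parsed log row.
type Event struct {
	User string
	City string
	Unix int64 // event time in whole seconds
}

// batchKey identifies one user's logs within one 1-second window.
type batchKey struct {
	User   string
	Second int64
}

// GroupByUserSecond buckets events into per-user, per-second batches.
func GroupByUserSecond(events []Event) map[batchKey][]Event {
	batches := make(map[batchKey][]Event)
	for _, e := range events {
		k := batchKey{User: e.User, Second: e.Unix}
		batches[k] = append(batches[k], e)
	}
	return batches
}

// Delta is the change one batch makes to a user's aggregate,
// reduced here to a count of logins per city.
type Delta map[string]int

// BatchDelta collapses a whole batch into a single delta, so a worker
// can send one update per batch instead of one update per row.
func BatchDelta(batch []Event) Delta {
	d := make(Delta)
	for _, e := range batch {
		d[e.City]++
	}
	return d
}

func main() {
	events := []Event{
		{"alice", "Chennai", 100},
		{"bob", "Austin", 100},
		{"alice", "Chennai", 100},
		{"alice", "Austin", 101},
	}
	for k, batch := range GroupByUserSecond(events) {
		// A real worker would fetch the user's aggregate once here,
		// apply the delta, and write the result back once.
		fmt.Println(k.User, k.Second, BatchDelta(batch))
	}
}
```

With this shape, the number of storage round trips grows with the number of active users per second rather than with the number of log rows.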

Python

Infrastructure: 1 VM on on-premise server, with 4 cores at 3GHz and 8GB of RAM.

The logic and the model were designed and built in Python first, as a single-threaded application. It used a dictionary of Pandas objects to store the user aggregate info, and the program was able to process around 60 input logs/sec. We introduced batching and parallelism using the multiprocessing library, but it was still not able to handle more than 400 logs/sec. Once we had the model ready, we spent less than a week building the first version in Python.

Spark

Infrastructure: 3 VMs on on-premise servers, each with 4 cores at 3GHz and 8GB of RAM.

We built two versions using Spark. In the first attempt, we built a system that distributes logs widely among workers. For each incoming log, a worker fetches that user’s data from Redis and then updates 6 fields. That’s a lot of Redis hits. Keeping the user aggregate maps up to date was the bottleneck, so we used 1 VM for the prediction and 2 for the user aggregate management. When the Redis updates were asynchronous, we were able to get a throughput of 10,000 logs/sec, but the user aggregate map was always lagging behind the latest logs by more than 20 seconds.

The throughput from the above method was good enough, but the user aggregate lag was unacceptable. So in the second attempt, instead of sending every single update to Redis, we aggregate each 2-second batch that comes from Spark’s StreamingContext and send only one update to Redis for each unique user in the batch. We were still using 2 machines for user aggregate management, and one for the predictions. The throughput was still at 10,000 logs/sec, but the user aggregate map’s lag came down to around 5 seconds.

Golang

Infrastructure: 1 VM on on-premise server, with 4 cores at 3GHz and 8GB of RAM.

Spark provides a lot of features, but when you have a problem that does not fit perfectly into its programming model, you have to do clever things to work around the restrictions, and that usually leads to reduced performance.

Go is a modern programming language designed with concurrency and performance in mind, and it is easy to build concurrent data processing pipelines with just language constructs.

First we built a single multithreaded application. It has a batchManager that spins off new batchers as messages for new users start to appear, and kills batchers that have not received any messages in a while. It also has a location manager and a user manager, which provide single-threaded access to the maps that store location data and user aggregate data. Batch processors request and receive data from these manager threads using channels. This version is able to process 20,000 logs/sec while keeping the user aggregate map’s lag below 1 second at all times, and it doubles that throughput with the same lag on an 8-core machine. We spent a little over a week building this, and another week later it was deployed to production. It has undergone a lot of changes and enhancements, but this is still the production version.
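The "manager thread" pattern described above looks roughly like this in Go. It is a simplified sketch: the `updateReq` type, the `userManager` and `AddLogins` functions, and the reduction of a user aggregate to a single integer count are all assumptions made for the example, not our actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// updateReq asks the manager to add delta to a user's login count
// and to reply with the new total.
type updateReq struct {
	user  string
	delta int
	reply chan int
}

// userManager is the only goroutine that touches the map, so no
// locks are needed. It runs until the request channel is closed.
func userManager(reqs <-chan updateReq) {
	counts := make(map[string]int)
	for r := range reqs {
		counts[r.user] += r.delta
		r.reply <- counts[r.user]
	}
}

// AddLogins is what a batch processor would call: it sends a request
// and waits for the manager's answer on a private reply channel.
func AddLogins(reqs chan<- updateReq, user string, delta int) int {
	reply := make(chan int, 1)
	reqs <- updateReq{user: user, delta: delta, reply: reply}
	return <-reply
}

func main() {
	reqs := make(chan updateReq)
	go userManager(reqs)

	// Four concurrent batch processors, each reporting 5 logins.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			AddLogins(reqs, "alice", 5)
		}()
	}
	wg.Wait()

	fmt.Println(AddLogins(reqs, "alice", 0)) // prints 20
	close(reqs)
}
```

Because all reads and writes are serialized through one goroutine, the map is always consistent, while parsing and prediction can still run in parallel in the batch processors.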

We also have a distributed version, which is just some distribution code added on top of the first version. We use Hashicorp’s memberlist[1] and raft[2] implementations to create a cluster that can maintain itself. The master node elected by raft hands out leases that determine which node should process which user. Other nodes that receive that user’s data parse the JSON messages, put them in a struct and forward them to the node holding the lease using a binary serialization library (gob[3]). The machine with the lease pulls user aggregate data from Redis, maintains it, and sends a single copy to Redis as backup once every 30 seconds, or when the lease expires.
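As a small illustration of the forwarding step, here is how a parsed struct can be serialized and deserialized with the standard library's `encoding/gob` package. The `ParsedLog` type and its fields are hypothetical, and the network transport between nodes is omitted; only the encoding round trip is shown.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// ParsedLog is a hypothetical struct holding an already-parsed log,
// so the receiving node does not have to parse the JSON again.
type ParsedLog struct {
	User string
	City string
	Unix int64
}

// Encode serializes a log into gob's binary format.
func Encode(l ParsedLog) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(l); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Decode reverses Encode.
func Decode(b []byte) (ParsedLog, error) {
	var l ParsedLog
	err := gob.NewDecoder(bytes.NewReader(b)).Decode(&l)
	return l, err
}

func main() {
	in := ParsedLog{User: "alice", City: "Chennai", Unix: 1514800800}
	wire, err := Encode(in)
	if err != nil {
		panic(err)
	}
	out, err := Decode(wire)
	if err != nil {
		panic(err)
	}
	fmt.Println(out == in, len(wire), "bytes")
}
```

In a long-running service one would typically keep a single encoder per connection rather than creating one per message, since a gob stream transmits type information only once per encoder.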

With 3 quad-core VMs receiving data from a Kafka topic with 3 partitions, the clustered setup is able to process around 50,000 logs/sec, and the user aggregate map still has a lag of at most 1 second. This was built in another week of design and development on top of the non-distributed version.

Summary

The custom solution in Go clearly outperformed Spark, and it could still be developed in around 2 weeks, against the Spark solution’s 1 week.

The Go program was designed from the ground up. Go has just enough features to accelerate development, but because it is a full programming language, there is still a lot of flexibility in how exactly you want your solution to work, unlike Spark, where your design has to fit Spark’s model exactly. Go has green threads (goroutines), communication and synchronization between them using channels, clustering and leader election through Hashicorp’s memberlist and raft libraries, and fast binary serialization/deserialization through gob, protobuf, flatbuffers, etc. Designing distributed systems, or even thinking about them in general, takes some getting used to, but once we were past that stage, Go provided the best balance between performance and development time.

We are also in the process of building a minimal generic (code generation based) framework in Go that provides even more glue code for message passing, key based leases for specific machines to get all messages with a specific key, etc. We will keep writing as we make progress.

[1] https://github.com/hashicorp/memberlist
[2] https://github.com/hashicorp/raft
[3] https://blog.golang.org/gobs-of-data
