Prismatic Blog

How Prismatic deals with data storage and aggregation

Previously, we wrote about our software engineering principles and advocated building libraries of fine-grained abstractions over monolithic frameworks. Our first example will be Store, a library for storage and aggregation. Because there is quite a bit of data manipulation behind the scenes of Prismatic, we put a lot of effort into making Store very simple to use, from experiments at the REPL to well-tested production code.

Many of our data needs can be captured by a key-value abstraction. The core interface in Store is a bucket. A bucket stores [key,value] pairs; you retrieve the value for a key with the get operation and write a value with put. We have bucket implementations backed by an in-memory hash map, Amazon S3, MongoDB, BDB, and SQL. Because we code to the bucket interface wherever possible, nearly all of our code is agnostic to the underlying storage engine. One big benefit is that we can write system-level tests by injecting memory-based buckets in place of the persistent ones used in production. Another is that we can write generic code to host buckets over HTTP, so any new storage engine implementation instantly gets network access to its data. Wrapping the standard key-value operations in an abstraction buys us a lot of convenience, but our data processing needs a richer abstraction.
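The sketch below makes the get/put interface concrete. It defines a bucket as a Clojure protocol and gives it an in-memory implementation. The names Bucket, bucket-get, bucket-put, mem-bucket, and save-doc! are hypothetical; they are not Store's actual API. The point is only that code written against the protocol works unchanged whichever engine sits behind it.

```clojure
;; Hypothetical bucket protocol (not Store's real API).
(defprotocol Bucket
  (bucket-get [b k] "Return the value stored under k, or nil if absent.")
  (bucket-put [b k v] "Store v under k."))

;; In-memory implementation backed by a ConcurrentHashMap.
(deftype MemBucket [^java.util.concurrent.ConcurrentHashMap m]
  Bucket
  (bucket-get [_ k] (.get m k))
  (bucket-put [_ k v] (.put m k v) nil))

(defn mem-bucket []
  (MemBucket. (java.util.concurrent.ConcurrentHashMap.)))

;; Application code only sees the protocol, so it does not care whether
;; the bucket is in memory, on disk, or behind HTTP.
(defn save-doc! [bucket doc]
  (bucket-put bucket (:id doc) doc))
```

In a test, save-doc! can be handed a mem-bucket. In production it would get a persistent implementation of the same protocol.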

Much of the machine learning, ranking, and system monitoring we do requires more than reading and writing values: we also need to efficiently update existing values with new observations. For instance, a service might listen for new-document events and observe new statistics about popular topics and entities. We want to aggregate, or merge, these observations in order to make system decisions downstream. You can simulate an update by doing a get, modifying the value, and doing a put, but this runs into two issues: (1) race conditions when multiple threads update the same key, and (2) inefficiency when get and put go over the network and all you needed to send was a small update message.

Because of this, our bucket abstraction also has a merge operation. It takes a key and an update value, and combines the update with the key's existing value, if there is one. In the simple case where a bucket represents word counts, the merge operation is +. The merge function doesn't have to be commutative, but it should be associative. Buckets are constructed with merge functions that encode domain- or application-specific logic about how their data is to be aggregated. As a running example, consider building a bucket of bigram word counts in Clojure.
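Here is a sketch of what the bigram bucket's merge function could look like, with a quick check of the associativity requirement. The names merge-counts and merge-latest are hypothetical, not Store code. merge-latest is a last-write-wins merge, included to show a function that is associative without being commutative.

```clojure
(defn merge-counts
  "Merge a count update into an existing {next-word count} map.
  existing may be nil when the key has no value yet."
  [existing delta]
  (merge-with + existing delta))

(defn merge-latest
  "Last-write-wins merge: associative but not commutative."
  [_older newer]
  newer)

(def u1 {"runs" 1})
(def u2 {"runs" 2 "fast" 1})
(def u3 {"fast" 3})

;; Associativity: grouping does not matter, so updates can be pre-combined.
(= (merge-counts (merge-counts u1 u2) u3)
   (merge-counts u1 (merge-counts u2 u3)))      ; => true

;; Not commutative: with last-write-wins, order matters.
(= (merge-latest :a :b) (merge-latest :b :a))   ; => false
```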

Stepping briefly through this example: the bigram bucket is an in-memory hash map bucket, implemented behind the scenes with Java's ConcurrentHashMap. The keys of the bucket are words, and the values are Clojure maps counting the words that follow the key word. For instance, after processing the sentence "Prismatic runs on coffee", the state of the bucket, represented as a map, should be {"Prismatic" {"runs" 1}, "runs" {"on" 1}, "on" {"coffee" 1}}.
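The sketch below shows how the bucket's state evolves for that sentence. It assumes whitespace tokenization and uses a plain Clojure map in place of the ConcurrentHashMap-backed bucket. The helpers bigram-updates and apply-updates are hypothetical.

```clojure
(require '[clojure.string :as str])

(defn bigram-updates
  "Turn a sentence into [word {next-word 1}] merge updates, one per
  adjacent pair of words."
  [sentence]
  (for [[w nxt] (partition 2 1 (str/split sentence #"\s+"))]
    [w {nxt 1}]))

(defn apply-updates
  "Fold updates into a map standing in for the bucket's state; merging an
  update adds the counts for each next-word."
  [state updates]
  (reduce (fn [m [k v]] (update m k #(merge-with + % v)))
          state
          updates))

(apply-updates {} (bigram-updates "Prismatic runs on coffee"))
;; returns a map equal to
;; {"Prismatic" {"runs" 1}, "runs" {"on" 1}, "on" {"coffee" 1}}
```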

The bucket is also constructed with a merge function, which in this example adds up the counts for matching next-words in the inner map stored for the key word. When the bucket is an in-memory bucket, we can efficiently implement bucket/merge with the .replace method of ConcurrentHashMap. If the bigrams bucket is instead sitting on a remote machine, the client can implement the merge operation as an HTTP POST of the update {next-word 1}, without pulling down the word's existing counts. On the server, the bucket, which has the correct merge function, incorporates the update. Importantly, apart from the bucket spec declared when the bucket is created, the rest of the code can remain agnostic to whether the bucket is remote.
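A merge built on ConcurrentHashMap's .replace is a compare-and-swap loop. It reads the current value, computes the merged value, and writes it back only if no other thread has changed the key in the meantime. The sketch below assumes that is roughly what an in-memory bucket does. chm-merge! is a hypothetical name, and Store's real implementation may differ.

```clojure
(import '(java.util.concurrent ConcurrentHashMap))

(defn chm-merge!
  "Atomically merge update v into key k of m using merge-fn.
  Hypothetical sketch of an in-memory bucket/merge."
  [^ConcurrentHashMap m merge-fn k v]
  (loop []
    (let [old (.get m k)]
      (if (nil? old)
        ;; No value yet: putIfAbsent returns nil only if our insert won.
        (when (some? (.putIfAbsent m k v))
          (recur))
        ;; replace(k, old, new) succeeds only if k still maps to old;
        ;; otherwise another thread got there first, so re-read and retry.
        (when-not (.replace m k old (merge-fn old v))
          (recur))))))
```

Without such a loop, two threads doing get-modify-put on the same key can silently drop one of the updates. On Java 8 and later, ConcurrentHashMap.merge(key, value, remappingFunction) performs the same kind of atomic update in a single call, given a java.util.function.BiFunction.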

But if bigrams were a remote bucket, we obviously wouldn't want to do a separate HTTP POST for each [word,{next-word 1}] update; that would fall over pretty quickly. If we see the same word many times in the same text, we'd want to aggregate its counts locally before POSTing them to the remote bucket. We might even want to hold off sending any updates over the wire until enough time has passed or an in-memory buffer has exceeded some threshold.
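Local pre-aggregation can be sketched as folding a batch of [key update] pairs into one update per key before anything is sent. pre-aggregate is a hypothetical helper. In the remote case, each resulting entry would then become a single HTTP POST.

```clojure
(defn pre-aggregate
  "Combine a batch of [key delta] updates locally so each key is sent over
  the wire only once. Relies on merge-fn being associative."
  [merge-fn updates]
  (reduce (fn [acc [k v]]
            (if (contains? acc k)
              (update acc k merge-fn v)   ; fold into the pending update
              (assoc acc k v)))           ; first update seen for this key
          {}
          updates))

(def updates
  [["on" {"coffee" 1}]
   ["runs" {"on" 1}]
   ["on" {"coffee" 1}]
   ["on" {"tea" 1}]])

;; Four updates collapse into two messages, one per key.
(pre-aggregate #(merge-with + %1 %2) updates)
;; returns {"on" {"coffee" 2, "tea" 1}, "runs" {"on" 1}}
```

The fold applies each key's updates in arrival order. Associativity alone is therefore enough for the remote result to match applying the updates one at a time; the merge function does not also need to be commutative.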

These kinds of buffering policies for aggregating data are extremely common in sophisticated systems. For instance, the Lucene project does something very much like this: indices are built in memory and periodically merged into an on-disk index segment. However, Lucene's buffering and flushing logic is deeply tied to its specific problem and its OOP class hierarchy. Using our bucket abstraction, we can write a generic library function that defines an in-memory flush bucket. A flush bucket wraps a (presumably persistent) underlying bucket and uses that bucket's merge function. It accumulates merges locally, and we can write an arbitrary policy to decide when to sync the data to the underlying bucket. This kind of flush buffering is needed so often that Store lets you request a flush bucket when a bucket is constructed. For instance, in our previous example we can change the declaration of the bigrams bucket so that it is backed by BDB and flushed every 60 seconds.

With that declaration, the bigrams bucket is a BDB bucket, but the bucket/merge calls in the example go to an in-memory flush bucket. Every 60 seconds, the flush bucket's contents are merged into the underlying BDB bucket. We can reuse this buffering logic in our indexing and topic-model code, and just about anywhere else we need this mechanism. The bucket abstraction lets you decouple the buffering policy from the domain-specific merging and aggregation logic. In other words, because a bucket knows how to merge its own data as part of its specification, it's substantially easier to write library code for generic buffering strategies. Making merge part of the bucket abstraction is in turn straightforward, because Clojure's built-in immutable data structures and core sequence operations make merge functions easy to express.
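A flush bucket can be sketched as an atom that buffers merges, plus a policy function that decides when to push the buffer to the underlying bucket. Everything here is hypothetical. flush-bucket, fb-merge!, flush!, and max-keys-policy are illustrative names. The underlying bucket is modeled as a function that merges one key. The policy shown is a simple key-count threshold rather than Store's 60-second timer.

```clojure
;; Hypothetical flush bucket (not Store's API). Merges are buffered in an
;; atom; when (flush? buffer) is true, the buffer is drained into the
;; underlying bucket via underlying-merge!, one key at a time.
(defn flush-bucket [merge-fn underlying-merge! flush?]
  {:merge-fn          merge-fn
   :buffer            (atom {})
   :underlying-merge! underlying-merge!
   :flush?            flush?})

(defn flush!
  "Atomically take everything buffered so far and merge it into the
  underlying bucket."
  [{:keys [buffer underlying-merge!]}]
  (let [[pending _] (reset-vals! buffer {})]
    (doseq [[k v] pending]
      (underlying-merge! k v))))

(defn fb-merge!
  "Merge v into key k locally, then flush if the policy says so."
  [{:keys [merge-fn buffer flush?] :as fb} k v]
  (swap! buffer update k (fn [old] (if (nil? old) v (merge-fn old v))))
  (when (flush? @buffer)
    (flush! fb)))

;; Example policy: flush once the buffer holds at least n distinct keys.
(defn max-keys-policy [n]
  (fn [buffer] (>= (count buffer) n)))
```

The buffering code never looks inside the values; all domain knowledge lives in merge-fn. A time-based policy like the 60-second one described above would instead call flush! from a scheduler, for example a java.util.concurrent.ScheduledExecutorService.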

Building batch MapReduce-like functionality on top of buckets is relatively straightforward. But crucially, we can reuse bucket aggregation policies to do things like streaming MapReduce. In-memory batch MapReduce with a bucket amounts to mapping each input to key-value pairs and merging every pair into the bucket (for the streaming case, see our ClojureWest slides).
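As a sketch of the batch case, the function below maps every input to [key value] pairs and merges each pair into an in-memory bucket, so the merge function does the reducing. map-reduce and word-count-map are hypothetical names, and an atom holding a map stands in for a real bucket.

```clojure
(require '[clojure.string :as str])

(defn map-reduce
  "In-memory batch MapReduce sketch: map-fn turns each input into
  [key value] pairs, and every pair is merged into the bucket with
  merge-fn, which plays the role of reduce."
  [map-fn merge-fn inputs]
  (let [bucket (atom {})]
    (doseq [[k v] (mapcat map-fn inputs)]
      (swap! bucket update k (fn [old] (if (nil? old) v (merge-fn old v)))))
    @bucket))

(defn word-count-map
  "Map phase for word count: emit [word 1] for every word in a document."
  [doc]
  (for [w (str/split (str/lower-case doc) #"\s+")]
    [w 1]))

(map-reduce word-count-map + ["Prismatic runs on coffee" "coffee runs out"])
;; returns a map equal to
;; {"prismatic" 1, "runs" 2, "on" 1, "coffee" 2, "out" 1}
```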

The merge function is literally the reduce in MapReduce; in fact, much of Store consists of the abstractions used in the reduce/aggregation phase of a MapReduce implementation. Obviously, many critical pieces of a real MapReduce implementation are missing here (map-side aggregation, distributed buckets, checkpointing, error tolerance, etc.). Many of these features and policies are useful throughout systems programming, outside the context of MapReduce. Using the bucket abstraction, we have built many of them as part of a library for aggregation processing, rather than tying them to a specific framework implementation. For instance, we can think of checkpointing as a flush bucket that periodically snapshots its state to an on-disk bucket. After a service interruption or restart, the flush bucket reloads its state from that on-disk bucket.
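Checkpointing along those lines can be sketched with two functions: one that snapshots the in-memory state to a file as EDN, and one that restores it on startup. checkpoint! and restore are hypothetical helpers, and a local file stands in for the on-disk bucket.

```clojure
(require '[clojure.edn :as edn])

(defn checkpoint!
  "Snapshot the current in-memory state to file as EDN."
  [state-atom ^java.io.File file]
  (spit file (pr-str @state-atom)))

(defn restore
  "On startup, reload the last snapshot, or start empty if there is none."
  [^java.io.File file]
  (atom (if (.exists file)
          (edn/read-string (slurp file))
          {})))
```

Calling checkpoint! periodically, from the same kind of scheduler a flush bucket would use, bounds how much state is lost on a crash. A production version would also write to a temporary file and rename it, so that a crash mid-write never leaves a truncated checkpoint.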
