One of Databricks’ best-known blog posts describes joining a billion rows in a second on a laptop. Since this is a fairly easy benchmark to replicate, we thought, why not try it on SnappyData and see what happens? We found that for a join on two columns over a billion rows, SnappyData is nearly 20x faster than Spark 2.0. SnappyData is open source on GitHub.

Let’s start with the benchmark as in the original post. The machine is a:

Dell Latitude E7450 laptop with Core(TM) i7-5600U CPU @ 2.60GHz having 16GB of RAM.

Start the SnappyData shell with some decent memory (required for the data load test):

(the GC options are similar to a default SnappyData cluster)

Define a simple benchmark utility function:
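For readers who want to see the shape of such a helper, here is a minimal sketch in plain Python. It is not the function run in the Spark shell for this post; the name `benchmark`, the warm-up and run counts, and the choice to report the best run are all assumptions made for illustration.

```python
import time


def benchmark(name, func, warmups=2, runs=5):
    """Run func several times and return the best wall-clock time in seconds.

    Hypothetical helper: the warm-up count, run count and "best of N"
    reporting are illustrative choices, not taken from the original post.
    """
    for _ in range(warmups):
        func()  # warm-up runs are executed but not timed
    timings = []
    for _ in range(runs):
        start = time.perf_counter()
        func()
        timings.append(time.perf_counter() - start)
    best = min(timings)
    print(f"Time taken in {name}: {best:.2f} seconds")
    return best
```

Calling `benchmark("sum", lambda: sum(range(1_000_000)))` runs the lambda seven times with the defaults and prints one timing line.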

Let’s do a warmup first that will also initialize some Spark components and sum a billion numbers:

> Time taken in Spark 2.0 (sum of a billion): 0.70 seconds

Very impressive sub-second timing.

Let's try the same using SnappySession (the entry point for SnappyData’s extensions).

> Time taken in Spark 2.0 (sum of a billion): 0.69 seconds

Similar numbers, as expected. Now let's load these into a SnappyData column table and try again.

> Time taken in SnappyData (sum of a billion): 0.44 seconds

Somewhat faster than even direct evaluation. One might say, “Oh, but all you have to do is read longs from memory and add them.” Let’s try with Spark memory caching.

Drop the table first, because there is not enough memory to hold both on the laptop:

> Time taken in Spark 2.0 cache (sum of a billion): 7.5 seconds

Whoa, more than 15X slower.

Some technical folks may recognize this as being due to the lack of vectorization in the Spark 2.0 memory caching mechanism. One may try Spark's vectorized Parquet reader, but that comparison is not apples to apples even if the Parquet file is in the OS cache. Incidentally, the Parquet reader is actually quite a bit faster than Spark caching when compression is disabled and the file is completely in the OS cache, again showing the power of vectorization and code generation.

Spark is lightning fast when joining a billion records.

> Time taken in Spark 2.0 (join): 0.63 seconds

It’s almost unbelievable that Spark can join at about the same speed as a simple sum. The trick lies in Spark's optimized implementation for single-column joins on integral types: when the values are contiguous, it can use a "dense" array with upper and lower bounds instead of a full hash map.
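To make the "dense" array idea concrete, here is a small sketch in plain Python. It is not Spark's implementation; the function names are hypothetical and the code only shows why contiguous integer keys let an array indexed by `key - lower_bound` stand in for a hash map.

```python
def dense_join_count(build_keys, probe_keys):
    """Count matching pairs using a dense array over [min, max] of the build keys."""
    if not build_keys:
        return 0
    lo, hi = min(build_keys), max(build_keys)
    # One slot per possible key in [lo, hi]; each slot counts build rows with that key.
    slots = [0] * (hi - lo + 1)
    for k in build_keys:
        slots[k - lo] += 1
    matches = 0
    for k in probe_keys:
        # A bounds check replaces hashing: keys outside [lo, hi] cannot match.
        if lo <= k <= hi:
            matches += slots[k - lo]
    return matches


def hash_join_count(build_keys, probe_keys):
    """Reference version using a general hash map, for comparison."""
    table = {}
    for k in build_keys:
        table[k] = table.get(k, 0) + 1
    return sum(table.get(k, 0) for k in probe_keys)
```

Both functions return the same count for any integer keys. The dense version only pays off when `hi - lo` is close to the number of distinct keys, which is exactly the contiguous case described above.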

Let’s try the same with a SnappyData replicated table.

> Time taken in SnappyData (join): 0.64 seconds

Close enough. Perhaps SnappyData is also using a similar implementation. Let’s make it a bit more complex by joining on two columns.

> Time taken in Spark 2.0 (two column join): 15.6 seconds

A whopping 25X drop in performance. Let’s try the same with SnappyData:

> Time taken in SnappyData (two column join): 0.81 seconds

Still sub-second and ~19X faster than Spark 2.0.

The improvement seen in this particular case is due to a more generic approach to optimizing joins on contiguous values. While Spark uses a single-column “dense” vector to optimize the single-column join case, SnappyData’s hash-join implementation uses per-column MIN/MAX tracking to quickly reject streamed rows if any of the join keys lie beyond the limits. Thus, while Spark’s optimization works only for specialized single-column cases, the approach in SnappyData works for a much wider range of queries. Beyond this specific optimization, the hash grouping and join operators in SnappyData are themselves tuned to work much better with its column store.
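The per-column MIN/MAX idea can be sketched in a few lines of plain Python. This is an illustration of the general technique, not SnappyData's operator; the function name and the returned `lookups` counter are assumptions added to show how many streamed rows actually reach the hash map.

```python
def minmax_hash_join_count(build_rows, probe_rows):
    """Multi-column equi-join count with per-column MIN/MAX pre-filtering.

    Rows are tuples of join-key values. Returns (matches, lookups), where
    lookups is the number of probe rows that were not rejected by the bounds.
    """
    if not build_rows:
        return 0, 0
    ncols = len(build_rows[0])
    table = {}
    mins = list(build_rows[0])
    maxs = list(build_rows[0])
    for row in build_rows:
        table[row] = table.get(row, 0) + 1
        # Track the smallest and largest value seen in each key column.
        for i in range(ncols):
            if row[i] < mins[i]:
                mins[i] = row[i]
            if row[i] > maxs[i]:
                maxs[i] = row[i]
    matches = 0
    lookups = 0
    for row in probe_rows:
        # Reject the row cheaply if any key column is outside the tracked range.
        if any(row[i] < mins[i] or row[i] > maxs[i] for i in range(ncols)):
            continue
        lookups += 1
        matches += table.get(row, 0)
    return matches, lookups
```

With a small build side and a large streamed side, most probe rows fail the range check and never touch the hash map. The check works for any number of key columns, which is the point of the comparison with the single-column dense array.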

A more general join will be an order of magnitude slower (or more, depending on the hash map size) than the examples above. Let’s try out an example closer to the real world. This example is a shortened version of NYSE TAQ having a “quote” table with some random values for BID and SYMBOL columns.

> Time taken in Spark (join and groupBy): 12.4 seconds

So the performance drops by more than two orders of magnitude for such a join and groupBy once the data size is taken into account (100M rows here compared to 1 billion in the joins before). Much of the time is being spent in random value generation on the fly, so let’s try after caching the datasets:

> Time taken in Spark (join and groupBy): 6.6 seconds

Caching improves the performance quite a bit as was expected. How well does SnappyData do for such a query? Let’s find out:

> Time taken in SnappyData (join and groupBy): 0.49 seconds

This is ~13X faster than Spark cache. Beyond numbers, it brings many such queries on larger datasets and clusters into the realm of real-time analytics.

How does SnappyData achieve this performance gain?

Apache Spark’s optimizations are designed for disparate data sources which tend to be mostly external, such as HDFS or Alluxio. For better response times to queries on non-changing data sets, Spark recommends caching data from external data sources as cached tables in Spark and then running the queries on these cached data structures, where the data is stored in optimized column formats. While this dramatically improves performance, we found a number of areas for further improvement. For instance, in a scan of columns managed as byte arrays, the values are copied into an "UnsafeRow" object for each row and then read back from this row, which breaks vectorization and introduces lots of expensive copying.
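The cost of building a row object per row can be illustrated with a toy example in plain Python. This is only an analogy for the copy-then-read pattern described above, not Spark's UnsafeRow code path; both function names are hypothetical.

```python
from array import array


def sum_via_row_copies(columns, col_index):
    """Materialize a row from all columns for every position, then read one field."""
    total = 0
    nrows = len(columns[0])
    for r in range(nrows):
        # Every column value is copied into a fresh row object, even unused ones.
        row = tuple(col[r] for col in columns)
        total += row[col_index]
    return total


def sum_columnar(columns, col_index):
    """Read the requested column directly, with no per-row object."""
    return sum(columns[col_index])


# Two columns of 64-bit integers stored in compact arrays.
columns = [array("q", range(1000)), array("q", [1] * 1000)]
```

Both functions return the same total for `columns`, but the first touches every column and allocates one tuple per row, while the second walks a single contiguous array.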

Addressing these inefficiencies, however, is not that easy, as the data in the column store may have been compressed using a number of different algorithms such as dictionary encoding and run-length encoding. SnappyData has implemented alternate decoders for these, so it can get the full benefit of code generation and vector processing.
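As a rough illustration of why encoding-aware decoders matter, the sketch below works directly on run-length and dictionary encoded data in plain Python. These are textbook versions of the two encodings, not SnappyData's on-disk formats or decoders, and all function names are hypothetical.

```python
def rle_encode(values):
    """Run-length encode a sequence into [value, run_length] pairs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return runs


def rle_sum(runs):
    """Sum the column without expanding runs back into individual values."""
    return sum(value * length for value, length in runs)


def dict_encode(values):
    """Dictionary encode a sequence into (dictionary, integer codes)."""
    dictionary, index, codes = [], {}, []
    for v in values:
        if v not in index:
            index[v] = len(dictionary)
            dictionary.append(v)
        codes.append(index[v])
    return dictionary, codes


def dict_count_equal(dictionary, codes, target):
    """Count rows equal to target by comparing integer codes, not decoded values."""
    if target not in dictionary:
        return 0
    code = dictionary.index(target)
    return sum(1 for c in codes if c == code)
```

In both cases the aggregate is computed on the encoded form: one multiplication per run for RLE, and one integer comparison per row for dictionary codes instead of a string comparison.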

Similar optimizations have also been applied to row table scans through the Spark engine, giving about 5X better query response times compared to the pre-0.7 state of the product.

Some of the other significant optimizations include:

- SnappyData's storage layer now allows for collocation of partitioned tables. This information is used to eliminate the most expensive portions of many joins (like shuffle) and turn them into collocated one-to-one joins. This release significantly enhances the physical plan strategy phase to aggressively eliminate data shuffle and movement for many more cases.

- Support for plan caching to avoid query parsing, analysis, optimization, strategy and preparation phases.

- Spark has been changed to broadcast data with tasks themselves to significantly reduce task latencies. A set of tasks scheduled from the driver to an executor are grouped as a single TaskSet message with common task data sent only once instead of separate messages for each task. Task data is also compressed to further reduce network usage.

- An efficient pooled version of the Kryo serializer has been added that is now used by default for data, closures and lower-level Netty messaging. This, together with the improvements mentioned in the previous point, significantly reduces the overall latency of short tasks (from close to 100ms down to a few ms). These changes also reduce the CPU consumed by the driver, enhancing the concurrency of tasks, especially shorter ones.

- Enhancements in column-level statistics allow for skipping column batches based on query predicates where possible. For example, time-range-based queries will now be able to scan only the batches that fall in the given range and skip the others, providing a huge boost to such queries on time-series data.

- Alternate hash aggregation and hash join operators have been added that are finely optimized for SnappyData storage, making full use of the storage layout with vectorization and dictionary encoding to provide an order of magnitude performance advantage over Spark's default implementations.
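The batch-skipping point in the list above can be sketched as follows in plain Python. It is a generic illustration of min/max statistics per batch, not SnappyData's column batch format; the dictionary layout and function names are assumptions.

```python
def build_batches(timestamps, batch_size):
    """Split a column into batches and record min/max statistics for each."""
    batches = []
    for start in range(0, len(timestamps), batch_size):
        chunk = timestamps[start:start + batch_size]
        batches.append({"min": min(chunk), "max": max(chunk), "values": chunk})
    return batches


def count_in_range(batches, lo, hi):
    """Count values in [lo, hi], scanning only batches whose range overlaps it.

    Returns (matched, scanned) where scanned is the number of batches read.
    """
    matched = 0
    scanned = 0
    for batch in batches:
        # Statistics alone prove that no value in this batch can qualify.
        if batch["max"] < lo or batch["min"] > hi:
            continue
        scanned += 1
        matched += sum(1 for t in batch["values"] if lo <= t <= hi)
    return matched, scanned
```

When the data arrives roughly in time order, as time-series data usually does, each batch covers a narrow range and a time-range predicate touches only a handful of batches.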

We are working on several other ideas on the performance front, such as updatable global and local indexes on column tables to optimize join queries. Row tables already support these indexes. Additionally, we are using whole-stage code generation (WSCG) to optimize several other areas, like faster stream and batch ingests. Stay tuned for more performance improvements in the next release.
