There is a repo associated with this blog post here.

There is a benchmark associated with this blog post here.

Introduction

Ad impressions and digital marketing are the fuel that powers the internet economy. It is hard to fathom, but every time you visit a web page, a live auction kicks off for your eyeballs, and the right to display an ad impression goes to the highest bidder in near real time. Since advertisers pay for these impressions, understanding and improving their effectiveness, and serving the ad before the user navigates away from the page, are of prime importance to every player in the digital marketing chain.

Serving an ad involves data stored in various repositories, starting with user profiles, the appropriate real-time bids based on a profile, and the ad networks that can serve those ads, to name just a few. Dealing with this data often means working with a different system, cluster, or tool for each activity. These clusters usually require redundant data management, tuning, and debugging, and each constitutes an individual point of failure in a business that is extremely sensitive to latency and delays.

Advances in distributed computing and in-memory data processing have made analytics on large, fast-moving data volumes like these much more feasible, and products like Hadoop and Spark have flourished as a result. About two years ago, the chimpler blog released a great example of real-time ad analytics code running on Spark Streaming and eventually storing its results in MongoDB. In this blog, we will show you an updated real-time ad analytics example based on the chimpler code, but using SnappyData as the compute and storage engine.

Our example showcases both ease of use and speed as it pertains to ingesting streaming data, registering continuous queries on that data, and ultimately storing that data in a distributed in-memory store resident in Spark for interactive querying. In particular, it shows:

- Simplicity of using SQL or the DataFrame API to model streams in Spark.
- The use of SQL/SchemaDStream API (as continuous queries) to pre-aggregate AdImpression logs.
- Storing the pre-aggregated logs into the SnappyData columnar store with high efficiency.
- Running OLAP queries from any SQL client both on the full data set as well as sampled data (showcasing sub-second interactive query speeds). The stratified sample allows us to manage an infinitely growing data set at a fraction of the cost otherwise required.

Use case description

We consider an ad network where ad servers log impressions in Apache Kafka (a distributed publish-subscribe messaging system). These impressions are then aggregated by Spark Streaming into the SnappyData store. External clients connect to the same cluster using JDBC/ODBC and run arbitrary OLAP queries. Since ad servers can produce logs from many websites, and each ad impression log message represents a single ad viewed by a user, you can expect thousands of messages every second, so it is crucial that the ingestion logic keeps up with the stream. To accomplish this, SnappyData collocates the store partitions with the partitions created by Spark Streaming: a batch of data from the stream in each Spark executor is transformed into a compressed column batch and stored in the same JVM, avoiding redundant shuffles (except for HA).

A typical ad impression record comprises fields like publisher, advertiser, website, geo, bid, cookie, and timestamp:
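Sketched as a set of typed fields, it might look like the following (the types and exact field names here are assumptions based on the list above; the authoritative schema is in the example repo):

    -- Sketch of a single ad impression log record (types assumed)
    time_stamp TIMESTAMP,   -- when the impression was served
    publisher  STRING,      -- publisher that displayed the ad
    advertiser STRING,      -- advertiser whose ad was shown
    website    STRING,      -- site on which the ad appeared
    geo        STRING,      -- geography of the viewing user
    bid        DOUBLE,      -- winning bid for the impression
    cookie     STRING       -- user cookie, used to count uniques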

We pre-aggregate these logs by publisher and geo, and compute the average bid, the number of impressions and the number of uniques (the number of unique users that viewed the ad) every 2 seconds. We want to maintain the last day’s worth of data in memory for interactive analytics from external clients. Some examples of interactive queries:

  • Find total uniques for a certain ad, grouped by geography;
  • Find impression trends for advertisers over time;
  • Find the top ads by uniques count for each geography.

So the aggregation will look something like:
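A rough sketch of that aggregation in SQL (the stream and column names are illustrative; the working query ships with the example repo):

    -- Illustrative aggregation over each batch of the impression stream
    SELECT publisher,
           geo,
           AVG(bid)               AS avg_bid,
           COUNT(*)               AS imps,
           COUNT(DISTINCT cookie) AS uniques
    FROM adImpressionStream
    GROUP BY publisher, geo;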

How is it implemented?

The logic for this use case has been implemented in three different ways:

- Pure Spark Streaming from the Chimpler blog
- Using the Spark API with SnappyData extensions to work with the stream as a sequence of DataFrames
- Using pure SQL in SnappyData

The idea behind the latter two is to show that SnappyData caters both to developers who prefer the Spark way of writing code and to developers who prefer the SQL way. The code examples provided below show the pure SQL way; if the Spark way is preferred, follow along by clicking the above link.

There is a screencast that shows how to run the example code below.

First, random ad impression logs are generated and fed into Kafka. To see how they are generated, follow this link.

Stream tables and continuous query

Next, a SnappyData Stream Table is created over the Kafka source. A Stream Table is an abstraction over a Spark DStream that looks like a DataFrame or SQL table to any client. Messages are converted to Row objects using a converter that makes them comply with the stream table's schema. The stream table is registered with the SnappyData catalog so that any third-party client can query it concurrently.
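A minimal sketch of that DDL, assuming SnappyData's Kafka stream source; the USING clause, option names, and converter class below are our recollection of the example and should be treated as placeholders rather than exact syntax:

    -- Sketch: stream table over the Kafka topic (options are illustrative)
    CREATE STREAM TABLE adImpressionStream (
      time_stamp TIMESTAMP,
      publisher  STRING,
      advertiser STRING,
      website    STRING,
      geo        STRING,
      bid        DOUBLE,
      cookie     STRING
    ) USING directkafka_stream OPTIONS (
      rowConverter 'io.snappydata.adanalytics.AdImpressionToRowsConverter',  -- assumed converter class
      kafkaParams  'metadata.broker.list->localhost:9092',                   -- assumed broker address
      topics       'adImpressionsTopic'                                      -- assumed topic name
    );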

Next, a continuous query is registered on the stream table to compute the aggregations described above. The query aggregates metrics for each publisher and geo every second; it runs every time a batch is emitted and returns a SchemaDStream.
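A sketch of what that continuous query could look like (the window clause follows SnappyData's continuous-query syntax as we recall it, and the names match the illustrative schema above):

    -- Sketch: continuous query over 1-second windows of the stream
    SELECT MIN(time_stamp) AS time_stamp,
           publisher,
           geo,
           AVG(bid)               AS avg_bid,
           COUNT(*)               AS imps,
           COUNT(DISTINCT cookie) AS uniques
    FROM adImpressionStream
    WINDOW (DURATION 1 SECONDS, SLIDE 1 SECONDS)
    GROUP BY publisher, geo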

Ingesting into a Column table

Next, we create a column table and ingest the aggregations produced by the continuous query into it, using the Spark Data Source API to write each batch of aggregations into the table. This automatically localizes the partitions in the data store without shuffling data.
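As a sketch, the column table that receives these aggregates could be declared as follows (table and column names are the illustrative ones used above):

    -- Sketch: column table holding the rolled-up impressions
    CREATE TABLE aggrAdImpressions (
      time_stamp TIMESTAMP,
      publisher  STRING,
      geo        STRING,
      avg_bid    DOUBLE,
      imps       LONG,
      uniques    LONG
    ) USING column;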

Ingesting into a Sample table

Finally, we create a sample table that ingests from the column table created above. This is the table that approximate queries will execute against. Here we create a query column set on the 'geo' column, specify how large a sample we want relative to the column table (3%), and specify which table to ingest from:
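A sketch of that definition, assuming SnappyData's sample-table DDL; the option names below are our best recollection and may differ by version:

    -- Sketch: 3% stratified sample of the column table, stratified on geo
    CREATE SAMPLE TABLE sampledAdImpressions
      ON aggrAdImpressions
      OPTIONS (
        qcs 'geo',         -- query column set: stratify on geography
        fraction '0.03'    -- sample roughly 3% of the base table
      );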

Query the data

Now we encourage you to query the tables you just created. Using the snappy-shell, we execute two simple OLAP SQL queries, sketched below, to find:

  • The top 20 geographies with the most ad impressions and
  • The total uniques for a certain ad, grouped by geography.
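Sketches of both queries against the exact column table (table and column names follow the illustrative schema above; the filter value is a placeholder):

    -- Top 20 geographies by number of ad impressions
    SELECT COUNT(*) AS adCount, geo
    FROM aggrAdImpressions
    GROUP BY geo
    ORDER BY adCount DESC
    LIMIT 20;

    -- Total uniques for a certain ad (here filtered by its publisher), grouped by geography
    SELECT SUM(uniques) AS totalUniques, geo
    FROM aggrAdImpressions
    WHERE publisher = 'publisher11'
    GROUP BY geo
    ORDER BY totalUniques DESC;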

We then show how easy it is to execute the same queries over the sample table. Here we use the exact same queries as above, but append "with error 0.20 confidence 0.95". These clauses tell SnappyData that we can tolerate a 20% error with a 95% confidence interval, and their presence automatically routes the queries to the sample table, even though the exact table is named in the FROM clause. Tolerating that error, of course, means the queries can execute much faster.
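For example, here is the first query above with those clauses appended; this is our reading of the syntax described in the text, and the runnable version is in the example repo:

    -- Approximate version: automatically routed to the sample table
    SELECT COUNT(*) AS adCount, geo
    FROM aggrAdImpressions
    GROUP BY geo
    ORDER BY adCount DESC
    LIMIT 20
    WITH ERROR 0.20 CONFIDENCE 0.95;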

In this example, the latency difference between the exact column table and the approximate sample table won't be significant because the data volumes are low. In a more common situation with growing data volumes, the sample table would deliver consistent, interactive speeds while the latency of the exact table would grow with the data.

To see the steps of the above, please visit this link.

Conclusion

Hopefully we have shown you how simple yet flexible it is to ingest data in parallel, process it using SQL, run continuous queries, store data in column and sample tables, and query it interactively, all in a single unified cluster. We will soon release Part B of this exercise: a benchmark of this use case in which we compare SnappyData to other alternatives.

Learn more and chat about SnappyData on any of our community channels:

Stackoverflow
Slack
Mailing List
Gitter
Reddit
JIRA

Other important links:

SnappyData source
SnappyData docs
Ad analytics source
SnappyData technical paper

The Apache Spark Database

SnappyData is Spark 2.0 compatible and open source. Download now