Recently, a user wrote to the Spark mailing list asking how to refresh data in a Spark DataFrame without restarting the application. The user stated:
“We have a Structured Streaming application that gets [credit card] accounts from Kafka into a streaming data frame. We have a blacklist of accounts stored in S3 and we want to filter out all the accounts that are blacklisted. So, we are loading the blacklisted accounts into a batch data frame and joining it with the streaming data frame to filter out the bad accounts. ... we wanted to cache the blacklist data frame to prevent going out to S3 everytime. Since, the blacklist might change, we want to be able to refresh the cache at a cadence, without restarting the whole app.”
This application makes perfect sense. A credit card issuer is liable for charges made on cards that are stolen, misplaced or otherwise misused. In 2012, unauthorized and fraudulent credit card transactions cost banks $6.1 billion. It is in the issuer's interest to ensure that transactions involving a blacklisted card are caught as soon as the card has been flagged.
The definitive reply to his email came later in the thread. It stated:
“Yes, you will have to recreate the streaming Dataframe along with the static Dataframe, and restart the query. There isnt a currently feasible to do this without a query restart. But restarting a query WITHOUT restarting the whole application + spark cluster, is reasonably fast. If your application can tolerate 10 second latencies, then stopping and restarting a query within the same Spark application is a reasonable solution.”
In short, a streaming DataFrame collects credit card transactions arriving over a messaging system such as Apache Kafka. Existing blacklisted credit cards are loaded from a datastore (in this case, S3) into a static DataFrame cached in Spark. A Spark SQL query joins the streaming DataFrame with the static DataFrame to detect any incoming blacklisted cards. This works well until a new blacklisted card is added to the datastore (S3). At that point the DataFrame containing the blacklisted cards must be reloaded from S3. That means stopping the streaming query, recreating both DataFrames and restarting the query. As the reply notes, this can take 10 seconds or longer, depending on data volumes. During that window, transactions can still be made on the newly blacklisted card, because the running query does not yet know about it. In other words, recognizing blacklisted credit card accounts in a stream of transactions cannot be done in real time with a pure Spark solution.
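To make the exposure window concrete, here is a minimal plain-Python simulation. It is not Spark code. It compares two blacklists:

* A static blacklist snapshot that is refreshed only when the query is restarted.
* A blacklist that is updated the moment a card is added.

The names `Txn`, `flag_with_snapshot` and `flag_with_mutable` are hypothetical. The timings (a card blacklisted at t=2, with reloads at t=0 and t=12 to mimic a roughly 10-second restart) are illustrative assumptions, not measurements.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Txn:
    ts: float   # event time in seconds (illustrative)
    card: str   # card / account identifier


def flag_with_snapshot(txns, blacklist_events, reload_times):
    """Flag transactions against a blacklist snapshot that is refreshed only
    at the given reload times (the stop-and-restart-the-query approach).

    blacklist_events: list of (ts, card) pairs, when each card was blacklisted.
    reload_times: times at which the snapshot is (re)loaded.
    """
    flagged = []
    for t in txns:
        # The snapshot in effect is the latest reload at or before this event.
        loaded_at = max((r for r in reload_times if r <= t.ts), default=None)
        if loaded_at is None:
            continue  # no snapshot loaded yet, nothing can be flagged
        snapshot = {card for ts, card in blacklist_events if ts <= loaded_at}
        if t.card in snapshot:
            flagged.append(t)
    return flagged


def flag_with_mutable(txns, blacklist_events):
    """Flag transactions against a blacklist that sees every addition as soon
    as it happens (the mutable-table approach)."""
    return [t for t in txns
            if any(ts <= t.ts and card == t.card for ts, card in blacklist_events)]


txns = [Txn(3, "4111"), Txn(8, "4111"), Txn(13, "4111"), Txn(5, "5500")]
events = [(2, "4111")]  # card 4111 is blacklisted at t=2
print([t.ts for t in flag_with_snapshot(txns, events, [0, 12])])
print([t.ts for t in flag_with_mutable(txns, events)])
```

With the snapshot approach, only the transaction at t=13 is caught. The charges at t=3 and t=8 slip through because the snapshot loaded at t=0 predates the blacklisting. The mutable blacklist catches all three.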
So how can we use mutable DataFrames to make this solution faster and turn this into a continuous application?
Using SnappyData, we can simplify the process of adding new blacklisted cards to the blacklist DataFrame and greatly improve latency. We will follow the overview below, with code snippets:
Step 1: Define a streaming DataFrame that represents incoming credit card transactions.
Step 2: Define a DataFrame that represents the blacklisted cards table. Note that this DataFrame is mutable.
Step 3: Join the streaming DataFrame with the blacklist DataFrame and remove transactions on fraudulent cards. Write those transactions to a rejected transactions table, and write the safe transactions to their own table.
Step 4 (optional): Define a machine learning job using Spark MLlib that operates on the rejected transactions table to train models to detect fraud automatically.
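The data flow of Steps 1-3 can be modeled in a few lines of plain Python. This is a toy sketch of the idea, not the SnappyData implementation:

* An in-memory set stands in for the mutable blacklist table.
* Two lists stand in for the rejected and safe transaction tables.

The class and method names (`FraudFilter`, `blacklist_cards`, `process_batch`) are hypothetical. The point it illustrates is that the same long-running filter keeps working after the blacklist changes, with no restart.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Transaction:
    card: str
    amount: float


@dataclass
class FraudFilter:
    """Toy model of Steps 1-3: a mutable blacklist joined against each
    micro-batch of transactions, routing results to two sinks."""
    blacklist: set = field(default_factory=set)   # Step 2: mutable blacklist
    rejected: list = field(default_factory=list)  # rejected-transactions "table"
    safe: list = field(default_factory=list)      # safe-transactions "table"

    def blacklist_cards(self, cards):
        # Mutate the blacklist in place; the filter keeps running.
        self.blacklist.update(cards)

    def process_batch(self, batch):
        # Step 3: blacklisted cards go to `rejected`, all others to `safe`.
        for txn in batch:
            if txn.card in self.blacklist:
                self.rejected.append(txn)
            else:
                self.safe.append(txn)


f = FraudFilter()
f.process_batch([Transaction("A", 10.0), Transaction("B", 20.0)])  # Step 1 input
f.blacklist_cards({"B"})                                           # card B flagged
f.process_batch([Transaction("B", 5.0), Transaction("C", 7.5)])
print(f.rejected)
print(f.safe)
```

The first charge on card B is accepted because B was not yet blacklisted. The next charge is rejected immediately after the update.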
So what does this look like in code?
Below we create the SnappySession object and use it to create a mutable row table for the blacklisted cards and then a column table for the rejected transactions. These tables are immediately materialized as DataFrames.
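The choice of a row table for the blacklist and a column table for rejected transactions follows from their access patterns:

* The blacklist is small, keyed by card and frequently updated.
* The rejected transactions are append-mostly and later scanned for analysis or model training.

The sketch below is a conceptual, plain-Python illustration of those two layouts. It is not how SnappyData stores data internally. The class names `RowTable` and `ColumnTable` are hypothetical.

```python
class RowTable:
    """Row-oriented store keyed by primary key: cheap point lookups,
    inserts and deletes (the access pattern of a blacklist)."""

    def __init__(self):
        self._rows = {}

    def upsert(self, key, row):
        self._rows[key] = row

    def delete(self, key):
        self._rows.pop(key, None)  # deleting a missing key is a no-op

    def contains(self, key):
        return key in self._rows


class ColumnTable:
    """Column-oriented store: each column is kept as its own list, so a scan
    or aggregate over one column touches only that column's values."""

    def __init__(self, columns):
        self._cols = {c: [] for c in columns}

    def append(self, row):
        for name, values in self._cols.items():
            values.append(row[name])

    def column(self, name):
        return self._cols[name]


blacklist = RowTable()
blacklist.upsert("4111", {"reason": "stolen"})
blacklist.upsert("5500", {"reason": "lost"})
blacklist.delete("5500")  # e.g. a card that was found again

rejected = ColumnTable(["card", "amount"])
rejected.append({"card": "4111", "amount": 25.0})
rejected.append({"card": "4111", "amount": 15.0})
print(blacklist.contains("4111"), blacklist.contains("5500"))
print(sum(rejected.column("amount")))
```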
Next we create a SnappyStreamingSession object to handle the streaming transactions, then build the Kafka stream. We then create an object for each message in the Kafka stream. Finally, we set up a separate stream for the blacklisted cards. In our example, we build the blacklist DataFrame from a Kafka stream rather than from S3, for simplicity. This does not change what happens in the join between the DataFrames; the blacklist DataFrame is simply updated continually from Kafka.
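Turning each Kafka message into an object is essentially a parse step applied to every message value. The sketch below shows that step in plain Python and makes two assumptions:

* Message values are JSON like `{"card": "...", "amount": 12.5}`. This is a hypothetical format; the original message layout is not shown.
* Malformed messages should be dropped rather than fail the batch.

`parse_transaction` and `parse_stream` are illustrative names, not part of any Kafka or SnappyData API.

```python
import json
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass(frozen=True)
class Transaction:
    card: str
    amount: float


def parse_transaction(payload: bytes) -> Optional[Transaction]:
    """Convert one message value into a Transaction, or None if malformed.
    Assumes the hypothetical JSON layout {"card": ..., "amount": ...}."""
    try:
        obj = json.loads(payload)  # json.loads accepts bytes in Python 3.6+
        return Transaction(card=str(obj["card"]), amount=float(obj["amount"]))
    except (ValueError, KeyError, TypeError):
        # ValueError covers invalid JSON and non-numeric amounts;
        # KeyError a missing field; TypeError a non-object payload.
        return None


def parse_stream(payloads: Iterable[bytes]) -> Iterator[Transaction]:
    """Yield one Transaction per well-formed message, skipping bad ones."""
    for p in payloads:
        txn = parse_transaction(p)
        if txn is not None:
            yield txn


messages = [b'{"card": "4111", "amount": 12.5}', b"not json", b'{"card": "5500"}']
print(list(parse_stream(messages)))
```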
Next we insert the new blacklisted cards into the blackList table and join the main transaction stream with the blackList table to discover fraudulent transactions and write them into the rejected transactions table.
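The order described here matters within a micro-batch: blacklist inserts are applied before the join. The plain-Python sketch below shows why. It assumes each micro-batch carries both new blacklist entries and new transactions, and that the blacklist is an in-memory set. `process_micro_batch` and its `update_first` flag are hypothetical, added only to contrast the two orderings.

```python
def process_micro_batch(blacklist, new_cards, txns, update_first=True):
    """Handle one micro-batch: apply blacklist inserts and join transactions.

    blacklist: mutable set of blacklisted card ids (modified in place).
    new_cards: cards that arrived on the blacklist stream in this batch.
    txns: (card, amount) tuples from the transaction stream in this batch.
    Returns (rejected, safe).
    """
    if update_first:
        blacklist.update(new_cards)  # insert first, then join
    rejected = [t for t in txns if t[0] in blacklist]
    safe = [t for t in txns if t[0] not in blacklist]
    if not update_first:
        blacklist.update(new_cards)  # join first: same-batch fraud slips through
    return rejected, safe


bl = {"1111"}
print(process_micro_batch(bl, {"4111"}, [("4111", 9.99), ("2222", 1.0), ("1111", 5.0)]))
```

With inserts applied first, a card blacklisted in the same micro-batch as a charge on that card is already caught. If the join ran first, that charge would be written to the safe table.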
Finally, we write the valid transactions to a Parquet file.
In the above code, we've shown how simple it is to keep our blacklisted cards DataFrame continually updated with new data (in our case, from a Kafka stream) while maintaining a join to a streaming DataFrame of transactions, without restarting or reloading. Because of this mutability, SnappyData avoids the 10-second delay that vanilla Spark incurs every time the Spark SQL query must be restarted. Further, because of our 20x speedup over Spark's native cache, the application above should run even faster.
The ability to bring database-like mutability (and reliability) into Apache Spark gives users a single platform that can handle stream processing, SQL querying and machine learning, all within a very familiar Spark API and all on live data.
A data platform built for the big data era should offer reliability, security, elasticity, scale and low latency, and should democratize the ability to build live applications that are reactive, responsive and offer true predictive analytics. SnappyData offers that to users today. You can download it here.
Shoutout to Sushma Goel for writing the code.