Real-Time Decision Engine using Spark Structured Streaming + ML

Real-time decision-making using ML/AI is the holy grail of customer-facing applications. It’s no longer a long-shot dream; it’s our new reality. The real-time decision engine uses the latest features in Apache Spark 2.3, including stream-to-stream joins and Spark ML, to directly improve the customer experience. We will discuss the architecture at length, including data source features and technical intricacies, as well as model training and serving dynamics. Critically, because these engines directly affect the customer experience, they require production-level SLAs and/or reliable fallbacks to avoid meltdowns.
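The point about SLAs and fallbacks can be made concrete with a small sketch. This is not the engine's actual implementation: `decide`, `fast_model`, and `slow_model` are hypothetical names, and a thread-pool timeout stands in for whatever serving layer is really used. The idea is to bound each model call by a latency budget and return a safe default decision when the model is too slow or raises.

```python
import concurrent.futures
import time

# Shared worker pool so a slow model call cannot block the caller past its SLA.
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def decide(features, score_fn, fallback, sla_seconds=0.05):
    """Return (decision, source): the model's score if it arrives within the SLA,
    otherwise the fallback decision. (Hypothetical helper, for illustration.)"""
    future = _POOL.submit(score_fn, features)
    try:
        return future.result(timeout=sla_seconds), "model"
    except concurrent.futures.TimeoutError:
        # The slow call keeps running in its worker thread; we just stop waiting for it.
        return fallback, "fallback"
    except Exception:
        # A failing model degrades to the fallback instead of failing the request.
        return fallback, "fallback"


def fast_model(features):
    # Hypothetical stand-in for a trained model's scoring function.
    return 0.9 if features.get("recent_purchases", 0) > 2 else 0.1


def slow_model(features):
    # Hypothetical stand-in for a model call that blows its latency budget.
    time.sleep(0.5)
    return fast_model(features)
```

For example, `decide({"recent_purchases": 5}, fast_model, fallback=0.5, sla_seconds=1.0)` returns `(0.9, "model")`, while the same call with `slow_model` and `sla_seconds=0.05` returns `(0.5, "fallback")`. Returning the source alongside the decision makes it easy to monitor how often the fallback kicks in.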

These slides were put together for Data Platforms 2018, presented by Qubole.

Using new PySpark 2.3 Vectorized Pandas UDFs: Lessons

Now that Spark 2.3 has been officially released (2/28/18), I wanted to check the performance of the new Vectorized Pandas UDFs, which use Apache Arrow.

Following up on my Scaling Python for Data Science using Spark post, where I mentioned that Spark 2.3 introduces Vectorized UDFs, I’m using the same data (NYC yellow cab trips) with this code:

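from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType
import pandas as pd

spark = SparkSession.builder.getOrCreate()

# Read the NYC yellow cab trip data (placeholder path; point it at your copy of the CSV)
df = spark.read\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .csv("path/to/yellow_tripdata.csv")

def timestamp_to_epoch(t):
    # t is a pandas.Series of timestamps. "%s" (epoch seconds) is a non-standard
    # strftime directive, so it depends on the platform's C library and local time zone.
    return t.dt.strftime("%s").apply(str)  # <-- pandas.Series calls

# Scalar pandas UDF (the default function type)
f_timestamp_copy = pandas_udf(timestamp_to_epoch, returnType=StringType())
df = df.withColumn("timestamp_copy", f_timestamp_copy(F.col("tpep_pickup_datetime")))
df.select('timestamp_copy').distinct().count()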

# s = pd.Series({'ds': pd.Timestamp('2018-03-03 04:31:19')})
# timestamp_to_epoch(s)
## ds 1520080279
## dtype: object
## ("%s" uses the local time zone; this value corresponds to UTC-8)

Scalar pandas UDFs (the default, as opposed to grouped map UDFs) operate on pandas.Series objects for both input and output, hence the .dt accessor chain rather than calling strftime directly on a Python datetime object. The entire feature depends on PyArrow (>= 0.8.0).
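To make the Series-in/Series-out contract concrete, here is a simplified, pure-pandas model of how a scalar pandas UDF is evaluated. It is an approximation, not Spark's actual code path: in Spark 2.3 the column reaches the Python worker as Arrow record batches (sized by `spark.sql.execution.arrow.maxRecordsPerBatch`, default 10,000), each batch becomes a `pandas.Series`, and the function must return a Series of the same length. `run_scalar_pandas_udf` is a hypothetical name.

```python
import pandas as pd


def run_scalar_pandas_udf(func, column, max_records_per_batch=10000):
    """Simplified model of scalar pandas UDF evaluation: split the column into
    batches, call func once per batch (a pandas.Series), and check that each
    result has the same length as its input batch."""
    outputs = []
    for start in range(0, len(column), max_records_per_batch):
        batch = column.iloc[start:start + max_records_per_batch]
        result = func(batch)
        if len(result) != len(batch):
            raise ValueError("a scalar pandas UDF must return one value per input row")
        outputs.append(pd.Series(result).reset_index(drop=True))
    if not outputs:
        return pd.Series([], dtype=object)
    return pd.concat(outputs, ignore_index=True)
```

Because `func` runs once per batch rather than once per row, per-call Python overhead is amortized; whether the work inside `func` is itself vectorized is a separate question.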

Expect errors to crop up, since this functionality is new. During testing, I saw my fair share of memory leaks and casting errors that caused my jobs to fail.

Running the job above shows some new items in the Spark UI (DAG) and in the explain plan:

Note the addition of ArrowEvalPython

What’s the performance like?!

To jog your memory, PySpark SQL took 17 seconds to count the distinct epoch timestamps, and regular Python UDFs took over 10 minutes (610 seconds).

Much to my dismay, my contrived test ran in 9-10 minutes, in line with regular Python UDFs rather than Spark SQL.
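This has not been profiled here, but one plausible contributor is that the UDF body is still largely row-at-a-time: `.dt.strftime` formats each timestamp individually, and the trailing `.apply(str)` is a plain Python loop over the Series. Arrow speeds up moving data between the JVM and Python; it does not vectorize the code inside the function. Below is a sketch of an alternative that stays in NumPy arithmetic. It assumes naive timestamps should be read as UTC (unlike `%s`, which uses the local time zone, so results differ by the UTC offset), and `timestamp_to_epoch_vectorized` is a hypothetical name.

```python
import pandas as pd

EPOCH = pd.Timestamp("1970-01-01")


def timestamp_to_epoch_vectorized(t):
    """Epoch seconds (as strings) for a Series of naive timestamps, read as UTC."""
    # Series - Timestamp gives a timedelta64 Series; floor division by one second
    # converts the whole Series to int64 seconds without a per-row Python call.
    # Assumes no missing timestamps: a NaT would make the result float64.
    seconds = (t - EPOCH) // pd.Timedelta(seconds=1)
    # Cast to str to match the StringType return type of the original UDF.
    return seconds.astype(str)
```

It could be swapped into the job as `pandas_udf(timestamp_to_epoch_vectorized, returnType=StringType())`; declaring `LongType()` and dropping the final `astype(str)` would avoid creating string objects at all. Neither variant has been benchmarked here.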

I’ll update this post [hopefully] as I get more information.

PySpark ML + NLP Workshop


1. Explore Amazon reviews

2. Sentimentalize the reviews

3. Word frequency by helpfulness
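For a feel for steps 2 and 3 before opening the notebooks, here is a minimal plain-Python sketch of lexicon-based sentiment scoring and word counts split by helpfulness. It is not the workshop's code: the tiny word lists stand in for a real lexicon such as the +/- Effect Lexicon listed below, and the (text, helpful_votes, total_votes) record layout and 0.5 threshold are assumptions.

```python
import re
from collections import Counter

# Hypothetical miniature lexicon; a real one would have thousands of entries.
POSITIVE = {"great", "love", "excellent"}
NEGATIVE = {"broke", "poor", "refund"}


def tokenize(text):
    # Lowercase and keep runs of letters/apostrophes as tokens.
    return re.findall(r"[a-z']+", text.lower())


def lexicon_sentiment(text):
    """Score = (# positive tokens) - (# negative tokens)."""
    tokens = tokenize(text)
    return sum(t in POSITIVE for t in tokens) - sum(t in NEGATIVE for t in tokens)


def word_frequency_by_helpfulness(reviews, threshold=0.5):
    """Count words separately for helpful and not-helpful reviews.
    reviews: iterable of (text, helpful_votes, total_votes) tuples (assumed layout)."""
    counts = {"helpful": Counter(), "not_helpful": Counter()}
    for text, helpful, total in reviews:
        # Reviews with no votes at all are treated as not helpful.
        bucket = "helpful" if total and helpful / total >= threshold else "not_helpful"
        counts[bucket].update(tokenize(text))
    return counts
```

In the workshop setting the same steps would run over Spark DataFrames with Spark ML transformers; this version only shows the logic.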

Workshop Resources

Azure Notebooks Library

Sentiment Notebook

Commoners Notebook

More information

Datasets | Amazon reviews for NLP | +/- Effect Lexicon

Packages | Spark Package for NLP | Spark ML guide – focus on the DataFrame-based API, NOT the RDD-based one



Intro to PySpark Workshop 2018-01-24

In this Intro to PySpark Workshop, there are five main points:

  1. About Apache Spark
  2. Sample PySpark Application walkthrough with explanations
  3. Custom-built Jupyter Azure Notebook to interactively demonstrate fundamental PySpark concepts
  4. Python-specific Spark advice
  5. Curated resources to learn more


PDF Version: Intro to PySpark Workshop

Q&A Options:

Twitter: #PySparkWorkshop

Sample app

from pyspark.sql import SparkSession
# Build SparkSession, gateway to everything Spark 2.x
spark = SparkSession.builder.appName(name="PySpark Intro").master("local[*]").getOrCreate()

# Create PySpark SQL DataFrame from CSV 
# inferring schema from file
# and using header
green_trips = spark.read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv("path/to/green_tripdata.csv")  # placeholder path to the trip data CSV

# Create a view to use as if it were a SQL table
green_trips.createOrReplaceTempView("green_trips")

# Run arbitrary SQL to view total revenue by hour
revenue_by_hour = spark.sql("""
SELECT hour(lpep_pickup_datetime), SUM(total_amount) AS total
FROM green_trips
GROUP BY hour(lpep_pickup_datetime)
ORDER BY hour(lpep_pickup_datetime) ASC""")

# Write out to 25 files (because of 25 partitions) in a directory
revenue_by_hour.write.csv("path/to/revenue_by_hour")  # placeholder output directory

This code can be put into a .py file and run using spark-submit at the command line:

> spark-submit path/to/your_script.py
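When checking the Spark output against a small sample, it can help to express the same aggregation in pandas. A minimal sketch, assuming the sample is already loaded into a pandas DataFrame with the `lpep_pickup_datetime` and `total_amount` columns used in the query; `revenue_by_hour_pandas` is a hypothetical name. One difference to keep in mind: pandas `groupby` drops rows whose grouping key is missing by default, whereas Spark SQL keeps them in a NULL group.

```python
import pandas as pd


def revenue_by_hour_pandas(trips):
    """pandas analogue of the SQL query: SUM(total_amount) per pickup hour, ascending."""
    hours = pd.to_datetime(trips["lpep_pickup_datetime"]).dt.hour.rename("hour")
    return trips.groupby(hours)["total_amount"].sum().rename("total").sort_index()
```

The result is a Series indexed by hour, which is easy to compare row by row with the collected Spark result.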

Resources to learn more

Advice for vetting Spark resources

Newer content is generally much more relevant because of the rapid pace at which Apache Spark has been developed. Avoid most content from before July 2016, when Spark 2.0 was released, because it may not reflect many critical changes to Spark (such as data structure APIs like DataFrames/Datasets, Structured Streaming, SparkSession, etc.). Content that revolves around Spark 1.x (e.g. Spark 1.6.3) should be avoided, as it’s effectively obsolete (i.e., the last release on the 1.x line was in Nov ’16, while Spark 2.x has had 6 releases since then). Databricks is essentially a commercial offshoot of the original project at UC Berkeley’s AMPLab, was co-founded by Matei Zaharia, the original author of Spark, and employs the majority of Spark contributors. Basically, if Databricks says something about Spark, it would be a good idea to listen.


Learning PySpark (Feb 2017) by Tomasz Drabas and Denny Lee 

Gentle Introduction to Spark by Databricks

Mastering Apache Spark 2 by Jacek Laskowski – note this is more of a dense, incredibly useful reference than a tutorial or book meant to be read linearly

High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark by Holden Karau and Rachel Warren – highly recommended once you’re more comfortable with Spark

Articles and Blog Posts

Introducing Vectorized UDFs for PySpark by Databricks

Jump Start with Apache Spark 2.0 on Databricks by Databricks

Scaling Python for Data Science using Spark by Garren Staubli (me)


Intro to Apache Spark on Databricks by Databricks

Jupyter Azure Notebook by Garren Staubli (me)


Spark: The Definitive Guide (WIP) by Bill Chambers and Matei Zaharia (Databricks)


Extending Spark ML (+ 2nd video) by Holden Karau

Performance Optimization of Recommendation Training Pipeline at Netflix by DB Tsai

Free notebooks

Jupyter notebook in the Microsoft Azure cloud: Azure Notebooks

Databricks community edition

Docker image for Jupyter + PySpark