PySpark ML + NLP Workshop

Objectives:

1. Explore Amazon reviews

2. Score the sentiment of the reviews

3. Word frequency by helpfulness

Workshop Resources

Azure Notebooks Library

Sentiment Notebook

Commoners Notebook

More information

Datasets

http://jmcauley.ucsd.edu/data/amazon/  | Amazon reviews for NLP

http://mpqa.cs.pitt.edu/lexicons/effect_lexicon/ | +/- Effect Lexicon

Packages

http://nlp.johnsnowlabs.com/ | Spark Package for NLP

https://spark.apache.org/docs/latest/ml-guide.html | Spark ML guide – focus on DataFrame based, NOT RDD-based

 

 

Intro to PySpark Workshop 2018-01-24

In this Intro to PySpark Workshop, there are five main points:

  1. About Apache Spark
  2. Sample PySpark Application walkthrough with explanations
  3. Custom built Jupyter Azure Notebook to interactively demonstrate fundamental PySpark concepts
  4. Python-specific Spark advice
  5. Curated resources to learn more

Slides

PDF Version: Intro to PySpark Workshop

Q&A Options:

Twitter: #PySparkWorkshop

Sample app

from pyspark.sql import SparkSession
# Build SparkSession, gateway to everything Spark 2.x
spark = SparkSession.builder.appName(name="PySpark Intro").master("local[*]").getOrCreate()

# Create PySpark SQL DataFrame from CSV 
# inferring schema from file
# and using header
green_trips = spark.read\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv("green_tripdata_2017-06.csv")

# Create a view to use as if it were a SQL table
green_trips.createOrReplaceTempView("green_trips")

# Run arbitrary SQL to view total revenue by hour
revenue_by_hour = spark.sql("""
SELECT hour(lpep_pickup_datetime), SUM(total_amount) AS total
FROM green_trips
GROUP BY hour(lpep_pickup_datetime)
ORDER BY hour(lpep_pickup_datetime) ASC""")

# Write out to 25 files (because of 25 partitions) in a directory
revenue_by_hour.write.mode("overwrite").csv("green_revenue_by_hour")

This code can be put into a .py file and run using spark-submit at the command line:

> spark-submit sample_app.py

UPDATE: The content for this workshop was live streamed and recorded for PyLadies Remote which can be viewed here

Resources to learn more

Advice for vetting Spark resources

Newer content is generally much more relevant because Apache Spark has developed so rapidly. Avoid most content from before July 2016, when Spark 2.0 was released, because it may not reflect many critical changes to Spark (such as data structure APIs like DataFrames/Datasets, Structured Streaming, SparkSession, etc.). Content that revolves around Spark 1.x (e.g. Spark 1.6.3) should be avoided as it’s effectively obsolete (i.e. the last release on the 1.x line was Nov ’16, while Spark 2.x has had 6 releases since then). Databricks is essentially a commercial offshoot of the original project at UC Berkeley AMPLab, counts Matei Zaharia, the original author of Spark, as a co-founder, and employs the majority of Spark contributors. Basically, if Databricks says something about Spark, it would be a good idea to listen.

Books

Learning PySpark (Feb 2017) by Tomasz Drabas and Denny Lee 

Gentle Introduction to Spark by Databricks

Mastering Apache Spark 2 by Jacek Laskowski – note this is more of a dense, incredibly useful reference than a tutorial or book meant to be read linearly

High Performance Spark: Best Practices for Scaling and Optimizing Apache Spark by Holden Karau and Rachel Warren – highly recommended once you’re more comfortable with Spark

Articles and Blog Posts

Introducing Vectorized UDFs for PySpark by Databricks

Jump Start with Apache Spark 2.0 on Databricks by Databricks

Scaling Python for Data Science using Spark by Garren Staubli (me)

Notebooks

Intro to Apache Spark on Databricks by Databricks

Jupyter Azure Notebook by Garren Staubli (me)

Repositories

Spark: The Definitive Guide (WIP) by Bill Chambers and Matei Zaharia (Databricks)

Presentations

Extending Spark ML (+ 2nd video) by Holden Karau

Performance Optimization of Recommendation Training Pipeline at Netflix by DB Tsai

Free notebooks

Jupyter notebook in the Microsoft Azure cloud: Azure Notebooks

Databricks community edition

Docker image for Jupyter + PySpark

 

Scaling Python for Data Science using Spark

Python is the de facto language of Data Science & Engineering. (IMHO R is grand for statisticians, but Python is for the rest of us.)

As a prominent language in the field, it only makes sense that Apache Spark supports it with Python-specific APIs. Spark makes it so easy to use Python that naive missteps can leave a job running up to 30-40x slower than it should.

Remember: Spark is written in Scala primarily to leverage JVM performance characteristics, but also has APIs in Python, Java, and R

Even with the great (and rapidly growing!) support for Python on Spark (henceforth PySpark), there are some critical topics that need to be addressed. Do any of these packages look familiar, Python devs?

pandas (aka "pd")
numpy (aka "np")
scipy
scikit-learn
keras
tensorflow
sqlalchemy

Thought so. 🙂

Python is fantastic for Initial and Exploratory Data Analysis (IDA/EDA) plus training models and all sorts of other awesome stuff. However, it can’t scale to larger data sizes/parameters due to 2 primary issues:

1) It’s natively single-threaded* due to the global interpreter lock (GIL) – out of scope for this post

2) It is not distributed** to multiple machines, so even if it were multi-threaded, it is still bound by one machine’s resources (CPU cores + memory + I/O + network)

* Yes, there are different implementations of Python (CPython, Jython, PyPy, etc.) that may or may not support multi-threading/concurrency. We’re focusing on the default and predominantly used CPython implementation.

** It can be distributed with a hot [relatively] new package “dask” that came out in 2014. We’re sticking with PySpark for this post, but the more you know… AWS does offer a single node (x1e.32xlarge) with 128 vCPUs and ~3.9TB of RAM. If you have to ask how much it costs, you can’t afford it. 😉

No, don’t use RDDs; use [Spark] SQL!

When people think of Spark, one of the first things that comes to mind is the age-old RDD. It is, after all, the original data structure API that brought Spark to prominence. RDDs still underpin Spark. However, since version 2.0, the preferred data structure API for Python has been DataFrames (the corollary in Scala/Java is the Datasets API).

DataFrames are under the [massive] umbrella that is “Spark SQL.” DataFrames are preferred to RDDs for two optimization-related reasons: the Catalyst Optimizer and Tungsten. In short, Catalyst and Tungsten automatically handle optimizations for DataFrames that were too low-level for RDDs. The higher-level Spark SQL abstractions give Spark more introspection and greater execution flexibility, which makes DataFrames substantially faster than RDDs. For PySpark users, the performance difference is even greater than for Scala/Java Spark users, because operations done using PySpark SQL APIs directly (e.g. trimming a string, summing an aggregate, regex, etc.) are executed in the JVM rather than in a Python subprocess as RDDs require.

When you’re using DataFrames, be sure to use the baked-in Spark SQL functions (also compatible with hive functions), because they all run optimized in the JVM: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

Moral of the story: If you’re using RDDs (especially with PySpark!), evaluate and try to transition to DataFrames. If you’re not using RDDs yet, try DataFrames first.

Note: Dataset is the Scala and Java API that is strongly typed.
DataFrame is literally an alias for Dataset[Row] in Scala.
DataFrame does not exist in Java (except as Dataset[Row]).

*DataFrame is the only Spark SQL Data Structure API for Python, 
because Python is dynamically typed.*

This section wouldn’t be complete without a silly [but truthful] benchmark:

%%time # RDD
lines = sc.textFile("yellow_tripdata_2017-06.csv")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
print(totalLength) # => 832591506

CPU times: user 9.66 ms, sys: 2.99 ms, total: 12.6 ms
Wall time: 7.7 s

%%time # DataFrame
linesDF = spark.read.text("yellow_tripdata_2017-06.csv")
print(
  linesDF.selectExpr(
    "sum(length(value)) as slen")
  .collect()[0]['slen']
) # => 832591506

CPU times: user 4.76 ms, sys: 1.74 ms, total: 6.5 ms
Wall time: 3.84 s

All hope is not lost on “strong typing”-like features for PySpark

Python being dynamically typed is a double-edged sword. It forces PySpark to use only DataFrames, not Datasets, but it also means PySpark DataFrames natively support accessing columns by name directly. For example…

import pyspark.sql.functions

# Parentheses let the method chain span several lines
linesDF = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("yellow_tripdata_2017-06.csv")
)
(
    linesDF.agg(
        pyspark.sql.functions.sum(
            linesDF.passenger_count  # <---- HERE
        )
    )
    .collect()[0]["sum(passenger_count)"]
)

I want to use [insert awesome package here] with PySpark

You probably can, but just because you can doesn’t mean you should.

What I mean by that is when you start mixing Python native code (such as Pandas) with Spark, two bad things happen [usually]. Firstly, your performance nosedives, because as I alluded to briefly above, any code that runs natively in Python (as opposed to using the Spark SQL APIs) requires [de]serialization to/from Python/JVM and execution in Python subprocesses. More info here, which is a bit old by Spark standards, but any Spark-related wisdom by Holden Karau is great.

The Spark SQL DataFrame API only goes so far (it goes very far FWIW). So maybe you’re tempted to write a UDF (User Defined Function) to extend Spark’s functionality for your use case.

Before you write a UDF that uses Python-specific APIs (not from PySpark), have a look at this simple example and its implications.

Example

We want to convert timestamps in a column to seconds since epoch and count the distinct outputs (yes, this part is purely for benchmarking).

Using UDFs with Python-specific APIs

timestamp_to_epoch = F.udf(lambda t: int(t.strftime("%s")))
%%time
df.select(timestamp_to_epoch(df.tpep_pickup_datetime)).distinct().count()
CPU times: user 44.8 ms, sys: 18.3 ms, total: 63.1 ms
Wall time: 10min 10s
Out[46]: 2340959

Using PySpark SQL APIs

%%time
df.select(F.unix_timestamp(df.tpep_pickup_datetime)).distinct().count()
CPU times: user 2.67 ms, sys: 1.15 ms, total: 3.82 ms
Wall time: 16.5 s
Out[47]: 2340959

Yes, you read that right. PySpark SQL APIs are 30-40 TIMES FASTER (!!!)

UDF with Python-Specific APIs: 610 seconds
PySpark SQL API: 17 seconds
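
As a quick sanity check on the "30-40 times" claim, the ratio can be worked out from the two wall times reported above. This is only arithmetic on the published numbers; the variable names are mine, not part of any Spark API.

```python
# Wall-clock times copied from the two benchmark cells above.
udf_seconds = 10 * 60 + 10   # "10min 10s" for the Python UDF
builtin_seconds = 16.5       # "16.5 s" for F.unix_timestamp

speedup = udf_seconds / builtin_seconds
print(f"UDF: {udf_seconds} s, built-in: {builtin_seconds} s, speedup: {speedup:.1f}x")
# prints: UDF: 610 s, built-in: 16.5 s, speedup: 37.0x
```

Rounding the built-in time up to 17 seconds gives roughly 36x, so either way the result sits inside the 30-40x range.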

Fortunately for you, Spark 2.3 solves the issue – partly – by introducing an impressive feature that allows vectorization of Python UDFs, which you can read more about here. Vectorized UDFs can massively improve PySpark performance.

I’m a sucker for contrived benchmarks that make my case so clearly. 😀

Here’s a naïve way to build a Spark DataFrame using Pandas first. Note this not only takes over 4 minutes to run, it also uses ~3GB of memory (!!).

%%time
pdf = pd.read_table("yellow_tripdata_2017-06.csv")
spark.createDataFrame(pdf).count()
CPU times: user 4min 2s, sys: 3.33 s, total: 4min 6s
Wall time: 4min 10s

By contrast, here is the same line count using the Spark SQL APIs directly (5 seconds, negligible memory impact):

%%time
spark.read.csv("yellow_tripdata_2017-06.csv").count()
CPU times: user 1.63 ms, sys: 1.15 ms, total: 2.77 ms
Wall time: 4.57 s

If you’re going to use Python-native packages with Spark, be mindful of the [unintended] consequences, particularly as it relates to your jobs’ performance.

Special note about Backwards Compatibility

For minor version changes, such as 2.1 to 2.3, many APIs are backwards compatible. For major version changes, such as 1.6 to 2.0, you should definitely anticipate backwards-incompatible changes. Structured Streaming did go through some fairly significant changes between 2.0.0, when it was experimental, and 2.2.0, when it became production-ready.

Special Thanks

Thank you Wes Hoffman, Nikki Haas and Eric Lambert for your feedback and suggestions on relevant topics!

Further reading:

A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets

Introducing Vectorized UDFs for PySpark

Big data [Spark] and its small files problem

Often we log data in JSON, CSV or other text format to Amazon’s S3 as compressed files. This pattern is a) accessible and b) infinitely scalable by nature of being in S3 as common text files. However, there are some subtle but critical caveats that come with this pattern that can cause quite a bit of trouble. Here I’m going to discuss the “small files problem” that has existed in big data since Hadoop and MapReduce first came to prominence as well as advice on how to solve for it.

Consider the following spark command:

df = spark.read.json("s3://awesome-bucket/offers/2017/09/07/*")

That statement looks simple and innocuous enough: it tells Spark to read in a day’s worth of JSON-formatted files in awesome-bucket under the offers key “directory.”*

The “*” glob at the end of the path means we’ll be reading in the files in each hour “directory”, each of which contains over 30,000 files. In total, there are 24 “directories” holding over 700,000 files and 72M rows, and each file is ~30KB.
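
To put those figures in perspective, here is a back-of-the-envelope calculation using only the numbers quoted above. The per-hour file count is a lower bound ("over 30,000") and the file size is approximate, so treat the results as rough estimates rather than measurements.

```python
hour_prefixes = 24          # one "directory" per hour of the day
files_per_hour = 30_000     # "over 30,000", so this is a lower bound
file_size_kb = 30           # approximate compressed size of each file
total_rows = 72_000_000     # 72M rows for the day

total_files = hour_prefixes * files_per_hour
total_size_gb = total_files * file_size_kb / 1_000_000   # KB -> GB (decimal units)
rows_per_file = total_rows / total_files

print(f"files: {total_files:,}")
print(f"compressed size: ~{total_size_gb:.1f} GB")
print(f"rows per file: ~{rows_per_file:.0f}")
```

Roughly 20 GB of compressed data is not much for Spark. The trouble is that it arrives as hundreds of thousands of objects holding about a hundred rows each, so the per-file overhead dominates.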

But… there’s a lot going on under the hood and it’s not good [for performance reasons].

1. S3 is not a file system
2. Meta data on tiny files
3. GZip compressed files
4. JSON

1. S3 is not a file system

Amazon’s Simple Storage Service (S3) is a “cloud-based object storage solution” where each “object” is identified by a bucket and a key. When you list a directory on your local computer (e.g. “ls /tmp” on *nix systems), the information about the directory and files is returned immediately. Asking S3 to list all the files in a “directory” (hint: it’s not actually a directory), such as through “s3cmd ls s3://bucket/path/to/files”, returns results in seconds at best, possibly minutes. S3 is optimal when you have few large files but horrendous when you have an army of tiny files, because more files means the listing process takes substantially longer.

2. Meta data on tiny files

Spark has to know the exact path of each and every file, and how to open it (e.g. s3://bucket/path/to/objects/object1.gz), even if you just pass a path to a “directory”, because ultimately that “directory” is just a collection of files (or “objects”). With an army of tiny files, this meta data gets large both in number of elements and in memory: 100,000 records in a hashmap denoting location, compression type and other meta data are not lightweight. Add in the overhead of using S3 plus network latencies and it becomes clearer why this meta data collection process takes a long time.

3. GZip Compressed Files

GZip compression is great. Sure, there are other compression formats out there (e.g. brotli, lzo, bzip2, etc.), but few if any are as widespread and accessible as GZip. What GZip has in market share, though, it lacks in big data friendly features. GZip is not splittable, which means the entire file must be processed by a single core/executor as a single partition. This is an anti-pattern in a tool like Spark that is designed explicitly around parallelism, because having to process a file serially is expensive. Small GZip files are actually better to process than larger ones, because multiple cores can work on different small files at the same time instead of sitting idle while one or two cores do all the work. But don’t be tricked into thinking “oh I’ll just have loads of small files” because, as you saw above, loads of small files are far worse than just about any alternative.
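
What "not splittable" means can be seen with nothing but Python's standard library. The sketch below is not Spark code, and the sample records and helper name are made up for illustration. A reader can jump into the middle of plain text and resynchronize at the next newline, but a GZip stream cannot be decoded from an arbitrary byte offset.

```python
import gzip
import zlib

# Made-up sample data: 10,000 newline-delimited JSON records.
raw = b"".join(f'{{"offer_id": {i}}}\n'.encode() for i in range(10_000))
compressed = gzip.compress(raw)

# Plain text: start anywhere, skip to the next newline, keep reading.
midpoint = len(raw) // 2
resume_at = raw.index(b"\n", midpoint) + 1
second_half_rows = raw[resume_at:].splitlines()

def can_decompress_from(offset):
    """Try to decode the gzip bytes starting at the given offset."""
    try:
        gzip.decompress(compressed[offset:])
        return True
    except (OSError, EOFError, zlib.error):
        return False

print(len(second_half_rows) > 0)                    # rows recovered from mid-file text
print(can_decompress_from(0))                       # whole stream decodes fine
print(can_decompress_from(len(compressed) // 2))    # starting mid-stream fails
```

This is why one large GZip file ends up as a single partition: a second worker has no valid place to start reading.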

4. JSON

It’s slow, plain and simple. It allows for complex structures and data types as well as implicitly defined types, all of which make it very expensive to parse.

* Directory on S3 is a misnomer, because S3 is an object store where each object is a combination of a bucket (e.g. “awesome-bucket”) with a specific key (e.g. “/path/to/objects/object1.gz”) that has the object’s contents as its value. Think of it like a key-value store where “s3://awesome-bucket/offers/2017/09/07/12/2017-09-07-12-59-0.9429827.gz” is the KEY and the gzipped contents of that file are the value. The slashes after the bucket name (awesome-bucket) mean nothing to S3; they exist solely for the user’s convenience. Treating them as if they denote true directories is a convenience feature Amazon and APIs offer.
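
The key-value view described in this footnote can be modelled with a plain Python dict. This is a toy stand-in, not the S3 API. The keys and the `list_prefix` helper are hypothetical, and a real listing goes over the network rather than scanning memory.

```python
# A toy "bucket": a flat mapping from key to object contents.
# There are no nested folders, only keys that happen to contain slashes.
bucket = {
    "offers/2017/09/07/11/2017-09-07-11-00-a.gz": b"gzipped bytes",
    "offers/2017/09/07/12/2017-09-07-12-00-b.gz": b"gzipped bytes",
    "offers/2017/09/07/12/2017-09-07-12-59-c.gz": b"gzipped bytes",
    "offers/2017/09/08/00/2017-09-08-00-00-d.gz": b"gzipped bytes",
}

def list_prefix(store, prefix):
    """Return every key that starts with prefix, which is all a 'directory listing' is."""
    return sorted(key for key in store if key.startswith(prefix))

day_keys = list_prefix(bucket, "offers/2017/09/07/")
hour_keys = list_prefix(bucket, "offers/2017/09/07/12/")

print(len(day_keys), len(hour_keys))     # 3 2
print("offers/2017/09/07/" in bucket)    # False: the "directory" is not an object
```

Listing a "directory" is a filter over keys, so the work grows with the number of matching objects. That is part of why an army of tiny files is slow to enumerate.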

What can we do about it?
Use file formats like Apache Parquet and ORC. If you need to work with armies of small files (ideally to convert them in full), there are some approaches you can use.
1) S3DistCP (Qubole calls it CloudDistCP)
2) Use Scala with Spark to take advantage of Scala’s parallel collections and Spark’s support for parallel job submission. See below for an example.
3) Just wait. A long time.

Parallel Job Submission to Consolidate many “directories”

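val hours = (0 to 23).map(h => "%02d".format(h)) // zero pad
// Each hour is submitted as its own Spark job from a parallel collection
hours.par.foreach(hour => {
  spark.read.json("s3a://awesome-bucket/offers/2017/09/07/" + hour + "/*")
    .repartition(16) // repartition is a Dataset method, so it goes before .write
    .write
    .parquet("s3://output/" + hour) // per-hour output path is an assumption; the default save mode fails if the path already exists
})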
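
For PySpark users, a similar effect can probably be had with a thread pool, since Spark's scheduler accepts jobs submitted from multiple threads. The following is a minimal sketch of that idea, not a tested production job. `OUTPUT_ROOT`, `hour_paths`, `consolidate_hour` and `consolidate_day` are hypothetical names, and the Spark call itself is passed in as a callable so the submission pattern can be shown (and run) without a cluster.

```python
from concurrent.futures import ThreadPoolExecutor

INPUT_ROOT = "s3a://awesome-bucket/offers/2017/09/07/"
OUTPUT_ROOT = "s3a://output/"   # hypothetical destination, one sub-path per hour

def hour_paths(hour):
    """Build the (input glob, output path) pair for one zero-padded hour."""
    padded = f"{hour:02d}"
    return INPUT_ROOT + padded + "/*", OUTPUT_ROOT + padded

def consolidate_hour(hour, convert):
    """Run the supplied conversion for one hour and report the paths used."""
    src, dst = hour_paths(hour)
    convert(src, dst)
    return src, dst

def consolidate_day(convert, max_workers=8):
    """Submit all 24 hourly conversions from a pool of threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda h: consolidate_hour(h, convert), range(24)))

# Stand-in conversion that only records what would be submitted.
# With a live SparkSession the callable would instead be something like:
#   lambda src, dst: spark.read.json(src).repartition(16).write.parquet(dst)
submitted = []
results = consolidate_day(lambda src, dst: submitted.append((src, dst)))
print(results[0])
```

Passing the conversion in as a function keeps the threading logic separate from Spark, which makes the pattern easy to try locally before pointing it at real data.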

If the code and explanation don’t have you convinced, see this chart, which shows the outsized performance boost from using Parquet over armies of tiny files. This chart only looks at one hour’s worth of data, on local HDFS instead of S3, so it actually makes the small files look better than they should. I expect a minimum of a 20x speedup from using optimal file formats.

For the exceptionally interested parties, here is the chunk of spark code that I believe handles collecting the meta data on input files:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L380 – note how it’s not parallelized, asynchronous or otherwise optimized for peak performance involving tiny files.