Switching between Scala and Python on Spark tips

Switching between Scala and Python on Spark is relatively straightforward, but there are a few differences that can cause some minor frustration. Here are some of the little things I’ve run into and how to adjust for them.

  • PySpark Shell does not support code completion (autocomplete) by default.

Why? PySpark uses the basic Python interpreter REPL, so you get the same REPL you’d get by calling python at the command line.

Fix: Use the iPython REPL by specifying the environment variable
PYSPARK_PYTHON=ipython3 before the pyspark command.

Before:
pyspark

After:
PYSPARK_PYTHON=ipython3 pyspark

  • val and var are not python keywords!

This is silly, but I catch myself trying to create variables in python regularly with the val df = spark.read... style.

Before:
>>> val df = spark.range(100)
File "", line 1
val df = spark.range(100)
^
SyntaxError: invalid syntax

After:
>>> df = spark.range(100)

  • It’s print not println

Just like the val/var conundrum, println is not a valid keyword in python, but print is!

Before:
In [5]: df.foreach(println)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-5-3d51e5dc3e2b> in <module>()
----> 1 df.foreach(println)

NameError: name ‘println’ is not defined

After:
In [6]: df.foreach(print)
Row(id=3)
Row(id=4)
Row(id=2)
Row(id=1)
Row(id=0)

  • All function calls need parentheses in Python

Yep, this is one of those frustrating gifts that just keeps on giving [pain].

Scala:
scala> df.groupBy("element").count.collect.foreach(println)
[bar,1]
[qux,1]
[foo,1]
[baz,1]

Python
Before:
In [15]: df.groupBy("element").count().foreach(print)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
in ()
----> 1 df.groupBy("element").count.collect.foreach(print)

AttributeError: ‘function’ object has no attribute ‘collect’

After:
In [17]: df = spark.createDataFrame([(1,"foo"), (2, "bar"), (3, "baz"), (4, "qux")]).toDF("time", "element")
In [18]: df.groupBy("element").count().foreach(print)
Row(element='bar', count=1)
Row(element='qux', count=1)
Row(element='foo', count=1)
Row(element='baz', count=1)

  • Quotes!

Python allows both single (‘) quotes and double (“) quotes for strings. Scala uses the single quote to denote more specific types.

Scala
scala> 'f
res7: Symbol = 'f

scala> 'f'
res6: Char = f

scala> 'foo'
<console>:1: error: unclosed character literal
'foo'

scala> "foo" == 'foo'
:1: error: unclosed character literal
"foo" == 'foo'

Python

In [19]: "foo" == 'foo'
Out[19]: True

Real Time Big Data analytics: Parquet (and Spark) + bonus

Apache Spark and Parquet (SParquet) are a match made in scalable data analytics and delivery heaven. Spark brings a wide ranging, powerful computing platform to the equation while Parquet offers a data format that is purpose-built for high-speed big data analytics. If this sounds like fluffy marketing talk, resist the temptation to close this tab, because what follows are substantial insights I’ve personally procured and am sharing here to help others get the most out of Parquet and Spark.

What is Parquet?

Parquet is a binary compressed columnar file format available to any project in the Hadoop ecosystem (and others outside it even). It’s a mouthful, but let’s break it down.

Binary means parquet files cannot be opened by typical text editors natively (sublime text*, vim, etc).

* My former colleague James Yu wrote a Sublime Text plugin you can find here to view parquet files.

Columnar means the data is stored as columns instead of rows as most traditional databases (MySQL, PostgreSQL, etc) and file formats (CSV, JSON, etc). This is going to be very important.

Compressed means the file footprint on disk (HDFS, S3, or local filesystem) is smaller than a typical raw uncompressed file. Parquet handles compression differently than traditional compression of a CSV file for example, but in a similar vein to Avro.

Now that the basic definition is out of the way, let’s get right to it.

How can Parquet help me?

Parquet is exceptionally good at high-speed big data analytics. It can store vast quantities of data and operate on it more quickly than many other solutions. Let’s say you have CSV files in S3 (20 TB and growing by 250GiB per day) and a use case that necessitates reporting on those files in a dashboard. A common approach to this problem is aggregating the CSV files down to a MySQL-friendly size, so that reports can be built on this aggregated data. However, this is limited in multiple ways:

  1. CSV is slow to parse because it requires reading all of the rows in the entire file, parsing each line’s columns.
  2. MySQL can only handle so much data, especially high dimensionality data where your users may want to pivot on many different attributes. Every pivot requirement is likely to be impossible to meet, so users must have their functionality restricted for the sake of tech limitations.
  3. Building many tables to support the various pivot requirements becomes onerous, because each table (and the database itself) has to be limited in both size and scope. This increases database storage costs and complexity.

If those limitations had you cringing, I’ve made my case well :). There is an alternative that utilizes SParquet…

  1. Process the CSV files into Parquet files (snappy or gzip compressed)
  2. Use Spark with those Parquet files to drive a powerful and scalable analytics solution

CSV File for Proof of Concept (PoC): NYC TLC Green Taxi for December 2016

The CSV file has 1,224,160 rows and 19 columns, coming in at 107MB uncompressed. Here’s the file schema (using header and inferSchema options in Spark 2.1.1):

 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: nullable = true)
 |-- fare_amount: nullable = true)
 |-- extra: nullable = true)
 |-- mta_tax: nullable = true)
 |-- tip_amount: nullable = true)
 |-- tolls_amount: nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: nullable = true)
 |-- total_amount: nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)

Uncompressed CSV of 107MB was reduced to 24MB (Snappy Parquet) and 19MB (GZIP Parquet). But the real power comes in once the data (now in parquet format) is accessed. Parquet is exceptionally fast when accessing specific columns, which is the opposite of row-based file formats, which thrive when accessing an entire row record. Here are simple SQL examples to show the differences:

--#1
--CSV will read the entire file row-by-row
--Parquet will dump the rows based on their column values
--Winner: Parquet (minimal; because of no parsing)
SELECT *
green_tlc

--#2
--CSV will read the entire file row-by-row
--filter the PULocation column to only ones containing 226 
--and output all rows/columns that match the filter criteria as the results
--Winner: Parquet (minimal; because of no parsing and push down filtering)
SELECT *
green_tlc
<span class="hljs-keyword">WHERE PULocation = 226

--#3
--Parquet will first find only the relevant "data blocks" based on the filter criteria
--and only aggregate the rows/columns that match the filter criteria
--Winner: Parquet (huge; because of no parsing and only specific columns)
SELECT PULocation, SUM(total_amount)
green_tlc
WHERE PULocation IN (77, 102, 107, 226)
GROUP BY PULocation

#3 above is a great example of where Parquet shines, because you’re using pushdown filtering, operating on only specific columns (the rest are ignored), and do not have to parse what you don’t care about (all the other columns/rows).

What implementation strategies can I use?

Some ideas:

  1. Spark with Parquet (SParquet) on Livy to be able to cache entire datasets/queries
  2. Bonus: Impala with Parquet-backed Hive tables (also Spark compatible) to get hyperfast results available via SQL queries

By now, you have hopefully learned that Parquet is a powerful data format that facilitates big data analytics at a scale far greater than many traditional limited approaches. Go forth and play with Parquet!

Here’s my blog post for specific optimization tips: http://garrens.com/blog/2017/04/08/getting-started-and-tips-for-using-apache-parquet-with-apache-spark-2-x/

Garren Staubli is a Big Data Engineer Consultant at Blueprint Consulting Services, and formerly a big data engineer at iSpot.TV.