Using Spark Efficiently | Understanding Spark Event 7/29/17

This page is dedicated to resources related to the 7/29/17 Understanding Spark event presentation in Bellevue, WA.

Slides

Great [FREE!] resources on all things Spark:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
https://spark.apache.org/docs/latest/sql-programming-guide.html

Databricks was founded by the original creators of Spark and is currently the largest contributor to Apache Spark. As such, they are a phenomenal resource for information and services relating to Spark.

Datasets: https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
Catalyst: https://www.slideshare.net/databricks/a-deep-dive-into-spark-sqls-catalyst-optimizer-with-yin-huai
https://de.slideshare.net/SparkSummit/deep-dive-into-catalyst-apache-spark-20s-optimizer-63071120
Tungsten: https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
Matrix: https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
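To make the RDD / DataFrame / Dataset comparison from that last link concrete, here is a quick sketch (not from the talk; the Person case class and values are placeholders, and a spark-shell-style SparkSession named spark is assumed):

import spark.implicits._   // toDF/toDS and $"col"; already imported for you in spark-shell

case class Person(name: String, age: Int)
val people = Seq(Person("Ann", 34), Person("Bo", 29))

val rdd = spark.sparkContext.parallelize(people)  // RDD[Person]: no schema, no Catalyst/Tungsten optimization
val df  = people.toDF()                           // DataFrame: untyped Rows with a schema, optimized by Catalyst
val ds  = people.toDS()                           // Dataset[Person]: typed objects with a schema, also optimized

ds.filter(_.age > 30).show()    // lambda over typed objects, checked at compile time
df.filter($"age" > 30).show()   // Column expression over Rows, checked only when the query is analyzed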

Personally curated Examples:

Create mock typed object data
import org.apache.spark.sql.functions._
import spark.implicits._   // provides Encoders for case classes and the $"col" syntax used below

case class CountryGDP(countryCode: String, countryName: String, year: String, gdp: Double, language: Option[String])

val objects = Seq[CountryGDP](
  CountryGDP("USA", "'Murica", "2014", 17393103000000.0, None),
  CountryGDP("USA", "'Murica", "2015", 18036648000000.0, None),
  CountryGDP("USA", "'Murica", "2016", 18569100000000.0, None),
  CountryGDP("CHE", "Switzerland", "2014", 702705544908.583, None),
  CountryGDP("CHE", "Switzerland", "2015", 670789928809.882, None),
  CountryGDP("CHE", "Switzerland", "2016", 659827235193.83, None)
)

Strongly typed Datasets
val objectsDS = spark.createDataset(objects)

// field accesses on typed objects are checked at compile time (great for development in IDEs!)
val countriesWithLanguages = objectsDS.map(o => {
  val lang = o.countryCode match {
    case "USA" => Some("English")
    case "CHE" => Some("Schweizerdeutsch")
    case _     => Some("Simlish")
  }
  o.copy(language = lang)
})
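
One quick way to see that compile-time checking in action (an illustrative sketch, not from the talk): misspell a field on the typed Dataset and the code will not compile, while the equivalent string-based column lookup only fails once the query runs.

countriesWithLanguages.show(3)

// A typo in a Dataset field name is caught by the compiler / IDE:
//   objectsDS.map(o => o.countryCod)        // error: value countryCod is not a member of CountryGDP
// The same typo in a string-based lookup compiles and only fails at runtime:
//   objectsDS.toDF().select("countryCod")   // org.apache.spark.sql.AnalysisException: cannot resolve 'countryCod'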

Creating a DataFrame and using a UDF to transform it
val rowsDF = spark.createDataFrame(objects)

def getLang(countryCode: String): Option[String] = {
  countryCode match {
    case "USA" => Some("English")
    case "CHE" => Some("Schweizerdeutsch")
    case _     => Some("Simlish")
  }
}

// register the UDF by name (usable from SQL) and keep the returned function for Column expressions
val gl = spark.udf.register("getLang", getLang _)

// string-based column lookups are only evaluated at runtime
val rowsDFWithLanguage = rowsDF.withColumn("language", gl($"countryCode"))
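
Two follow-ups worth noting here (a sketch of standard Spark SQL behavior, not part of the original example): registering the UDF by name also makes it callable from SQL, and a misspelled column name compiles fine but fails when the plan is analyzed at runtime.

rowsDFWithLanguage.show()

// the name-based registration makes the UDF usable from SQL as well
rowsDF.createOrReplaceTempView("country_gdp")
spark.sql("SELECT countryName, getLang(countryCode) AS language FROM country_gdp").show()

// this compiles, but throws org.apache.spark.sql.AnalysisException at runtime:
// rowsDF.withColumn("language", gl($"countryCod"))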

Event link: https://www.eventbrite.com/e/understanding-spark-tickets-35440866586#

Video recording is here: https://livestream.com/metis/events/7597562

Learn more about Parquet
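
There is no Parquet example in the material above; as a minimal sketch (the output path is made up), writing and re-reading the DataFrame from the UDF example looks like this:

// write the DataFrame as Parquet: columnar, compressed, with the schema stored alongside the data
rowsDFWithLanguage.write.mode("overwrite").parquet("/tmp/country_gdp.parquet")

// read it back; the schema comes from the Parquet footers, so no inference is needed
val fromParquet = spark.read.parquet("/tmp/country_gdp.parquet")
fromParquet.printSchema()
fromParquet.show()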

Switching between Scala and Python on Spark tips

Switching between Scala and Python on Spark is relatively straightforward, but there are a few differences that can cause some minor frustration. Here are some of the little things I’ve run into and how to adjust for them.

  • The PySpark shell does not support code completion (autocomplete) by default.

Why? PySpark uses the basic Python interpreter REPL, so you get the same REPL you’d get by calling python at the command line.

Fix: Use the IPython REPL by setting the environment variable
PYSPARK_PYTHON=ipython3 before the pyspark command.

Before:
pyspark

After:
PYSPARK_PYTHON=ipython3 pyspark

  • val and var are not Python keywords!

This is silly, but I regularly catch myself trying to create variables in Python with the val df = spark.read... style.

Before:
>>> val df = spark.range(100)
File "", line 1
val df = spark.range(100)
^
SyntaxError: invalid syntax

After:
>>> df = spark.range(100)

  • It’s print, not println

Just like the val/var conundrum, println is not defined in Python, but print is!

Before:
In [5]: df.foreach(println)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-5-3d51e5dc3e2b> in <module>()
----> 1 df.foreach(println)

NameError: name 'println' is not defined

After:
In [6]: df.foreach(print)
Row(id=3)
Row(id=4)
Row(id=2)
Row(id=1)
Row(id=0)

  • All function calls need parentheses in Python

Yep, this is one of those frustrating gifts that just keeps on giving [pain].

Scala:
scala> df.groupBy("element").count.collect.foreach(println)
[bar,1]
[qux,1]
[foo,1]
[baz,1]

Python:
Before:
In [15]: df.groupBy("element").count.collect.foreach(print)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-15-...> in <module>()
----> 1 df.groupBy("element").count.collect.foreach(print)

AttributeError: 'function' object has no attribute 'collect'

After:
In [17]: df = spark.createDataFrame([(1,"foo"), (2, "bar"), (3, "baz"), (4, "qux")]).toDF("time", "element")
In [18]: df.groupBy("element").count().foreach(print)
Row(element='bar', count=1)
Row(element='qux', count=1)
Row(element='foo', count=1)
Row(element='baz', count=1)

  • Quotes!

Python allows both single (') and double (") quotes for strings. In Scala, single quotes denote more specific types: 'f' is a Char, and a leading single quote like 'f creates a Symbol.

Scala
scala> 'f
res7: Symbol = 'f

scala> 'f'
res6: Char = f

scala> 'foo'
<console>:1: error: unclosed character literal
'foo'

scala> "foo" == 'foo'
<console>:1: error: unclosed character literal
"foo" == 'foo'

Python

In [19]: "foo" == 'foo'
Out[19]: True