{"id":188,"date":"2018-01-06T16:51:17","date_gmt":"2018-01-07T00:51:17","guid":{"rendered":"http:\/\/garrens.com\/blog\/?p=188"},"modified":"2018-03-02T20:48:53","modified_gmt":"2018-03-03T04:48:53","slug":"scaling-python-for-data-science-using-spark","status":"publish","type":"post","link":"https:\/\/garrens.com\/blog\/2018\/01\/06\/scaling-python-for-data-science-using-spark\/","title":{"rendered":"Scaling Python for Data Science using Spark"},"content":{"rendered":"<p><a href=\"http:\/\/garrens.com\/blog\/wp-content\/uploads\/2018\/01\/apache-spark.jpg\"><img loading=\"lazy\" class=\"alignnone wp-image-197\" src=\"http:\/\/garrens.com\/blog\/wp-content\/uploads\/2018\/01\/apache-spark.jpg\" alt=\"\" width=\"430\" height=\"279\" srcset=\"https:\/\/garrens.com\/blog\/wp-content\/uploads\/2018\/01\/apache-spark.jpg 967w, https:\/\/garrens.com\/blog\/wp-content\/uploads\/2018\/01\/apache-spark-300x195.jpg 300w, https:\/\/garrens.com\/blog\/wp-content\/uploads\/2018\/01\/apache-spark-768x498.jpg 768w\" sizes=\"(max-width: 430px) 100vw, 430px\" \/><\/a><\/p>\n<p>Python is the de facto language of Data Science &amp; Engineering. (IMHO R is grand for statisticians, but Python is for the rest of us.)<\/p>\n<p>As a prominent language in the field, it only makes sense that Apache Spark supports it with Python specific APIs. <strong>Spark makes it so easy to use Python<\/strong>\u00a0 <strong>that it can actually run slow (up to <span style=\"text-decoration: underline;\">30-40x slower<\/span> than it should) due to naive missteps<\/strong>.<\/p>\n<blockquote><p>Remember: Spark is written in Scala primarily to leverage JVM performance characteristics, but also has APIs in Python, Java, and R<\/p><\/blockquote>\n<p>Even with the great (and rapidly growing!) support for Python on Spark (henceforth PySpark), there are some critical topics that need to be addressed. 
Do any of these packages look familiar, Python devs?<\/p>\n<pre>pandas (aka \"pd\")\r\nnumpy (aka \"np\")\r\nscipy\r\nscikit-learn\r\nkeras\r\ntensorflow\r\nsqlalchemy<\/pre>\n<p>Thought so. \ud83d\ude42<\/p>\n<p>Python is fantastic for Initial and Exploratory Data Analysis (IDA\/EDA) plus training models and all sorts of other awesome stuff. However, it can&#8217;t scale to larger data sizes\/parameters due to two primary issues:<\/p>\n<p>1) It&#8217;s natively single-threaded* due to the global interpreter lock (GIL) &#8211; out of scope for this post<\/p>\n<p>2) It is not distributed** to multiple machines, so even if it were multi-threaded, it is still bound by one machine&#8217;s resources (CPU cores + memory + I\/O + network)<\/p>\n<p>* Yes, there are different implementations of Python (CPython, Jython, PyPy, etc.) that may or may not have multi-threading\/concurrency support. We&#8217;re focusing on the default and predominantly used CPython implementation.<\/p>\n<p>** It <em>can<\/em> be distributed with a hot [relatively] new package, &#8220;<a href=\"https:\/\/pypi.python.org\/pypi\/dask\">dask<\/a>,&#8221; that came out in 2014. We&#8217;re sticking with PySpark for this post, but the more you know&#8230; AWS does offer a single node (x1e.32xlarge) with 128 vCPUs and ~3.9TB of RAM. If you have to ask how much it costs, you can&#8217;t afford it. \ud83d\ude09<\/p>\n<h2><strong>No, don&#8217;t use RDDs; use [Spark] SQL!<br \/>\n<\/strong><\/h2>\n<p>When people think of Spark, one of the first things that comes to mind is the age-old RDD. It is, after all, the original data structure API that brought Spark to prominence. 
RDDs still underpin Spark; however, <em>since version 2.0<\/em>, the <strong>preferred data structure API for Python has become <a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame\">DataFrames<\/a><\/strong> (the counterpart in Scala\/Java is the <a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/scala\/index.html#org.apache.spark.sql.Dataset\">Dataset<\/a> API).<\/p>\n<p><em>DataFrames<\/em> are under the [massive] umbrella that is &#8220;Spark SQL.&#8221; DataFrames are preferred to RDDs for two optimization-related reasons: the <a href=\"https:\/\/databricks.com\/blog\/2015\/04\/13\/deep-dive-into-spark-sqls-catalyst-optimizer.html\"><strong>Catalyst Optimizer<\/strong><\/a> and <strong><a href=\"https:\/\/databricks.com\/blog\/2015\/04\/28\/project-tungsten-bringing-spark-closer-to-bare-metal.html\">Tungsten<\/a><\/strong>. The short explanation is that Catalyst and Tungsten automatically handle optimizations that were too low-level for RDDs. The higher-level Spark SQL abstractions afford these APIs more introspection and greater execution flexibility, thus increasing <strong>performance for DataFrames substantially over RDDs<\/strong>. For PySpark users, the performance difference is <em>even greater<\/em> than for Scala\/Java Spark users, because operations done using PySpark SQL APIs directly (e.g. 
trimming a string, summing an aggregate, regex, etc.) are actually executed in the JVM directly rather than in a Python subprocess as RDDs require.<\/p>\n<p>When you&#8217;re using DataFrames, be sure to use the baked-in Spark SQL functions (also compatible with Hive functions), because they all run optimized in the JVM: <a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#module-pyspark.sql.functions\">https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#module-pyspark.sql.functions<\/a><\/p>\n<p>Moral of the story: <span style=\"text-decoration: underline;\">If you&#8217;re using RDDs (especially with PySpark!), evaluate and try to transition to DataFrames. If you&#8217;re not using RDDs yet, try DataFrames first.<\/span><\/p>\n<pre>Note: Dataset is the Scala and Java API that is <em>strongly typed.\r\n<\/em>DataFrame is literally an alias for Dataset[Row] in Scala\r\nDataFrame does not exist in Java (except as Dataset[Row])\r\n\r\n*DataFrame is the only Spark SQL data structure API for Python, \r\nbecause Python is dynamically typed.*<\/pre>\n<p>This section wouldn&#8217;t be complete without a silly [but truthful] benchmark:<\/p>\n<pre>%%time # RDD\r\nlines = sc.textFile(\"yellow_tripdata_2017-06.csv\")\r\nlineLengths = lines.map(lambda s: len(s))\r\ntotalLength = lineLengths.reduce(lambda a, b: a + b)\r\nprint(totalLength) # =&gt; 832591506\r\n\r\nCPU times: user 9.66 ms, sys: 2.99 ms, total: 12.6 ms\r\nWall time: <strong>7.7 s<\/strong>\r\n\r\n%%time # DataFrame\r\nlinesDF = spark.read.text(\"yellow_tripdata_2017-06.csv\")\r\nprint(\r\n  linesDF.selectExpr(\r\n    \"sum(length(value)) as slen\")\r\n  .collect()[0]['slen']\r\n) # =&gt; 832591506\r\n\r\nCPU times: user 4.76 ms, sys: 1.74 ms, total: 6.5 ms\r\nWall time: <strong>3.84 s<\/strong><\/pre>\n<h3>All hope is not lost on &#8220;strong typing&#8221;-like features for PySpark<\/h3>\n<p>Python being dynamically typed is a double-edged sword, because by 
being dynamically typed (and therefore forced to use only DataFrames, not Datasets), PySpark DataFrames also natively support accessing variables\/columns by name directly. For example&#8230;<\/p>\n<pre>linesDF = (spark.read\r\n  .option(\"header\", \"true\")\r\n  .option(\"inferSchema\", \"true\")\r\n  .csv(\"yellow_tripdata_2017-06.csv\"))\r\nlinesDF.agg(\r\n  pyspark.sql.functions.sum(\r\n    linesDF.passenger_count # &lt;---- HERE\r\n  )\r\n).collect()[0]['sum(passenger_count)']<\/pre>\n<h2>I want to use [insert awesome package here] with PySpark<\/h2>\n<p>You probably <em>can<\/em>, but just because you <em>can<\/em> <span style=\"text-decoration: underline;\">doesn&#8217;t mean you should<\/span>.<\/p>\n<p>What I mean by that is that when you start mixing Python-native code (such as Pandas) with Spark, two bad things [usually] happen: your performance nose-dives, and your memory usage balloons (more on the latter below). Performance suffers because, as I alluded to briefly above, any code that runs natively in Python (as opposed to using the Spark SQL APIs) requires [de]serialization to\/from Python\/JVM and execution in Python subprocesses. More info <a href=\"https:\/\/www.slideshare.net\/SparkSummit\/getting-the-best-performance-with-pyspark\">here<\/a>; it&#8217;s a bit old by Spark standards, but any Spark-related wisdom from Holden Karau is great.<\/p>\n<p>The Spark SQL DataFrame API only goes so far (it goes very far, FWIW). 
So maybe you&#8217;re tempted to write a UDF (User-Defined Function) to extend Spark&#8217;s functionality for your use case.<\/p>\n<p>Before you write a UDF that uses Python-specific APIs (not from PySpark), have a look at this simple example and its implications.<\/p>\n<h4>Example<\/h4>\n<p>We want to convert timestamps in a column to seconds since epoch and count the distinct outputs (yes, this part is purely for benchmarking).<\/p>\n<h5>Using UDFs with Python-specific APIs<\/h5>\n<pre>timestamp_to_epoch = F.udf(lambda t: int(t.strftime(\"%s\")))\r\n%%time\r\ndf.select(timestamp_to_epoch(df.tpep_pickup_datetime)).distinct().count()\r\nCPU times: user 44.8 ms, sys: 18.3 ms, total: 63.1 ms\r\nWall time: <span style=\"text-decoration: underline;\"><strong>10min 10s<\/strong><\/span>\r\nOut[46]: 2340959<\/pre>\n<h5>Using PySpark SQL APIs<\/h5>\n<pre>%%time\r\ndf.select(F.unix_timestamp(df.tpep_pickup_datetime)).distinct().count()\r\nCPU times: user 2.67 ms, sys: 1.15 ms, total: 3.82 ms\r\nWall time: <span style=\"text-decoration: underline;\"><strong>16.5 s<\/strong><\/span>\r\nOut[47]: 2340959<\/pre>\n<p>Yes, you read that right. <strong>PySpark SQL APIs are 30-40 <span style=\"text-decoration: underline;\">TIMES<\/span> FASTER (!!!)<\/strong><\/p>\n<p>UDF with Python-specific APIs:<strong> 610 seconds<br \/>\n<\/strong>PySpark SQL API: <strong>17 seconds<\/strong><\/p>\n<p>Fortunately for you, Spark 2.3 solves the issue &#8211; partly &#8211; by introducing an impressive feature allowing vectorization of Python UDFs, which you can read more about <a href=\"https:\/\/databricks.com\/blog\/2017\/10\/30\/introducing-vectorized-udfs-for-pyspark.html\">here<\/a>.<strong> Vectorized UDFs for PySpark<\/strong> can again <strong>massively improve performance.<\/strong><\/p>\n<p>I&#8217;m a sucker for contrived benchmarks that make my case so clearly. \ud83d\ude00<\/p>\n<p>Here&#8217;s a na\u00efve way to build a Spark DataFrame using Pandas first. 
Note this not only takes over 4 minutes to run, it also uses ~3GB of memory (!!).<\/p>\n<pre>%%time\r\npdf = pd.read_table(\"yellow_tripdata_2017-06.csv\")\r\nspark.createDataFrame(pdf).count()\r\nCPU times: user 4min 2s, sys: 3.33 s, total: 4min 6s\r\nWall time: <strong>4min 10s<\/strong><\/pre>\n<p>By contrast, the same line count using the Spark SQL APIs directly takes under 5 seconds, with negligible memory impact:<\/p>\n<pre>%%time\r\nspark.read.csv(\"yellow_tripdata_2017-06.csv\").count()\r\nCPU times: user 1.63 ms, sys: 1.15 ms, total: 2.77 ms\r\nWall time: <strong>4.57 s<\/strong><\/pre>\n<p>If you&#8217;re going to use Python-native packages with Spark, be mindful of the [unintended] consequences, particularly as they relate to your jobs&#8217; performance.<\/p>\n<h6>Special note about Backwards Compatibility<\/h6>\n<p>For minor version changes, such as 2.1 to 2.3, many APIs are backwards compatible. For major version changes, such as 1.6 to 2.0, you should definitely anticipate backwards-incompatible changes. Structured Streaming went through some fairly significant changes from 2.0.0 to 2.2.0 as it transitioned from experimental to production-ready.<\/p>\n<p><strong>Special Thanks<\/strong><\/p>\n<p>Thank you <a href=\"https:\/\/www.linkedin.com\/in\/wesleyhoffman1\">Wes Hoffman<\/a>, <a href=\"https:\/\/www.linkedin.com\/in\/nikki-haas-2b24b217\/\">Nikki Haas<\/a>, and <a href=\"https:\/\/www.linkedin.com\/in\/eric-lambert-1888a924\/\">Eric Lambert<\/a> for your feedback and suggestions on relevant topics!<\/p>\n<p>Further reading:<\/p>\n<p>https:\/\/databricks.com\/blog\/2016\/07\/14\/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html<\/p>\n<p>https:\/\/databricks.com\/blog\/2017\/10\/30\/introducing-vectorized-udfs-for-pyspark.html<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Python is the de facto language of Data Science &amp; Engineering. (IMHO R is grand for statisticians, but Python is for the rest of us.) 
As a prominent language in the field, it only makes sense that Apache Spark supports it with Python specific APIs. Spark makes it so easy to use Python\u00a0 that it&hellip; <a href=\"https:\/\/garrens.com\/blog\/2018\/01\/06\/scaling-python-for-data-science-using-spark\/\" title=\"Read More\" class=\"read-more\">Continue reading<span class=\"meta-nav\">&rarr;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":[],"categories":[22],"tags":[17,16,7,14,2],"_links":{"self":[{"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/posts\/188"}],"collection":[{"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/comments?post=188"}],"version-history":[{"count":8,"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/posts\/188\/revisions"}],"predecessor-version":[{"id":198,"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/posts\/188\/revisions\/198"}],"wp:attachment":[{"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/media?parent=188"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/categories?post=188"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/garrens.com\/blog\/wp-json\/wp\/v2\/tags?post=188"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}