Big data [Spark] and its small files problem

Often we log data in JSON, CSV or other text formats to Amazon's S3 as compressed files. This pattern is a) accessible and b) infinitely scalable by nature of living in S3 as plain text files. However, there are some subtle but critical caveats that come with this pattern and can cause quite a bit of trouble. Here I'm going to discuss the "small files problem" that has existed in big data since Hadoop and MapReduce first came to prominence, along with advice on how to address it.

Consider the following spark command:

df = spark.read.json("s3://awesome-bucket/offers/2017/09/07/*")

That statement looks simple and innocuous enough: tell Spark to read in a day's worth of JSON-formatted files in awesome-bucket under the offers key "directory."*

The "*" glob at the end of the path means we'll be reading the files in each hour "directory", each of which contains over 30,000 files. In total, there are 24 "directories" holding over 700,000 files and 72M rows, with each file weighing in at ~30KB.

But… there's a lot going on under the hood, and none of it is good for performance:

1. S3 is not a file system
2. Meta data on tiny files
3. GZip compressed files
4. JSON

1. S3 is not a file system

Amazon's Simple Storage Service (S3) is a "cloud-based object storage solution" where each object is identified by a bucket and a key. When you list a directory on your local computer (e.g. "ls /tmp" on *nix systems), information about the directory and its files comes back immediately. Asking S3 to list all the files in a "directory" (hint: it's not actually a directory), such as through "s3cmd ls s3://bucket/path/to/files", returns results in seconds at best, possibly minutes. S3 works well when you have a few large files but is horrendous when you have an army of tiny files, because more files means the listing process takes substantially longer.
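To see this for yourself, you can time a raw listing call from the Spark shell. This is just a sketch: it reuses the hypothetical awesome-bucket layout from above and assumes the S3A filesystem and credentials are already configured.

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// grab the Hadoop FileSystem that backs s3a:// paths for this session
val fs = FileSystem.get(new URI("s3a://awesome-bucket"), spark.sparkContext.hadoopConfiguration)

// time a listing of a single hour "directory" (tens of thousands of tiny objects)
val start = System.nanoTime()
val statuses = fs.listStatus(new Path("s3a://awesome-bucket/offers/2017/09/07/00/"))
println(s"Listed ${statuses.length} objects in ${(System.nanoTime() - start) / 1e9} seconds")

Run the same listing against a prefix holding a handful of large files and the difference is stark.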

2. Meta data on tiny files

Spark has to know the exact path of, and how to open, each and every file (e.g. s3://bucket/path/to/objects/object1.gz), even if you just pass a path to a "directory", because ultimately that "directory" is just a collection of files (or "objects"). With an army of tiny files, this meta data gets large, both in number of elements and in memory footprint (100,000+ records in a hashmap holding location, compression type and other meta data is not lightweight). Add in the overhead of using S3 plus network latencies and it becomes clearer why this meta data collection process takes a long time.

3. GZip Compressed Files

GZip compression is great. Sure, there are other compression formats out there (e.g. brotli, lzo, bzip2, etc), but few if any are as widespread and accessible as GZip. What GZip has in market share, though, it sacrifices in big-data-friendly features. GZip is not splittable, which means an entire file must be processed by a single core/executor as a single partition. That is an anti-pattern in a tool like Spark, designed explicitly around parallelism, because serially processing a file is expensive. Small GZip files are actually better to process than large ones, because multiple cores can work on different small files at the same time rather than sitting idle while one or two cores do all the work. But don't be tricked into thinking "oh, I'll just have loads of small files", because as you saw above, loads of small files are far worse than just about any alternative.
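A quick way to see the splittability problem in action from the Spark shell (a sketch; the path below is a hypothetical single large gzipped file):

// One big .json.gz file comes in as exactly one partition, so one core does all the work
val oneBigGz = spark.read.json("s3a://awesome-bucket/dumps/2017/09/07/big-dump.json.gz")
println(oneBigGz.rdd.getNumPartitions) // => 1, because GZip cannot be split

// Repartitioning lets downstream stages use the whole cluster,
// but the initial decompression and parse still happened on a single core
val spread = oneBigGz.repartition(64)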

4. JSON

It’s slow; plain and simple. It allows for complex structures and data types as well as implicitly defined types, which all make it very
expensive to parse.

* "Directory" on S3 is a misnomer, because S3 is an object store where each object is the combination of a bucket (e.g. "awesome-bucket") and a specific key (e.g. "/path/to/objects/object1.gz") that has the object's contents as its value. Think of it like a key-value store where "s3://awesome-bucket/offers/2017/09/07/12/2017-09-07-12-59-0.9429827.gz" is the KEY and the gzipped contents of that file are the VALUE. The slashes after the bucket name (awesome-bucket) mean nothing to S3; they exist solely for the user's convenience. Treating them as if they denote true directories is a convenience feature Amazon and various APIs offer.

What can we do about it?
Use file formats like Apache Parquet and ORC. If you do need to work with (and ideally convert in full) armies of small files, there are a few approaches you can use:
1) S3DistCP (Qubole calls it CloudDistCP)
2) Use Scala with Spark to take advantage of Scala's parallel collections and Spark's concurrent job submission. See below for an example.
3) Just wait. A long time.

Parallel Job Submission to Consolidate many “directories”

val hours = (0 to 23).map(h => "%02d".format(h)) // zero-pad the hour, e.g. "07"
hours.par.foreach(hour => {
  // consolidate each hour's worth of tiny JSON files into 16 Parquet files
  spark.read.json("s3a://awesome-bucket/offers/2017/09/07/" + hour + "/*")
    .repartition(16)
    .write
    .parquet("s3://output/" + hour) // per-hour output paths so the concurrent writes don't collide
})

If the code and explanation don't have you convinced, see this chart, which shows the outsized performance boost from using Parquet over armies of tiny files. The chart only looks at one hour's worth of data, on local HDFS instead of S3, so it actually makes the small files look better than they should. I expect a minimum of a 20x speedup from using optimal file formats.

For the exceptionally interested parties, here is the chunk of spark code that I believe handles collecting the meta data on input files:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L380 – note how it’s not parallelized, asynchronous or otherwise optimized for peak performance involving tiny files.

Spark File Format Showdown – CSV vs JSON vs Parquet

Apache Spark supports many different data sources, such as the ubiquitous Comma Separated Value (CSV) format and web API friendly JavaScript Object Notation (JSON) format. A common format used primarily for big data analytical purposes is Apache Parquet. Parquet is a fast columnar data format that you can read more about in two of my other posts: Real Time Big Data analytics: Parquet (and Spark) + bonus and Tips for using Apache Parquet with Spark 2.x

In this post we’re going to cover the attributes of using these 3 formats (CSV, JSON and Parquet) with Apache Spark.

Splittable (definition): Spark likes to split a single input file into multiple chunks (partitions, to be precise) so that it [Spark] can work on many partitions at one time (i.e. concurrently).

* CSV is splittable when it is a raw, uncompressed file or compressed with a splittable compression format such as BZIP2 or LZO (note: LZO needs to be indexed to be splittable!)

** JSON has the same splittability conditions as CSV when compressed, with one extra difference: when the "wholeFile" option is set to true (re: SPARK-18352), JSON is NOT splittable.

CSV should generally be the fastest to write, JSON the easiest for a human to understand and Parquet the fastest to read.

CSV is the de facto standard for a lot of data, and for fair reasons; it's [relatively] easy to comprehend for both users and computers, and it's made even more accessible via Microsoft Excel.

JSON is the standard for communicating on the web. APIs and websites are constantly communicating using JSON because it is human-readable and easy for both clients and servers to produce and parse.

Parquet is optimized for the Write Once Read Many (WORM) paradigm. It's slow to write, but incredibly fast to read, especially when you're only accessing a subset of the total columns. For use cases requiring operating on entire rows of data, a format like CSV, JSON or even Avro should be used.
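As a rough illustration (hypothetical paths and column name), the same single-column query looks identical in code, but the Parquet read only touches the one column it needs while the CSV read parses every row in full:

// CSV: every byte of every row is read and parsed before the projection happens
val csvCount = spark.read.option("header", "true").csv("s3a://bucket/events-csv/")
  .select("device_id").distinct().count()

// Parquet: only the device_id column chunks are read from storage
val parquetCount = spark.read.parquet("s3a://bucket/events-parquet/")
  .select("device_id").distinct().count()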

Code examples and explanations

CSV

Generic column names | all string types | lazily evaluated

scala> val df = spark.read.option("sep", "\t").csv("data.csv")
scala> df.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

Header-defined column names | all string types | lazily evaluated

scala> val df = spark.read.option("sep", "\t").option("header","true").csv("data.csv")
scala> df.printSchema
root
 |-- guid: string (nullable = true)
 |-- date: string (nullable = true)
 |-- alphanum: string (nullable = true)
 |-- name: string (nullable = true)

Header-defined column names | inferred types | EAGERLY evaluated (!!!)

scala> val df = spark.read.option("sep", "\t").option("header","true").option("inferSchema","true").csv("data.csv")
scala> df.printSchema
root
 |-- guid: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- alphanum: string (nullable = true)
 |-- name: string (nullable = true)

The eager evaluation of this version is critical to understand. In order to determine with certainty the proper data type for each column, Spark has to READ AND PARSE THE ENTIRE DATASET. That can be a very high cost, especially when the number of files/rows/columns is large. Schema inference also does no other work: none of your actual transformation code runs during that pass, so Spark ends up reading your file(s) TWICE instead of ONCE.
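The usual way around the double read is to hand Spark the schema yourself, so there is nothing to infer. A minimal sketch, reusing the column names and types from the sample above:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("guid", StringType),
  StructField("date", TimestampType),
  StructField("alphanum", StringType),
  StructField("name", StringType)
))

// No inference pass: the read stays lazy and the data is only scanned once
val df = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .schema(schema)
  .csv("data.csv")

The same .schema(...) call works with spark.read.json(...), which sidesteps the eager evaluation described in the next section as well.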

JSON

Named columns | inferred types | EAGERLY evaluated
scala> val df = spark.read.json("data.json")
scala> df.printSchema
root
 |-- alphanum: string (nullable = true)
 |-- epoch_date: long (nullable = true)
 |-- guid: string (nullable = true)
 |-- name: string (nullable = true)

Like the schema-inferring CSV read above, JSON reads are eagerly evaluated: Spark scans the data up front to infer the schema before any of your transformations run.

Parquet

Named Columns | Defined types | lazily evaluated
scala> val df = spark.read.parquet("data.parquet")
scala> df.printSchema
root
 |-- alphanum: string (nullable = true)
 |-- date: long (nullable = true)
 |-- guid: string (nullable = true)
 |-- name: string (nullable = true)

Unlike CSV and JSON, Parquet files are binary files that contain meta data about their contents. Without needing to read/parse the contents of the file(s), Spark can rely on Parquet's own meta data to determine column names and data types.

 TL;DR Use Apache Parquet instead of CSV or JSON whenever possible, because it’s faster and better.

Using Spark Efficiently | Understanding Spark Event 7/29/17

This page is dedicated to resources related to the 7/29/17 Understanding Spark event presentation in Bellevue, WA.

Slides

Great [FREE!] resources on all things Spark:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
https://spark.apache.org/docs/latest/sql-programming-guide.html

Databricks was founded by the original creators of Spark and is currently the largest contributor to Apache Spark. As such, they are a phenomenal resource for information and services relating to Spark.

Datasets: https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
Catalyst: https://www.slideshare.net/databricks/a-deep-dive-into-spark-sqls-catalyst-optimizer-with-yin-huai
https://de.slideshare.net/SparkSummit/deep-dive-into-catalyst-apache-spark-20s-optimizer-63071120
Tungsten: https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
Matrix: https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

Personally curated Examples:

Create mock typed object data
import org.apache.spark.sql.functions._

case class CountryGDP(countryCode: String, countryName: String, Year: String, gdp: Double, language: Option[String])

val objects = Seq[CountryGDP](
  CountryGDP("USA", "'Murica", "2014", 17393103000000.0, None),
  CountryGDP("USA", "'Murica", "2015", 18036648000000.0, None),
  CountryGDP("USA", "'Murica", "2016", 18569100000000.0, None),
  CountryGDP("CHE", "Switzerland", "2014", 702705544908.583, None),
  CountryGDP("CHE", "Switzerland", "2015", 670789928809.882, None),
  CountryGDP("CHE", "Switzerland", "2016", 659827235193.83, None)
)

Strongly typed Datasets
val objectsDS = spark.createDataset(objects)

// typed objects are evaluated at compile time (great for development in IDEs!)
val countriesWithLanguages = objectsDS.map(o => {
  val lang = o.countryCode match {
    case "USA" => Some("English")
    case "CHE" => Some("Schweizerdeutsch")
    case _ => Some("Simlish")
  }
  o.copy(language = lang)
})

Creating DataFrame and using UDF to transform
val rowsDF = spark.createDataFrame(objects)

def getLang(countryCode: String): Option[String] = {
  countryCode match {
    case "USA" => Some("English")
    case "CHE" => Some("Schweizerdeutsch")
    case _ => Some("Simlish")
  }
}
val gl = spark.udf.register("getLang", getLang _)

// String-based lookups are evaluated at Runtime
val rowsDFWithLanguage = rowsDF.withColumn("language", gl($"countryCode"))

Event link: https://www.eventbrite.com/e/understanding-spark-tickets-35440866586#

Video recording is here: https://livestream.com/metis/events/7597562

Learn more about parquet

Switching between Scala and Python on Spark tips

Switching between Scala and Python on Spark is relatively straightforward, but there are a few differences that can cause some minor frustration. Here are some of the little things I’ve run into and how to adjust for them.

  • PySpark Shell does not support code completion (autocomplete) by default.

Why? PySpark uses the basic Python interpreter REPL, so you get the same REPL you’d get by calling python at the command line.

Fix: Use the iPython REPL by specifying the environment variable
PYSPARK_PYTHON=ipython3 before the pyspark command.

Before:
pyspark

After:
PYSPARK_PYTHON=ipython3 pyspark

  • val and var are not python keywords!

This is silly, but I catch myself trying to create variables in python regularly with the val df = spark.read... style.

Before:
>>> val df = spark.range(100)
File "", line 1
val df = spark.range(100)
^
SyntaxError: invalid syntax

After:
>>> df = spark.range(100)

  • It’s print not println

Just like the val/var conundrum, println is not a valid keyword in python, but print is!

Before:
In [5]: df.foreach(println)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-5-3d51e5dc3e2b> in <module>()
----> 1 df.foreach(println)

NameError: name 'println' is not defined

After:
In [6]: df.foreach(print)
Row(id=3)
Row(id=4)
Row(id=2)
Row(id=1)
Row(id=0)

  • All function calls need parentheses in Python

Yep, this is one of those frustrating gifts that just keeps on giving [pain].

Scala:
scala> df.groupBy("element").count.collect.foreach(println)
[bar,1]
[qux,1]
[foo,1]
[baz,1]

Python
Before:
In [15]: df.groupBy("element").count.collect.foreach(print)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
in <module>()
----> 1 df.groupBy("element").count.collect.foreach(print)

AttributeError: 'function' object has no attribute 'collect'

After:
In [17]: df = spark.createDataFrame([(1,"foo"), (2, "bar"), (3, "baz"), (4, "qux")]).toDF("time", "element")
In [18]: df.groupBy("element").count().foreach(print)
Row(element='bar', count=1)
Row(element='qux', count=1)
Row(element='foo', count=1)
Row(element='baz', count=1)

  • Quotes!

Python allows both single (') quotes and double (") quotes for strings. Scala uses the single quote to denote more specific types.

Scala
scala> 'f
res7: Symbol = 'f

scala> 'f'
res6: Char = f

scala> 'foo'
<console>:1: error: unclosed character literal
'foo'

scala> "foo" == 'foo'
<console>:1: error: unclosed character literal
"foo" == 'foo'

Python

In [19]: "foo" == 'foo'
Out[19]: True

Real Time Big Data analytics: Parquet (and Spark) + bonus

Apache Spark and Parquet (SParquet) are a match made in scalable data analytics and delivery heaven. Spark brings a wide ranging, powerful computing platform to the equation while Parquet offers a data format that is purpose-built for high-speed big data analytics. If this sounds like fluffy marketing talk, resist the temptation to close this tab, because what follows are substantial insights I’ve personally procured and am sharing here to help others get the most out of Parquet and Spark.

What is Parquet?

Parquet is a binary compressed columnar file format available to any project in the Hadoop ecosystem (and others outside it even). It’s a mouthful, but let’s break it down.

Binary means parquet files cannot be opened by typical text editors natively (sublime text*, vim, etc).

* My former colleague James Yu wrote a Sublime Text plugin you can find here to view parquet files.

Columnar means the data is stored as columns instead of rows, which is how most traditional databases (MySQL, PostgreSQL, etc) and file formats (CSV, JSON, etc) store their data. This is going to be very important.

Compressed means the file footprint on disk (HDFS, S3, or local filesystem) is smaller than a typical raw uncompressed file. Parquet handles compression differently than traditional compression of a CSV file for example, but in a similar vein to Avro.

Now that the basic definition is out of the way, let’s get right to it.

How can Parquet help me?

Parquet is exceptionally good at high-speed big data analytics. It can store vast quantities of data and operate on it more quickly than many other solutions. Let’s say you have CSV files in S3 (20 TB and growing by 250GiB per day) and a use case that necessitates reporting on those files in a dashboard. A common approach to this problem is aggregating the CSV files down to a MySQL-friendly size, so that reports can be built on this aggregated data. However, this is limited in multiple ways:

  1. CSV is slow to parse because it requires reading all of the rows in the entire file, parsing each line’s columns.
  2. MySQL can only handle so much data, especially high dimensionality data where your users may want to pivot on many different attributes. Every pivot requirement is likely to be impossible to meet, so users must have their functionality restricted for the sake of tech limitations.
  3. Building many tables to support the various pivot requirements becomes onerous, because each table (and the database itself) has to be limited in both size and scope. This increases database storage costs and complexity.

If those limitations had you cringing, I’ve made my case well :). There is an alternative that utilizes SParquet…

  1. Process the CSV files into Parquet files (snappy or gzip compressed), as sketched below
  2. Use Spark with those Parquet files to drive a powerful and scalable analytics solution
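A minimal conversion sketch (hypothetical S3 paths; the input is the TLC CSV described below):

// One-off conversion job: read the CSV once, then write it back out as compressed Parquet
val trips = spark.read
  .option("header", "true")
  .option("inferSchema", "true") // acceptable for a one-off job; supply an explicit schema to skip the extra pass
  .csv("s3a://raw-bucket/green_tripdata_2016-12.csv")

trips.write
  .option("compression", "snappy") // or "gzip" for a smaller footprint on disk
  .parquet("s3a://analytics-bucket/green_tlc/2016/12/")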

CSV File for Proof of Concept (PoC): NYC TLC Green Taxi for December 2016

The CSV file has 1,224,160 rows and 19 columns, coming in at 107MB uncompressed. Here’s the file schema (using header and inferSchema options in Spark 2.1.1):

 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)

Uncompressed CSV of 107MB was reduced to 24MB (Snappy Parquet) and 19MB (GZIP Parquet). But the real power comes in once the data (now in parquet format) is accessed. Parquet is exceptionally fast when accessing specific columns, which is the opposite of row-based file formats, which thrive when accessing an entire row record. Here are simple SQL examples to show the differences:

--#1
--CSV will read the entire file row-by-row
--Parquet will dump the rows based on their column values
--Winner: Parquet (minimal; because of no parsing)
SELECT *
FROM green_tlc

--#2
--CSV will read the entire file row-by-row,
--filter the PULocationID column to rows matching 226,
--and output all rows/columns that match the filter criteria as the results
--Winner: Parquet (minimal; because of no parsing and push down filtering)
SELECT *
FROM green_tlc
WHERE PULocationID = 226

--#3
--Parquet will first find only the relevant "data blocks" based on the filter criteria
--and only aggregate the rows/columns that match the filter criteria
--Winner: Parquet (huge; because of no parsing and only specific columns)
SELECT PULocationID, SUM(total_amount)
FROM green_tlc
WHERE PULocationID IN (77, 102, 107, 226)
GROUP BY PULocationID

#3 above is a great example of where Parquet shines, because you’re using pushdown filtering, operating on only specific columns (the rest are ignored), and do not have to parse what you don’t care about (all the other columns/rows).
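Here is roughly what query #3 looks like through the DataFrame API (a sketch; the Parquet path is hypothetical). The filter and the two-column projection get pushed down to the Parquet reader, which you can confirm in the PushedFilters/ReadSchema lines of the physical plan:

import org.apache.spark.sql.functions.{col, sum}

val trips = spark.read.parquet("s3a://analytics-bucket/green_tlc/2016/12/")

val byPickup = trips
  .filter(col("PULocationID").isin(77, 102, 107, 226))
  .groupBy("PULocationID")
  .agg(sum("total_amount"))

byPickup.explain() // the physical plan should show the pushed-down In filter and a two-column read schema
byPickup.show()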

What implementation strategies can I use?

Some ideas:

  1. Spark with Parquet (SParquet) on Livy to be able to cache entire datasets/queries
  2. Bonus: Impala with Parquet-backed Hive tables (also Spark compatible) to get hyperfast results available via SQL queries

By now, you have hopefully learned that Parquet is a powerful data format that facilitates big data analytics at a scale far greater than many traditional limited approaches. Go forth and play with Parquet!

Here’s my blog post for specific optimization tips: http://garrens.com/blog/2017/04/08/getting-started-and-tips-for-using-apache-parquet-with-apache-spark-2-x/

Garren Staubli is a Big Data Engineer Consultant at Blueprint Consulting Services, and formerly a big data engineer at iSpot.TV.

 

Connecting Apache Spark to External Data sources (e.g. Redshift, S3, MySQL)

Pre-requisites

AWS S3

Hadoop AWS Jar

AWS Java SDK Jar

* Note: These AWS jars should not be necessary if you’re using Amazon EMR.

Amazon Redshift

JDBC Driver

Spark-Redshift package *

* The Spark-redshift package provided by Databricks is critical particularly if you wish to WRITE to Redshift, because it does bulk file operations instead of individual insert statements. If you’re only looking to READ from Redshift, this package may not be quite as helpful.

MySQL

MySQL JDBC Connector jar

Setting your password [relatively securely]

This is not extremely secure, but is much better than putting your password directly into code.

Use a properties file:

<code>echo "spark.jdbc.password=test_pass_prop" &gt; secret_credentials.properties
spark-submit --properties-file secret_credentials.properties</code>

Examples (in Scala unless otherwise noted)

S3 (using S3A)

<code class="bash plain">spark-shell --jars hadoop-aws-2.7.3.jar,aws-java-sdk-1.7.4.jar</code>
<code>spark.conf.set("fs.s3a.access.key", "&lt;ACCESS_KEY&gt;")
spark.conf.set("fs.s3a.secret.key", "&lt;SECRET_KEY&gt;")
val d = spark.read.parquet("s3a://parquet-lab/files")
d.select("device_id").distinct().count() // =&gt; 1337</code>

* On Amazon EMR, you may be able to skip the jars and key settings.
** You may also want to try the "s3" or "s3n" protocols if s3a doesn't work.

MySQL

spark-shell --jars mysql-connector-java-5.1.40-bin.jar

val properties = new java.util.Properties()
properties.put("driver", "com.mysql.jdbc.Driver")
properties.put("url", "jdbc:mysql://mysql-host:3306")
properties.put("user", "<username>")
properties.put("password", spark.conf.get("spark.jdbc.password", "<default_pass>"))
// This will form a SQL query like "SELECT model_id, prediction, actual_value FROM ml_models WHERE utc_start_time BETWEEN '2017-03-31' AND '2017-04-02'"
// Using .limit(INT) will NOT work as you might expect - it will retrieve all the data first THEN limit when showing you
val models = spark.read.jdbc(properties.get("url").toString, "ml_models", Array("utc_start_time BETWEEN '2017-03-31' AND '2017-04-02'"), properties).select("model_id", "prediction", "actual_value")

Redshift

Recommended approach using Databricks’ spark-redshift:

spark-shell --packages com.databricks:spark-redshift_2.11:3.0.0-preview1 --jars RedshiftJDBC42-1.2.1.1001.jar

Basic JDBC connection only:

spark-shell --jars RedshiftJDBC42-1.2.1.1001.jar
val properties = new java.util.Properties()
properties.put("driver", "com.amazon.redshift.jdbc42.Driver")
properties.put("url", "jdbc:redshift://redshift-host:5439/")
properties.put("user", "<username>")
properties.put("password", spark.conf.get("spark.jdbc.password", "<default_pass>"))
val d_rs = spark.read.jdbc(properties.get("url").toString, "data_table", properties)

Using the Databricks Redshift data source package – for Bulk Data WRITING to Redshift, use this package:

Reading from and writing to Redshift stages data [and doesn’t clean up after itself] in S3, so use object lifecycle management!

val devices = spark.read.format("com.databricks.spark.redshift").
  option("forward_spark_s3_credentials", "true").
  option("url", "jdbc:redshift://redshift-host:5439/?user=<user>&password=<password>").
  option("query", "SELECT * FROM devices").
  option("tempdir", "s3://temporary-holding-bucket/").load()

Writing the dataframe to Redshift in the “public.temporary_devices” table:

devices_transformed.coalesce(64).write
  .format("com.databricks.spark.redshift")
  .option("forward_spark_s3_credentials", "true")
  .option("url", "jdbc:redshift://redshift-host:5439/?user=<user>&password=<password>")
  .option("dbtable", "public.temporary_devices")
  .option("tempdir", "s3a://temporary-holding-bucket/")
  .option("tempformat", "CSV GZIP") // EXPERIMENTAL, but CSV is higher performance than AVRO for loading into redshift
  .mode("error")
  .save()

* Note: coalesce(64) is called to reduce the number of output files to the s3 staging directory, because renaming files from their temporary location in S3 can be slow. This S3Committer should help alleviate that issue.

Resources

http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/

Tips for using Apache Parquet with Spark 2.x

What is Apache Parquet?
It is a compressible binary columnar data format used in the Hadoop ecosystem. We'll talk about it primarily in relation to the Hadoop Distributed File System (HDFS) and Spark 2.x.

What role does it fill?
It is a fast and efficient data format great for scalable big data analytics.

Optimization Tips

  • Aim for around 1GB parquet output files, but experiment with other sizes for your use case and cluster setup (source)
  • Ideally store on HDFS in file sizes of at least the HDFS block size (default 128MB)
  • Storing Parquet files on S3 is also possible (side note: use amazon athena, which charges based on data read if you want Presto SQL-like queries on demand at low cost)
  • Use snappy compression if storage space is not a concern due to it being splittable, but for what should be a relatively small performance hit but much better compression, use gzip (source); setting the codec is sketched below
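Picking the codec is a one-line option on the writer. A sketch, assuming a hypothetical DataFrame df and output path:

// "snappy" is the Spark 2.x default for Parquet; "gzip" trades some CPU for better compression
df.write
  .option("compression", "gzip")
  .parquet("s3a://analytics-bucket/events-parquet/")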

Runtime Stats for Functions | Python Decorator

In a similar vein to my prior Python decorator metadata for functions ("meta_func" => github | PyPi | blog), this decorator is intended to help illuminate the number of calls to a function and aggregate timing statistics for those calls.

It will keep track of each function by its uniquely assigned Python object identifier, the total number of function calls, the total time taken for all calls to that function, and the min, max and average time per call.

Sample usage:

from time import sleep

@runtime_stats()
def self_mult(n):
    sleep(0.2)
    return n * n

print(self_mult(10)) # => 100
print(self_mult(7)) # => 49
print(self_mult.get_func_runtime_stats()) # => {'total_time': 401.668, 'avg': 200.834, 'func_uid': 4302206808, 'func_name': 'self_mult', 'min': 200.445, 'max': 201.223, 'total_calls': 2}

Replace CTRL-A in a file while in a screen session

echo -e "\u0001" | cat -v
# ^A

cat -v 000001 | tr '^A' '\t' | head

Inspiration: http://stackoverflow.com/questions/31460818/creating-a-ctrl-a-delimiter-file

Note: Within the same day, this strategy both worked and then failed. YMMV.

More reliable would be to get into a non-screen session and do "ctrl-v then a".

Split file by keys

Files sometimes come in (whether via Hadoop or other processes) as big globs of data with inter-related parts. Many times I want to process these globs concurrently, but I can see my dilemma unfolding quickly: I could a) write the code to process it serially and be done in 1 hour, or b) write code to process it concurrently and be done in 1.5 hours, because the added overhead of verifying the output, thread safety, etc exceeds the serial processing time. This made me sad, because concurrent processes are awesome. But self-managed, thread-safe concurrent processes are even more awesome!

I thought: what if I could split an input file on keys and group those similarly keyed lines into separate files for processing? Aha!

So I naturally first tried finding existing solutions, and to be honest, awk has a pretty killer one-liner as noted here on Stack Overflow:

awk '{ print >> $5 }' yourfile

This one-liner is likely great for many folks (especially when using only small files). But for me, awk threw an error due to having "too many open files". Again, sad face :(.

So… I wrote my own Python command line utility to take an input file and split it into any number of output files by unique keys in the file. All of your input is maintained, just segregated into different files by the keys you provide.

While my naming conventions may be lacking panache, they are at least clearly intentioned utilities. (But seriously, if you have a better name, I'm all ears.)

Without further ado, I give you
split_file_by_key