Big data [Spark] and its small files problem

Often we log data in JSON, CSV, or another text format to Amazon's S3 as compressed files. This pattern is a) accessible and b) infinitely scalable by nature of living in S3 as common text files. However, it comes with some subtle but critical caveats that can cause quite a bit of trouble. Here I'm going to discuss the "small files problem" that has existed in big data since Hadoop and MapReduce first came to prominence, along with advice on how to solve it.

Consider the following Spark command:

df = spark.read.json("s3://awesome-bucket/offers/2017/09/07/*")

That statement looks simple and innocuous enough. It tells Spark to read in a day's worth of JSON-formatted files in awesome-bucket under the offers key "directory."*

The "*" glob at the end of the path means we'll be reading in the files in each hour "directory", each of which contains over 30,000 files. In total, there are 24 "directories" holding over 700,000 files and 72M rows, with each file weighing in at roughly 30KB.

But… there's a lot going on under the hood, and none of it is good for performance:

1. S3 is not a file system
2. Metadata on tiny files
3. GZip compressed files
4. JSON

1. S3 is not a file system

Amazon's Simple Storage Service (S3) is a "cloud-based object storage solution" where each object is identified by a bucket and a key.
When you list a directory on your local computer (e.g. "ls /tmp" on *nix systems), information about the directory and its files is
returned immediately. Asking S3 to list all the files in a "directory" (hint: it's not actually a directory), such as with
"s3cmd ls s3://bucket/path/to/files", returns results in seconds at best, possibly minutes. S3 works well when you have a few large files
but is horrendous when you have an army of tiny files, because the more files there are, the longer the listing process takes.
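
If you want to see the listing cost for yourself, here's a rough sketch using the Hadoop FileSystem API that ships with Spark. It assumes the hypothetical bucket from above and a working S3A connector; it is an illustration, not a benchmark:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Ask the S3A connector to list a single hour "directory" and time it.
val fs = FileSystem.get(new URI("s3a://awesome-bucket"), spark.sparkContext.hadoopConfiguration)

val start    = System.nanoTime()
val statuses = fs.listStatus(new Path("s3a://awesome-bucket/offers/2017/09/07/00/"))
val seconds  = (System.nanoTime() - start) / 1e9

println(s"Listed ${statuses.length} objects in $seconds seconds")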

2. Metadata on tiny files

Spark has to know the exact path of, and how to open, each and every file (e.g. s3://bucket/path/to/objects/object1.gz), even if you just pass it
a path to a "directory", because ultimately that "directory" is just a collection of files (or "objects"). With an army of tiny files, this metadata
gets large, both in number of entries and in memory footprint: 100,000+ records in a hashmap holding location, compression type and
other metadata is not lightweight. Add in the overhead of using S3 plus network latencies and it becomes clearer why this metadata
collection process takes so long.
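
A quick way to get a feel for how much per-file metadata Spark ends up tracking is to ask a DataFrame which files back it. A sketch, again against the hypothetical bucket from above:

// Read a single hour "directory" and count the individual files Spark enumerated.
val df = spark.read.json("s3a://awesome-bucket/offers/2017/09/07/12/*")

// Every one of these ~30,000 paths (plus size, compression codec, etc.) is tracked on the driver.
println(df.inputFiles.length)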

3. GZip Compressed Files

GZip compression is great. Sure, there are other compression formats out there (e.g. brotli, LZO, bzip2), but few if any are as widespread and
accessible as GZip. What GZip has in market share, though, it sacrifices in big-data-friendly features. GZip is not splittable, which
means an entire file must be processed by a single core/executor as a single partition. This is an anti-pattern in a tool like Spark designed
explicitly around parallelism, because serially processing a file is expensive. Small GZip files are actually better to process than large
ones, because multiple cores can work on different small files at the same time rather than sitting idle while one or two cores do all the work. But
don't be tricked into thinking "oh, I'll just have loads of small files", because as you saw above, loads of small files are far worse than just
about any alternative.
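
A minimal sketch of the splittability difference (the file names here are hypothetical):

// A single gzipped file becomes a single partition, no matter how large it is.
val gz = spark.read.text("s3a://awesome-bucket/big-file.json.gz")
println(gz.rdd.getNumPartitions)     // 1

// The same data uncompressed (or in a splittable codec) can be split into many partitions.
val plain = spark.read.text("s3a://awesome-bucket/big-file.json")
println(plain.rdd.getNumPartitions)  // > 1 for a large file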

4. JSON

It's slow, plain and simple. It allows for complex structures and data types, as well as implicitly defined types, all of which make it very
expensive to parse.

* "Directory" on S3 is a misnomer: S3 is an object store where each object is the combination of a bucket (e.g. "awesome-bucket") and
a specific key (e.g. "/path/to/objects/object1.gz") that has the object's contents as its value. Think of it like a key-value store where
"s3://awesome-bucket/offers/2017/09/07/12/2017-09-07-12-59-0.9429827.gz" is the key and the gzipped contents of
that file are the value. The slashes after the bucket name (awesome-bucket) mean nothing to S3; they exist solely for the user's
convenience, and treating them as if they denote true directories is a convenience feature Amazon and the various APIs offer.
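
If you want to poke at this directly, here's a sketch using the AWS SDK for Java against the hypothetical bucket above. Note that the "prefix" is just a string filter on keys, not a directory lookup:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request
import scala.collection.JavaConverters._

// "Give me keys starting with this string" -- there is no directory traversal involved.
val s3 = AmazonS3ClientBuilder.defaultClient()
val request = new ListObjectsV2Request()
  .withBucketName("awesome-bucket")
  .withPrefix("offers/2017/09/07/12/")

// Only the first page of up to 1,000 keys; listing 30,000+ files requires paginating.
val result = s3.listObjectsV2(request)
result.getObjectSummaries.asScala.foreach(obj => println(obj.getKey))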

What can we do about it?
Use file formats like Apache Parquet and ORC. If you need to work with armies of small files (ideally converting them in full), there are a few
approaches you can use.
1) S3DistCp (Qubole calls it CloudDistCp)
2) Use Scala's parallel collections with Spark to submit multiple jobs concurrently. See below for an example.
3) Just wait. A long time.

Parallel Job Submission to Consolidate many “directories”

val hours = (0 to 23).map(h => "%02d".format(h)) // zero pad
hours.par.foreach(hour => {
  spark.read.json("s3a://awesome-bucket/offers/2017/09/07/" + hour + "/*")
    .repartition(16)                          // consolidate the tiny inputs into 16 partitions
    .write.parquet("s3://output/" + hour)     // one output "directory" per hour so parallel jobs don't collide
})

If the code and explanation don't have you convinced, see this chart, which shows the outsized performance boost from using Parquet over
armies of tiny files. The chart only looks at one hour's worth of data, and on local HDFS instead of S3, so it actually makes the small files
look better than they should. I expect a minimum of a 20x speedup from using optimal file formats.

For the exceptionally interested parties, here is the chunk of Spark code that I believe handles collecting the metadata on input files:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L380 – note how it’s not parallelized, asynchronous or otherwise optimized for peak performance involving tiny files.

Spark File Format Showdown – CSV vs JSON vs Parquet

Apache Spark supports many different data sources, such as the ubiquitous Comma Separated Value (CSV) format and web API friendly JavaScript Object Notation (JSON) format. A common format used primarily for big data analytical purposes is Apache Parquet. Parquet is a fast columnar data format that you can read more about in two of my other posts: Real Time Big Data analytics: Parquet (and Spark) + bonus and Tips for using Apache Parquet with Spark 2.x

In this post we’re going to cover the attributes of using these 3 formats (CSV, JSON and Parquet) with Apache Spark.

Splittable (definition): Spark likes to split a single input file into multiple chunks (partitions, to be precise) so that it [Spark] can work on many partitions at one time (i.e. concurrently).

* CSV is splittable when it is a raw, uncompressed file or when it uses a splittable compression format such as BZIP2 or LZO (note: LZO needs to be indexed to be splittable!)

** JSON has the same splittability conditions as CSV when compressed, with one extra difference: when the multiLine option (originally named wholeFile, re: SPARK-18352) is set to true, JSON is NOT splittable (see the sketch below).
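
For the curious, a minimal sketch of that last point (the file names are hypothetical):

// JSON Lines input (one object per line): splittable when uncompressed.
val lines = spark.read.json("data.json")

// multiLine treats each file as one JSON document, so every file must be
// consumed whole by a single task and is NOT splittable.
val whole = spark.read.option("multiLine", "true").json("single-document.json")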

CSV should generally be the fastest to write, JSON the easiest for a human to understand, and Parquet the fastest to read.

CSV is the de facto standard for a lot of data, and for fair reasons; it's [relatively] easy to comprehend for both users and computers, and it's made even more accessible via Microsoft Excel.

JSON is the standard for communicating on the web. APIs and websites are constantly communicating in JSON because of its usability properties, such as its self-describing structure.

Parquet is optimized for the Write Once Read Many (WORM) paradigm. It's slow to write but incredibly fast to read, especially when you're only accessing a subset of the total columns. For use cases that operate on entire rows of data, a format like CSV, JSON or even Avro should be used.
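
As a sketch of why that read-side speed shows up (using the data.parquet example from later in this post): selecting a subset of columns means Parquet only has to touch those column chunks on disk.

// Only the "guid" column chunks (plus the Parquet footer metadata) are read from storage;
// the other columns are skipped entirely thanks to the columnar layout.
val guids = spark.read.parquet("data.parquet").select("guid")
guids.show(5)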

Code examples and explanations

CSV

Generic column names | all string types | lazily evaluated

scala> val df = spark.read.option("sep", "\t").csv("data.csv")
scala> df.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

Header-defined column names | all string types | lazily evaluated

scala> val df = spark.read.option("sep", "\t").option("header","true").csv("data.csv")
scala> df.printSchema
root
 |-- guid: string (nullable = true)
 |-- date: string (nullable = true)
 |-- alphanum: string (nullable = true)
 |-- name: string (nullable = true)

Header-defined column names | inferred types | EAGERLY evaluated (!!!)

scala> val df = spark.read.option("sep", "\t").option("header","true").option("inferSchema","true").csv("data.csv")
scala> df.printSchema
root
 |-- guid: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- alphanum: string (nullable = true)
 |-- name: string (nullable = true)

The eager evaluation of this version is critical to understand. In order to determine with certainty the proper data type to assign to each column, Spark has to READ AND PARSE THE ENTIRE DATASET. This can be a very high cost, especially when the number of files/rows/columns is large. Spark also does no other processing while it's inferring the schema, so your actual transformation code won't be running during that pass. Spark therefore has to read your file(s) TWICE instead of ONCE.
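
If you already know the schema, you can hand it to Spark and skip the inference pass entirely. A sketch reusing the column names and types from the inferred example above:

import org.apache.spark.sql.types._

// Declaring the schema up front avoids the extra full read of the dataset.
val csvSchema = StructType(Seq(
  StructField("guid", StringType),
  StructField("date", TimestampType),
  StructField("alphanum", StringType),
  StructField("name", StringType)
))

val df = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .schema(csvSchema)
  .csv("data.csv")   // lazily evaluated again: nothing is read at this point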

JSON

Named columns | inferred types | EAGERLY evaluated
scala> val df = spark.read.json("data.json")
scala> df.printSchema
root
 |-- alphanum: string (nullable = true)
 |-- epoch_date: long (nullable = true)
 |-- guid: string (nullable = true)
 |-- name: string (nullable = true)

Like the eagerly evaluated (for schema inference) CSV above, JSON files are eagerly evaluated.
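
The same explicit-schema trick works for JSON and makes the read lazy again. A sketch reusing the field names from the printSchema output above:

import org.apache.spark.sql.types._

// With a user-supplied schema there is nothing to infer, so Spark doesn't scan the data here.
val jsonSchema = StructType(Seq(
  StructField("alphanum", StringType),
  StructField("epoch_date", LongType),
  StructField("guid", StringType),
  StructField("name", StringType)
))

val df = spark.read.schema(jsonSchema).json("data.json")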

Parquet

Named Columns | Defined types | lazily evaluated
scala> val df = spark.read.parquet("data.parquet")
scala> df.printSchema
root
 |-- alphanum: string (nullable = true)
 |-- date: long (nullable = true)
 |-- guid: string (nullable = true)
 |-- name: string (nullable = true)

Unlike CSV and JSON, Parquet files are binary files that contain metadata about their contents, so without needing to read/parse the content of the file(s), Spark can rely on the metadata stored in each Parquet file's footer to determine column names and data types.

 TL;DR Use Apache Parquet instead of CSV or JSON whenever possible, because it’s faster and better.