Split STDIN to multiple Compressed Files on highly skewed data

Sometimes I have data that is simply too large to store on one volume uncompressed, but I still need to run processes on it. In my recent use case, I had one large tar-gzipped file that extracted into many smaller gzipped files, which then decompressed into two-column text (both columns strings). I did not have the luxury of enough space to decompress everything and process the data down to a smaller size. While I could have processed the files as they were (gzipped), the performance would have been substandard for two reasons:

1) On-the-fly GZIP decompression in Python is noticeably slower than reading uncompressed data (see the rough timing sketch after this list).
2) As I was going to run a process on over 1 billion records, I was going to kick it off in concurrent processes using xargs (simple parallelized processing, covered below).
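
Here is a rough way to see the first point for yourself (the file names are hypothetical, and the exact numbers will vary with hardware and compression level):

import gzip
import time

# Hypothetical paths: a gzipped file and its uncompressed twin.
GZ_PATH = "two_column_data.txt.gz"
TXT_PATH = "two_column_data.txt"

def time_line_count(opener, path):
    """Count lines using the given opener and report elapsed seconds."""
    start = time.time()
    with opener(path, "rt") as handle:
        lines = sum(1 for _ in handle)
    return lines, time.time() - start

plain_lines, plain_secs = time_line_count(open, TXT_PATH)
gz_lines, gz_secs = time_line_count(gzip.open, GZ_PATH)

print("plain: %d lines in %.2fs" % (plain_lines, plain_secs))
print("gzip : %d lines in %.2fs" % (gz_lines, gz_secs))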

Unfortunately, there was a significant skew to the data, where a handful of the hundreds of files made up 90%+ of the data. So one process would finish a file with 800,000 lines and then grind through another with 400,000,000 lines on its own, while the other processes sat idle because all the small files had already been processed.

So what I wanted was a way to concatenate all the compressed files, then split them into roughly equally sized *compressed* files. Initially I tried:

time for i in $(ls /tmp/files/skewed_file*.gz); do zcat $i | split -l 1000000 -d -a 5 - /tmp/files/$(basename $i ".gz")_ && ls /tmp/files/$(basename $i ".gz")_* | grep -v ".out" | xargs -P10 -n1 python etl_file.py; done

What that stream of commands does is iterate over each skewed_file (hundreds of them, ranging in size from 8KB to 1GB+) in the /tmp/files/ directory, zcat it (decompress to STDOUT), split that stream (the hyphen ("-") in place of a file name tells split to read STDIN), write the pieces to files named after the original minus the .gz extension plus an underscore and a number (e.g. skewed_file_a.gz becomes skewed_file_a_00000 and skewed_file_a_00001), then run a Python ETL script on those pieces using simple parallelized processing.

Now with that long of a command, it makes you wonder whether there are any faster/simpler ways to do a similar thing. There are!

zcat *.gz | parallel -l2 --pipe --block 50m gzip ">"all_{#}.gz

With this one [relatively simple] line, my large group of similarly formatted but skewed files is split into blocks, compressed using gzip, and redirected to properly named files. Booya!

Relevant Stack Overflow question/answer: http://stackoverflow.com/questions/22628610/split-stdin-to-multiple-files-and-compress-them-if-possible

EDIT:

It is also possible to use parallel to split by number of lines using the -N flag:

zcat *.gz | parallel --pipe -N1000000 gzip ">"all_{#}

Simple parallelized processing with GIL languages (Python, Ruby, etc)

Sometimes we want parallel processing, but don’t want to pay the cost of ensuring proper multi-threaded handling. Because who wants to spend an extra 30 minutes setting up threads, guarding against race conditions, and so on, just to save a few minutes?

If you have access to “xargs,” you have access to a built-in utility that can simplify parallel processing. Say you have a few hundred files you want to convert from JSON to CSV, and it’s enough data to warrant parallel processing, but not enough to warrant the extra time spent building a truly parallel program. Do a simple “ls”, pipe it to “xargs -P8 -n1 python my_conversion.py”, and now you’ll have 8 separate processes, each working on one file independently.

Command:

ls file.json_* | xargs -P8 -n1 python my_conversion.py

xargs explained:
-P8 means we want 8 separate processes (so no Global Interpreter Lock issues)
-n1 means we want xargs to take the arguments and pass only one (file name) at a time as an argument to the python code
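
For completeness, here is a minimal sketch of what a my_conversion.py could look like (the one-JSON-object-per-line layout is an assumption; your files may differ):

import csv
import json
import sys

def convert(json_path):
    """Read a file of JSON objects (one per line) and write a CSV next to it."""
    csv_path = json_path + ".csv"
    with open(json_path) as src, open(csv_path, "w", newline="") as dst:
        writer = None
        for line in src:
            record = json.loads(line)
            if writer is None:
                # Assumes every line shares the same keys.
                writer = csv.DictWriter(dst, fieldnames=sorted(record))
                writer.writeheader()
            writer.writerow(record)

if __name__ == "__main__":
    # xargs -n1 hands this script exactly one file name per invocation.
    convert(sys.argv[1])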

ELI5 – Jelly Bean Analogy to MapReduce (Hadoop)

A simple and tasty explanation of the MapReduce process:

Start with a bowl of jelly beans in 4 colors (Red, Green, Blue, and Yellow). You don’t know exactly how many JBs are in the bowl, nor do you know how many of each color there are. But naturally you want to know. Because why would you not want to know? 🙂

MapReduce would process it in a similar way to this:

  • Split the JBs into smaller piles of roughly equal weight (1kg per pile, for 4 total piles)
  • You and 3 friends each take a pile and separate it into 4 smaller piles, one per color (this is the mapping phase)
  • Once the four of you are done separating your piles, each of you is assigned a different color to count, so the piles are shuffled around the table until each person has only the color pile they are assigned to work on (this is the sorting/shuffling phase)
  • Now that each person has their pile, you each count your individual pile (this is the reducing phase)
  • At the end of the reduce phase, you have 4 piles, each counted separately. You can now choose to reduce further and sum up all four counts to get the total count.
  • You started with an unordered, unknown quantity of jelly beans, but now they are sorted and counted!

Now time to celebrate by eating your hard work. Num num num
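
For anyone who prefers code to candy, here is a minimal sketch of the same flow in plain Python (no Hadoop involved, and the bean counts are made up):

from collections import defaultdict

# A made-up "bowl" of jelly beans.
bowl = ["red", "green", "blue", "yellow", "red", "blue", "red", "yellow"]

# Split: divide the bowl into 4 roughly equal piles.
piles = [bowl[i::4] for i in range(4)]

# Map: each "friend" tags every bean in their pile with a count of 1.
mapped = [(color, 1) for pile in piles for color in pile]

# Shuffle/sort: group the tagged beans by color so each counter gets one color.
groups = defaultdict(list)
for color, one in mapped:
    groups[color].append(one)

# Reduce: each counter sums their color's pile.
totals = {color: sum(ones) for color, ones in groups.items()}

print(totals)  # counts per color, e.g. red: 3, green: 1, blue: 2, yellow: 2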

Pseudo-Normalized Database Engine Concept

In relational databases such as MySQL, Oracle, and SQL Server, the two most common schools of thought are normalized vs. denormalized database design.

Essentially, normalized database design entails grouping similar dimensions into their own tables, such as the proverbial orders, customers, and products tables. A normalized design might have an orders table with order_id, order_date, customer_id, and product_id, then a customers table identified by customer_id and a products table identified by product_id. Denormalized design means the data is combined into one table, so in this case it might be an orders table with products and customers listed in the same table.

Normalized designs are better on storage space, but require join statements. Denormalized designs put everything together, but require more complex insertions/updates.

What I propose is an alternate database storage engine concept that would automatically build a memory-mapped normalization under the hood. Picture a denormalized orders table with order_id, customer name, new_customer flag, and product name. Let’s say order_id is high cardinality (very uncommon/unique), customer name is normal cardinality (fairly uncommon/unique), while product name and the new_customer flag are low cardinality (repeated frequently).

Example Table:
Order ID: 5
Customer Name: Jerry
Product Name: Box of Cookies
New Customer: True

A pseudo-normalized table might abstract the product name into a memory map, leaving an integer in place of the product name string under the hood.

Example Table:
Order ID: 5
Customer Name: Jerry
Product Name: 1
New Customer: True

Example Memory Map:
Product Id: 1
Product Name: Box of Cookies

Now when the data is retrieved, the engine would look in the source table for the raw data, then map the integer for product name back to the actual product name string.

By mapping low cardinality columns to high performance in-memory maps, we are able to drastically reduce the amount of storage space used and potentially increase performance at the same time.

Note this design would only change the underlying structure of the data, so anytime the data is accessed, it would go through a translation layer to convert the mapped value to the actual defined value.
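
A bare-bones sketch of the idea in Python (this is only an illustration of the dictionary-encoding concept with made-up names, not the engine's actual internals):

class PseudoNormalizedColumn:
    """Store a low-cardinality string column as integers plus a lookup map."""

    def __init__(self):
        self.value_to_id = {}   # "Box of Cookies" -> 1
        self.id_to_value = {}   # 1 -> "Box of Cookies"

    def encode(self, value):
        # Reuse an existing id or assign the next one.
        if value not in self.value_to_id:
            new_id = len(self.value_to_id) + 1
            self.value_to_id[value] = new_id
            self.id_to_value[new_id] = value
        return self.value_to_id[value]

    def decode(self, stored_id):
        # Translation layer on read: integer back to the defined value.
        return self.id_to_value[stored_id]

products = PseudoNormalizedColumn()
row = {"order_id": 5, "customer_name": "Jerry",
       "product_name": products.encode("Box of Cookies"),
       "new_customer": True}

print(row["product_name"])                   # 1 -- what would be stored
print(products.decode(row["product_name"]))  # Box of Cookies -- what the user sees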

Github repo with Proof of Concept: https://github.com/gstaubli/pndb

Apache Pig chokes on many small files

I had the displeasure of using multiple versions of Apache Pig (0.9, 0.11, 0.12, 0.13 and 0.14) in different capacities. Why was it so unpleasant you ask?

My scripts were running quickly and efficiently on Pig 0.9.2. I was using globs in my LOAD statement (e.g. a = LOAD '/files/*/*type_v4*.lzo') to find tens to hundreds of thousands of files in an HDFS structure as follows:
files/
-- 2014-10-01/
---- 2014-10-01-15-45-00.log_type_v4.log.lzo

The log types could vary by day, but the version would only change by one at a time, with most days keeping the same version number. So on 2014-10-02 there might be log types v4 and v5, but for the rest of October it would be only log type v5.

The log types were effectively cumulative; a new log version had the same fields as the old version plus some. Unfortunately… new fields were sometimes added to the MIDDLE of the new log version, so log version 4 might have 2 columns, name and phone, in that order, while log version 5 would have name, email and phone in that order. This meant new parsers were needed for every log version.
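
As a rough illustration of what those per-version parsers amount to (the field lists and tab delimiter here are hypothetical, not the real log schemas):

# Hypothetical per-version column orders; the newer version inserted "email"
# into the middle of the line.
FIELDS_BY_VERSION = {
    4: ["name", "phone"],
    5: ["name", "email", "phone"],
}

def parse_line(line, version):
    """Split a tab-delimited log line into a dict keyed by field name."""
    values = line.rstrip("\n").split("\t")
    return dict(zip(FIELDS_BY_VERSION[version], values))

# Downstream aggregations then stick to fields present in every version,
# e.g. "name" and "phone".
print(parse_line("Jerry\t555-1234", 4))
print(parse_line("Jerry\tjerry@example.com\t555-1234", 5))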

However, while the versions would change, my aggregations only needed fields that existed in every log version, and my analysis needed to look back through many log versions.

So we’re back to the real issue at hand. Once I got past writing individual parsers for each log version and was running my scripts frequently on Pig 0.9.2, I started to run the scripts on other clusters, and I immediately noticed a huge problem. The LOAD statements would not finish, and even when they did, the job would almost always fail. I ran through the verbose output and tried to figure out why it was hanging on every version newer than 0.9.2, but never came up with a solution. I need to get in touch with Pig’s maintainers about this one… Needless to say, I now use Pig 0.9.2 begrudgingly and exclusively, because past that version the LOAD statements could not seem to handle the massive number of small files I was passing to them.

Split metadata size exceeded 10000000

java.io.IOException: Split metadata size exceeded 10000000 was the error I got when trying to process ~20TB of highly compressed logs (~100TB uncompressed) on my 64 node Amazon EMR cluster. Naturally I found some good resources recommending a quick fix: modifying the mapred-site.xml file in /home/hadoop/conf/

Warning: By setting this configuration to -1, you are effectively removing the limit on the splits’ metadata. Be aware this may cause unintended consequences should your cluster not have the resources to handle the actual job.

<!-- In: conf/mapred-site.xml -->
<property>
    <name>mapreduce.jobtracker.split.metainfo.maxsize</name>
    <value>-1</value>
</property>

Then, since I was modifying an existing, running EMR cluster on Hadoop 1.0.4, I needed to restart my JobTracker:
sudo service --status-all
sudo service hadoop-jobtracker restart

To restart the YARN ResourceManager on a running EMR cluster on Hadoop 2.4.0:
sudo service yarn-resourcemanager stop

Source post: http://blog.dongjinleekr.com/my-hadoop-job-crashes-with-split-metadata-size-exceeded/

Now using S4CMD

S3CMD’s distinct lack of multi-threading led me to hunt for alternatives. While I tried many alternatives, such as s3-multipart (great when I did use it), s3funnel and s3cp among others, none quite fit the bill of supporting the key features I found important:

1) Listing/Downloading/Uploading/etc of files and “folders”
2) Multi-threaded
3) Synchronization handled so as to avoid re-downloading an existing file

S4CMD fit all the requirements and performed even better than I anticipated. It did not require me to set the number of threads I wanted it to use (for better or for worse), and it seemed to err on the side of more threads (e.g. 32 threads for a single process), even with the Python Global Interpreter Lock (GIL) in play. The downloading performance was superb; it was saturating a good portion of the available bandwidth (~30-50 Mbps, or ~4-6 MB/s). If it had saturated the bandwidth any more, my coworkers may not have appreciated the return to dialup :).

What makes S4CMD so awesome to me is that it takes multi-threading very seriously, using it not just for GET requests but also for listing files/directories. Multi-threading was a real pain point for me because I use logs from a third party that are split into 15-minute chunks, and there can be anywhere from one to numerous files per chunk. For better or worse, each file is minuscule in size. Unfortunately, this makes downloading the files a nightmare, as the per-file GET overhead is excessive for any serial downloader. S4CMD’s multi-threading makes downloading them a breeze.
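
To make that concrete, here is a rough sketch of what concurrent GETs buy you, using boto3 and a thread pool purely as an illustration (this is not S4CMD's implementation, and the bucket/prefix are made up):

from concurrent.futures import ThreadPoolExecutor
import os

import boto3

# Hypothetical bucket and prefix full of tiny 15-minute log files.
BUCKET = "third-party-logs"
PREFIX = "2014-10-01/"
DEST = "/tmp/logs"

s3 = boto3.client("s3")

def download(key):
    """GET a single small object; per-request overhead dominates for tiny files."""
    local_path = os.path.join(DEST, os.path.basename(key))
    s3.download_file(BUCKET, key, local_path)
    return key

os.makedirs(DEST, exist_ok=True)
# Ignores pagination for brevity; a real listing would loop over pages.
listing = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX)
keys = [obj["Key"] for obj in listing.get("Contents", [])]

# Each GET is mostly waiting on the network, so threads overlap that wait
# nicely even with the GIL in play.
with ThreadPoolExecutor(max_workers=32) as pool:
    for key in pool.map(download, keys):
        print("downloaded", key)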

I am in no way affiliated with S4CMD besides having a code crush on it :).