Split STDIN to multiple Compressed Files on highly skewed data

Sometimes I have data that is simply too large to store on one volume uncompressed, but I need to run processes on it. In my recent use case, I had one large tar gzipped file that decompressed into many smaller tar gzipped files, which then decompressed into two-column text (both strings). I did not have the luxury of having enough space available to both decompress and process the data to a smaller size. While I could have processed the files as they were (gzipped), the performance would have been sub-standard for these reasons:

    1) On-the-fly GZIP decompression in Python is noticeably slower than reading uncompressed data.
    2) As I was going to run a process on over 1 billion records, I was going to kick it off in concurrent processes using xargs (see "Simple parallelized processing with GIL languages" below).

Unfortunately there was a significant skew to the data: a handful of the hundreds of files made up 90%+ of the data. So one process would handle a file with 800,000 lines and then work through another with 400,000,000 lines on its own, while the other processes sat idle because all the small files had already been processed.

So what I wanted was a way to concatenate all the compressed files, then split the result into roughly equally sized *compressed* files. Initially I tried:

time for i in $(ls /tmp/files/skewed_file*.gz); do zcat $i | split -l 1000000 -d -a 5 - /tmp/files/$(basename $i ".gz")_ && ls /tmp/files/$(basename $i ".gz")_* | grep -v ".out" | xargs -P10 -n1 python etl_file.py; done

That chain of commands iterates over each skewed_file (hundreds, ranging in size from 8KB to 1GB+) in the /tmp/files/ directory and decompresses it to STDOUT with zcat. split then reads from STDIN (the hyphen "-" stands in for a file name) and writes 1,000,000-line chunks named after the source file, without the .gz extension and with an underscore and a five-digit number appended (e.g. skewed_file_a.gz becomes skewed_file_a_00000 and skewed_file_a_00001). Finally, a python script handles the ETL for each chunk using simple parallelized processing.

With a command that long, you have to wonder whether there is a faster/simpler way to do a similar thing. There is!

zcat *.gz | parallel -l2 --pipe --block 50m gzip ">"all_{#}.gz

With this one [relatively simple] line, my large group of similarly formatted but skewed files is split by GNU parallel into blocks of roughly 50MB, and each block is compressed with gzip and redirected to a properly named file ({#} is replaced by the job sequence number). Booya!

Relevant Stack Overflow question/answer: http://stackoverflow.com/questions/22628610/split-stdin-to-multiple-files-and-compress-them-if-possible


It is also possible to use parallel to split by number of lines using the -N flag:

zcat *.gz | parallel --pipe -N1000000 gzip ">"all_{#}.gz
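
For anyone without GNU parallel on hand, the same split-by-line-count idea can be sketched in plain Python. This is only a rough illustration, not a replacement for the one-liner above: the function name split_to_gzip and the "all_" prefix are my own choices, and it writes the chunks one after another instead of compressing them in parallel.

```python
import gzip
import sys


def split_to_gzip(lines, prefix, lines_per_file):
    """Write an iterable of byte lines into numbered gzip files.

    Returns the list of file names written.
    """
    written = []
    out = None
    for count, line in enumerate(lines):
        # Start a new compressed file every lines_per_file lines.
        if count % lines_per_file == 0:
            if out is not None:
                out.close()
            # Numbering starts at 1, mirroring parallel's {#}.
            name = "%s%d.gz" % (prefix, len(written) + 1)
            out = gzip.open(name, "wb")
            written.append(name)
        out.write(line)
    if out is not None:
        out.close()
    return written
```

To feed it from a pipe (zcat *.gz | python your_script.py), call split_to_gzip(sys.stdin.buffer, "all_", 1000000) at the bottom of the script.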

Simple parallelized processing with GIL languages (Python, Ruby, etc)

Sometimes we want parallel processing, but don’t want to pay the cost of ensuring proper multi-threaded handling. Because who wants to spend an extra 30 minutes setting up threads, ensuring race condition safety, etc. just to save a few minutes?

If you have access to “xargs,” you have access to a built-in utility that can simplify parallel processing. Say you have a few hundred files you want to convert from json to csv, and it’s enough data to warrant parallel processes, but not enough to warrant the extra time spent building a truly parallel process. Do a simple “ls”, pipe it to “xargs -P8 -n1 python my_conversion.py” and now you’ll have 8 separate processes working on 1 file each independently.


ls file.json_* | xargs -P8 -n1 python my_conversion.py

xargs explained:
-P8 means we want 8 separate processes (so no Global Interpreter Lock issues)
-n1 means we want xargs to take the arguments and pass only one (file name) at a time as an argument to the python code
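
If xargs is not available, roughly the same behavior can be sketched from Python itself. This is a minimal sketch under my own assumptions: run_one and run_parallel are made-up helper names, and the thread pool is only there to wait on child processes, so the actual work still happens in separate OS processes just as with -P8.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_one(command, path):
    """Run `command path` as a child process and return its exit code."""
    return subprocess.call(list(command) + [path])


def run_parallel(command, paths, max_procs=8):
    """Run one child process per path, at most max_procs at a time.

    The threads only wait on child processes; each file is handled by
    its own interpreter, so the GIL is not a bottleneck.
    """
    with ThreadPoolExecutor(max_workers=max_procs) as pool:
        return list(pool.map(lambda p: run_one(command, p), paths))
```

For example, run_parallel([sys.executable, "my_conversion.py"], list_of_files) starts up to 8 copies of my_conversion.py, each with a single file name as its argument, and returns their exit codes in the same order as the input files.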

ELI5 – Jelly Bean Analogy to MapReduce (Hadoop)

A simple and tasty explanation of the MapReduce process:

Start with a bowl of 4 colored Jelly Beans (Red, Green, Blue, and Yellow). You don’t know exactly how many JBs are in the bowl, nor do you know how many of each JBs are in the bowl. But naturally you want to know. Because why would you not want to know? 🙂

MapReduce would process it in a similar way to this:

  • Split JBs into smaller piles of an approximate weight (1kg each pile for 4 total piles)
  • You and 3 friends work together to separate your given pile into 4 separate piles for each color (this is the mapping phase)
  • Once the four of you are done separating your piles, you now each are assigned to counting a different color, so now the piles are shuffled around on the table until each person has only the color pile they are assigned to work on (this is the sorting/shuffling phase)
  • Now that each person has their pile, you each count your individual piles (this is the reducing phase)
  • At the end of the reduce phase, you have 4 piles, each counted separately. You can now choose to reduce further and sum up all four counts to get the total count.
  • You started with a bunch of unordered, unknown quantity of jelly beans, but now they are ordered and counted!

    Now time to celebrate by eating your hard work. Num num num
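
For the code-minded, the jelly bean steps above can be sketched in a few lines of Python. This is a toy, single-machine illustration with a made-up bowl of 7 beans and function names of my own choosing; a real Hadoop job would spread the piles across many machines.

```python
from collections import defaultdict


def split_into_piles(beans, pile_count):
    """Split the bowl into roughly equal piles, one per worker."""
    return [beans[i::pile_count] for i in range(pile_count)]


def map_pile(pile):
    """Map phase: tag every bean in a pile with its color and a 1."""
    return [(color, 1) for color in pile]


def shuffle(mapped_piles):
    """Shuffle phase: gather all pairs of the same color together."""
    grouped = defaultdict(list)
    for pairs in mapped_piles:
        for color, one in pairs:
            grouped[color].append(one)
    return grouped


def reduce_counts(grouped):
    """Reduce phase: count each color's pile."""
    return {color: sum(ones) for color, ones in grouped.items()}


bowl = ["red", "green", "blue", "yellow", "red", "blue", "red"]
piles = split_into_piles(bowl, 4)
counts = reduce_counts(shuffle(map_pile(pile) for pile in piles))
total = sum(counts.values())  # the optional "reduce further" step
print(counts, total)
```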

    Pseudo-Normalized Database Engine Concept

    Currently in a Relational Database such as MySQL, Oracle, SQL Server, etc, the two most common schools of thought are Normalized vs Denormalized database designs.

    Essentially, Normalized Database design entails grouping similar dimensions into their own tables, such as the classic orders, customers, and products tables. A normalized design might have an orders table with order_id, order_date, customer_id, and product_id, then a customers table identified by customer_id and a products table identified by product_id. A denormalized design means the data is combined into one table. So in this case, it might be an orders table with products and customers listed in the same table.

    Normalized designs use less storage space, but require join statements. Denormalized designs put everything together, but require more complex insertions/updates.

    What I propose is an alternate database storage engine concept that would automatically build a memory-mapped normalization under the hood. Picture a denormalized orders table with order_id, customer name, new_customer flag, and product name. Let’s say order_id is high cardinality (very uncommon/unique), customer name is normal cardinality (fairly uncommon/unique), and product name and new_customer flag are low cardinality (repeated frequently).

    Example Table:
    Order ID: 5
    Customer Name: Jerry
    Product Name: Box of Cookies
    New Customer: True

    A pseudo-normalized table might abstract the product name into a memory map, leaving an integer in place of the product name string under the hood.

    Example Table:
    Order ID: 5
    Customer Name: Jerry
    Product Name: 1
    New Customer: True

    Example Memory Map:
    Product Id: 1
    Product Name: Box of Cookies

    Now when the data is being retrieved, the engine would look in the source table for the raw data, then map the integer for product name to the actual product name string.

    By mapping low cardinality columns to high performance in-memory maps, we are able to drastically reduce the amount of storage space used and potentially increase performance simultaneously.

    Note this design would involve only changing the underlying structure of the data, so anytime the data is accessed, it would go through a translation layer to convert the mapped value to the actually defined value.
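
To make the idea concrete, here is a tiny Python sketch of one pseudo-normalized column. It is only an illustration under my own assumptions (the class name PseudoNormalizedColumn and its methods are made up for this example, and it is not the code from the proof of concept): strings are stored once in an in-memory map, rows hold integers, and get() plays the role of the translation layer.

```python
class PseudoNormalizedColumn:
    """Stores a low-cardinality column as integers plus an in-memory map."""

    def __init__(self):
        self.value_to_id = {}  # e.g. "Box of Cookies" -> 1
        self.id_to_value = {}  # e.g. 1 -> "Box of Cookies"
        self.ids = []          # one small integer per row

    def append(self, value):
        """Store a value, adding it to the map the first time it is seen."""
        if value not in self.value_to_id:
            new_id = len(self.value_to_id) + 1
            self.value_to_id[value] = new_id
            self.id_to_value[new_id] = value
        self.ids.append(self.value_to_id[value])

    def get(self, row):
        """Translation layer: return the originally defined value for a row."""
        return self.id_to_value[self.ids[row]]


products = PseudoNormalizedColumn()
for name in ["Box of Cookies", "Box of Cookies", "Milk"]:
    products.append(name)
```

Here the three rows are stored as the integers 1, 1, 2, and products.get(0) hands back "Box of Cookies" to the caller, who never sees the integers.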

    Github repo with Proof of Concept: https://github.com/gstaubli/pndb