Converting CSVs with Headers to AVRO

Recently I wanted to quickly convert some CSV files to AVRO: a logging change meant we were receiving AVRO logs instead of CSV, and I wanted some AVRO files to test my logic on and to get more familiar with AVRO. After looking around for a while and trying a few different CSV-to-AVRO conversion utilities, none of them actually worked as expected. I just wanted a simple conversion; why was this so hard to find? The closest I came to finding a working CSV2AVRO utility was avroutils.

Unfortunately avroutils hasn't been updated in 5 years, and the code was very restrictive: it offers a command-line argument for "headers," but regardless of whether that argument is passed, it uses Python's [in my opinion, flawed] csv.Sniffer utility to confirm there is a header. For me, this meant that even though I was explicitly telling it my file had a header, it still refused to convert because the Sniffer claimed there was no header.

To reiterate, I really just wanted a simple utility to convert CSV to AVRO, if for no other reason than to get some hands-on experience with the AVRO format. Why was I having such a hard time?

So… I wrote my own [very basic] utility to handle CSV to AVRO conversions. It does no validation (re: it assumes you know what you’re doing) and is hard-coded to treat all fields as strings. Remember, this was primarily a learning exercise!
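
For flavor, here is a minimal sketch of the same idea. This is not the actual csv2avro code, and it assumes the fastavro library: take the header row at face value, declare every field a string, and write.

import csv
from fastavro import writer

def csv_to_avro(csv_path, avro_path):
    with open(csv_path, newline="") as f:
        reader = csv.DictReader(f)  # first row is the header -- no Sniffer second-guessing
        schema = {
            "type": "record",
            "name": "CsvRow",
            "fields": [{"name": name, "type": "string"} for name in reader.fieldnames],
        }
        with open(avro_path, "wb") as out:
            writer(out, schema, reader)  # every value stays a string

(One caveat: Avro field names must be valid identifiers, so headers with spaces or punctuation would need sanitizing first.)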

Without further ado, I present to you csv2avro.

Questions/Comments/Bugs/Requests/etc are encouraged!

Metadata for Functions | Python Decorator

Often I find myself wanting more information about the Python functions I'm running, whether it's because I want to debug, log, or time them. All of these are relatively well-defined problems (debugging excepted). Unfortunately, from my research, no tool makes it easy to see the input, output, time elapsed, errors, warnings, etc. of a function in one simple interface. So I wrote a simple Python decorator compatible with Python 2.7+ (and probably earlier versions), including Py3.

What does the meta_func decorator actually do?

For every function call, it stores all positional and keyword arguments; error information (including the ability to catch rather than raise errors); warnings; timing (time started, ended, and elapsed); and the returned value, all as a standard Python dictionary.

What’s the point of tracking all this [meta]data?

Debugging, logging, timing… The use cases are nearly endless, because it captures a lot of what's going on in one easily interpreted structure.

Important Notes

This decorator should be expected to add a good deal of overhead to each function call, since it captures so many dimensions.

Arguments (positional and keyword), the return value, warnings, and exceptions are stored in their raw form, so any transformations (such as stringifying errors and tracebacks) need to be done in post-processing.

The error_info field will return a tuple from sys.exc_info() with error details.
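
To make the shape of this concrete, here is a rough sketch of how such a decorator might look. It is a simplification, not the actual meta_func implementation; in particular, stashing the dictionary on a wrapper attribute is my own choice here.

import sys
import time
import warnings
from functools import wraps

def meta_func(catch_errors=False):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            meta = {"args": args, "kwargs": kwargs, "return_value": None,
                    "error_info": None, "warnings": None,
                    "time_started": time.time(), "time_ended": None,
                    "time_elapsed": None}
            try:
                with warnings.catch_warnings(record=True) as caught:
                    warnings.simplefilter("always")
                    meta["return_value"] = func(*args, **kwargs)
                meta["warnings"] = caught  # raw warning objects, untransformed
            except Exception:
                meta["error_info"] = sys.exc_info()  # raw (type, value, traceback) tuple
                if not catch_errors:
                    raise
            finally:
                meta["time_ended"] = time.time()
                meta["time_elapsed"] = meta["time_ended"] - meta["time_started"]
                wrapper.meta = meta  # where the dict lives is an implementation choice
            return meta["return_value"]
        return wrapper
    return decorator

Usage would then be along the lines of decorating a function with @meta_func(catch_errors=True), calling it, and inspecting its .meta dictionary afterwards.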

Github Repo

https://github.com/gstaubli/meta_func

The Power of Hadoop in under 10 lines!

Okay, okay, I may have oversold it a bit, but here are fewer than 10 bash lines that resemble (if you squint really hard) Hadoop/MapReduce.

#!/usr/bin/env bash
code_to_run=$1   # path to an executable taking an input file and an output file
in_file=$2       # single input file
out_file=$3      # single output file
# Chunk, process in parallel, merge, clean up; each step gated by && below
split -d -a 5 -l 100000 $in_file $in_file"_" && \
ls $in_file"_"* | xargs -P8 -n1 -I file $code_to_run file file.out && \
cat $in_file"_"*.out > $out_file && \
rm $in_file"_"*

What will this do?
It takes 3 arguments:

  • code_to_run is a path to an executable
  • in_file is a path to a single input file
  • out_file is a path to a single output file

split -d -a 5 -l 100000 $in_file $in_file"_"

Splits the in_file into 100,000-line chunks, named with a trailing underscore and a five-digit number (e.g. if in_file = "file.tsv", the temp files are file.tsv_00000, file.tsv_00001, etc.).

ls $in_file"_"* | xargs -P8 -n1 -I file $code_to_run file file.out

Lists all of the numbered temporary input files and pipes them to xargs, which runs 8 parallel processes of your code_to_run executable, passing each chunked input file in and writing a chunked output file. (The -I file placeholder is substituted into both arguments, so file.out expands to, e.g., file.tsv_00000.out.)

cat $in_file"_"*.out > $out_file

Concatenates the chunked output files into the single output file, as you'd expect.

rm $in_file"_"*

Cleans up (re: removes) all temporary files; both the input and output chunks are removed.

For the sake of data safety, each command is chained with "&&" so that subsequent commands run only if the prior ones succeed.
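
To make the contract concrete, here is a hypothetical code_to_run (my example, not part of the original snippet): it receives the chunk path and the output path exactly as xargs supplies them, and applies some per-line transform.

#!/usr/bin/env python
# Hypothetical code_to_run: argv[1] is the chunked input file,
# argv[2] is the chunked output file (both supplied by xargs above).
import sys

in_path, out_path = sys.argv[1], sys.argv[2]
with open(in_path) as f_in, open(out_path, "w") as f_out:
    for line in f_in:
        f_out.write(line.upper())  # the "map" step: any per-line transform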

ELI5 – The Human analogy to Multi-threading (Concurrency) and how it can go wrong

Ever feel like your brain is processing multiple things at once? An example might be when you’re reading a book and listening to music. While you may not realize it, your brain is processing both what you’re reading and what you’re hearing simultaneously.

A computer operates similarly; it is constantly working in the background on multiple tasks (such as drawing the windows on your screen, checking for new e-mails, etc.) all while juggling its specific user-requested tasks (such as opening a document, then writing in it). The computer is doing these things concurrently, just as your brain was reading a book and listening to music concurrently. It's not sacrificing one task to complete another; it is working on the tasks simultaneously without interruption. This stands in contrast to processing information serially: you read a book, then you listen to music.

When we talk to another person, we expect them to understand what we’re saying because we formulate a thought, then compose our thoughts into words and finally speak those words. Likewise, a computer program generally tries to ensure the user understands what it means by handling background tasks out of sight (no windows, no warnings, etc) and only notifying the user when it is instructed to do so.

We speak with one mouth and are therefore locked to saying one thing at any given moment. That doesn’t mean we don’t make mistakes in our speech sometimes, such as by responding to the question “What would you like to eat?” with “Yes, I’d like a cheese refrigerator pizza” when we meant to say “Yes, I’d like a cheese pizza” while thinking concurrently about how we still need to fix our refrigerator. A computer can make a similar mistake when it’s instructed to tell the user “Your system is experiencing a disk space error” and “FREE VACATION – CALL NOW” simultaneously. Those two messages are unrelated and undesirable to be displayed together, let alone erroneously intertwined.

Unfortunately, the bad news is that both humans and computers will likely continue to make these mistakes for the foreseeable future. The solutions that exist for both are flawed: humans must focus extensively [not to mention consciously] to ensure only the words we wish to speak are spoken, and computers likewise must be instructed by the former (re: humans) to output only the information the user is meant to receive.

The good news is that multi-threading/concurrency allows information to be processed simultaneously and therefore faster than when each action blocks subsequent actions from being processed.

Apache Pig chokes on many small files [part 2]

As a follow-up to my initial post regarding Apache Pig's inability to quickly load many small files in Pig 0.10 and newer, I wanted to share a simple fix that worked for me, courtesy of in-depth research by the Amazon support team (+ engineers).

Beginning around Pig 0.10.0, PigStorage builds a hidden schema file in an attempt to determine your file's schema. Passing the '-noschema' flag to PigStorage skips this and yields far better performance:

a = LOAD '/files/*' USING PigStorage('\t','-noschema') AS (field1:int, field2:chararray);

Much better.

Split STDIN to multiple Compressed Files on highly skewed data

Sometimes I have data that is simply too large to store on one volume uncompressed, but I still need to run processes on it. In my recent use case, I had one large tar-gzipped file that decompressed into many smaller gzipped files, which in turn decompressed into two-column text (both strings). I did not have the luxury of enough free space to both decompress and process the data down to a smaller size. While I could have processed the files as they were (gzipped), the performance would have been sub-standard for these reasons:

    1) On-the-fly GZIP decompression in Python is noticeably slower than reading uncompressed data (illustrated in the sketch below).
    2) As I was going to run a process on over 1 billion records, I was going to kick it off in concurrent processes using xargs for simple parallelized processing.
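
A quick way to see reason 1 for yourself (chunk.gz and chunk are hypothetical stand-ins for one of your files, compressed and uncompressed):

import gzip
import time

# Time the on-the-fly decompression path...
start = time.time()
with gzip.open("chunk.gz", "rt") as f:
    for line in f:
        pass
print("gzipped read:", time.time() - start)

# ...versus a plain uncompressed read of the same data.
start = time.time()
with open("chunk") as f:
    for line in f:
        pass
print("plain read:  ", time.time() - start)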

Unfortunately there was significant skew in the data: a handful of the hundreds of files made up 90%+ of it. So one thread would finish a file with 800,000 lines and then grind through another with 400,000,000 lines on its own, while the other threads sat idle because all the small files had already been processed.

So what I wanted was a way to concatenate all the compressed files, then split them into roughly equally sized *compressed* files. Initially I tried:

time for i in $(ls /tmp/files/skewed_file*.gz); do zcat $i | split -l 1000000 -d -a 5 - /tmp/files/$(basename $i ".gz")_ && ls /tmp/files/$(basename $i ".gz")_* | grep -v ".out" | xargs -P10 -n1 python etl_file.py; done

That stream of commands iterates over each skewed_file (hundreds of them, ranging in size from 8KB to 1GB+) in the /tmp/files/ directory, zcats it, splits the stream (the hyphen "-" tells split to read STDIN instead of a file) into numbered chunks named after the original file minus the .gz extension plus an underscore (e.g. skewed_file_a.gz becomes skewed_file_a_00000, skewed_file_a_00001, etc.), then runs a Python script over the chunks to handle the ETL using simple parallelized processing.

Now, with a command that long, it makes you wonder whether there's a faster/simpler way to do the same thing. There is!

zcat *.gz | parallel -l2 --pipe --block 50m gzip ">"all_{#}.gz

With this one [relatively simple] line, my large group of similarly formatted but skewed files is split into blocks, compressed with gzip, and redirected to properly named files. Booya!

Relevant Stack Overflow question/answer: http://stackoverflow.com/questions/22628610/split-stdin-to-multiple-files-and-compress-them-if-possible

EDIT:

It is also possible to use parallel to split by number of lines using the -N flag:

zcat *.gz | parallel --pipe -N1000000 gzip ">"all_{#}

Simple parallelized processing with GIL languages (Python, Ruby, etc)

Sometimes we want parallel processing but don't want to pay the cost of ensuring proper multi-threaded handling. Because who wants to spend an extra 30 minutes setting up threads, ensuring race-condition safety, and so on, just to save a few minutes?

If you have access to "xargs," you have a built-in utility that can simplify parallel processing. Say you have a few hundred files you want to convert from JSON to CSV, and it's enough data to warrant parallel processes, but not enough to warrant the extra time spent building a truly parallel program. Do a simple "ls", pipe it to "xargs -P8 -n1 python my_conversion.py", and now you'll have 8 separate processes, each working on one file independently.

Command:

ls file.json_* | xargs -P8 -n1 python my_conversion.py

xargs explained:
-P8 means we want 8 separate processes (so no Global Interpreter Lock issues)
-n1 means xargs passes only one argument (a file name) at a time to the Python code
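
For completeness, my_conversion.py stands in for whatever per-file work you need; a hypothetical JSON-lines-to-CSV version might look like this:

import csv
import json
import sys

# Hypothetical my_conversion.py: converts one JSON-lines file (one dict per
# line) to CSV. xargs invokes it with a single file name as the argument.
in_path = sys.argv[1]
out_path = in_path + ".csv"

with open(in_path) as f_in, open(out_path, "w", newline="") as f_out:
    writer = None
    for line in f_in:
        record = json.loads(line)
        if writer is None:  # derive the header from the first record
            writer = csv.DictWriter(f_out, fieldnames=sorted(record))
            writer.writeheader()
        writer.writerow(record)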

ELI5 – Jelly Bean Analogy to MapReduce (Hadoop)

A simple and tasty explanation of the MapReduce process:

Start with a bowl of jelly beans in 4 colors (red, green, blue, and yellow). You don't know exactly how many JBs are in the bowl, nor how many there are of each color. But naturally you want to know. Because why would you not want to know? 🙂

MapReduce would process it in a similar way to this:

  • Split the JBs into smaller piles of approximately equal weight (say 1kg per pile, for 4 piles total)
  • You and 3 friends work together, each separating your given pile into 4 smaller piles, one per color (this is the mapping phase)
  • Once the four of you are done separating, each of you is assigned a different color to count, so the piles are shuffled around the table until each person holds only the color pile they are assigned to (this is the sorting/shuffling phase)
  • Now that each person has their pile, you each count your individual pile (this is the reducing phase)
  • At the end of the reduce phase, you have 4 piles, each counted separately. You can now choose to reduce further and sum the four counts to get the total count.
  • You started with an unordered, unknown quantity of jelly beans, and now they are sorted and counted!

Now time to celebrate by eating your hard work. Num num num
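
For the programmers in the room, here is a toy sketch of the same flow in plain Python (no Hadoop involved; the piles are made up):

from itertools import chain

# Four pre-split piles of jelly beans (the input chunks).
piles = [
    ["red", "green", "blue", "yellow", "red"],
    ["blue", "blue", "yellow", "red"],
    ["green", "green", "red", "yellow"],
    ["yellow", "red", "blue", "green"],
]

# Map phase: each "friend" turns their pile into (color, 1) pairs.
mapped = [[(color, 1) for color in pile] for pile in piles]

# Shuffle phase: group the pairs by color so each color lands with one person.
shuffled = {}
for color, count in chain.from_iterable(mapped):
    shuffled.setdefault(color, []).append(count)

# Reduce phase: each person sums the counts for their color.
reduced = {color: sum(counts) for color, counts in shuffled.items()}
print(reduced)                 # per-color counts
print(sum(reduced.values()))   # the optional final reduce: total bean count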

Pseudo-Normalized Database Engine Concept

In a relational database such as MySQL, Oracle, or SQL Server, the two most common schools of thought are normalized vs. denormalized database design.

Essentially, normalized database design entails grouping similar dimensions into their own tables, such as the proverbial orders, customers, and products tables. A normalized design might have an orders table with order_id, order_date, customer_id, and product_id, then a customers table identified by customer_id and a products table identified by product_id. A denormalized design combines the data into one table; in this case, it might be an orders table with the product and customer details listed in the same table.

Normalized designs are better on storage space but require joins. Denormalized designs put everything together but require more complex inserts/updates.

What I propose is an alternate database storage engine concept that would automatically build a memory-mapped normalization under the hood. Picture a denormalized orders table with order_id, customer name, new_customer flag, and product name. Let's say order_id is high cardinality (very uncommon/unique), customer name is normal cardinality (fairly uncommon/unique), and product name and new_customer flag are low cardinality (repeated frequently).

Example Table:
Order ID: 5
Customer Name: Jerry
Product Name: Box of Cookies
New Customer: True

A pseudo-normalized table might abstract the product name into a memory map, leaving an integer in place of the product name string under the hood.

Example Table:
Order ID: 5
Customer Name: Jerry
Product Name: 1
New Customer: True

Example Memory Map:
Product Id: 1
Product Name: Box of Cookies

Now when the data is retrieved, the engine looks in the source table for the raw data, then maps the integer back to the actual product name string.

By mapping low-cardinality columns to high-performance in-memory maps, we are able to drastically reduce the amount of storage space used and potentially increase performance simultaneously.

Note that this design only changes the underlying structure of the data; any time the data is accessed, it goes through a translation layer that converts the mapped value back to the originally defined value.
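
As a toy illustration of that translation layer (my sketch, not the actual pndb code), here is one low-cardinality column dictionary-encoded and decoded:

class EncodedColumn:
    """Hypothetical sketch: store integers per row, strings in a memory map."""
    def __init__(self):
        self.value_to_id = {}  # the "memory map": raw value -> small integer
        self.id_to_value = []  # reverse map used by the translation layer
        self.rows = []         # what the engine would actually store per row

    def insert(self, value):
        if value not in self.value_to_id:
            self.value_to_id[value] = len(self.id_to_value)
            self.id_to_value.append(value)
        self.rows.append(self.value_to_id[value])  # store the integer, not the string

    def get(self, row_idx):
        # Translation layer: map the stored integer back to the raw value.
        return self.id_to_value[self.rows[row_idx]]

product_name = EncodedColumn()
for p in ["Box of Cookies", "Box of Cookies", "Tea Sampler"]:
    product_name.insert(p)
print(product_name.rows)    # [0, 0, 1] -- integers under the hood
print(product_name.get(1))  # "Box of Cookies" -- what the user sees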

Github repo with Proof of Concept: https://github.com/gstaubli/pndb

Apache Pig chokes on many small files

I had the displeasure of using multiple versions of Apache Pig (0.9, 0.11, 0.12, 0.13, and 0.14) in different capacities. Why was it so unpleasant, you ask?

My scripts were running quickly and efficiently on Pig 0.9.2. I was using globs in my LOAD statement (e.g. "a = LOAD '/files/*/*type_v4*.lzo'") to find tens to hundreds of thousands of files in an HDFS structure as follows:
files/
— 2014-10-01/
—- 2014-10-01-15-45-00.log_type_v4.log.lzo

The log types could vary by day, but the version would only change by a delta of 1 each day, with most days keeping the same version number. So on 2014-10-02 there might be log types v4 and v5, but for the rest of October it would be only log type v5.

The log types were effectively cumulative; a new log version had the same fields as the old version plus some. Unfortunately… new fields were sometimes added to the MIDDLE of a new log version, so log version 4 might have 2 columns, name and phone, in that order, while log version 5 would have name, email, and phone in that order. This meant new parsers were needed for every log version.

However, while the versions would change, my aggregations used the same fields that existed in all log versions, and I wanted to look back through many log versions in my analysis.

So we're back to the real issue at hand. Once I got past the individual parsers for each log version and was running my scripts regularly on Pig 0.9.2, I started running them on other clusters, and I immediately noticed a huge problem. The LOAD statements would not finish, and even when they did, the job would almost always fail. I ran through the verbose output and tried to figure out why it was hanging on all versions newer than 0.9.2, but never came up with a solution. I need to get in touch with Pig's maintainers about this one… Needless to say, I now use Pig 0.9.2 begrudgingly and exclusively, because the LOAD statements could not seem to handle the massive number of small files I was passing in on anything past version 0.9.2.