Connecting Apache Spark to External Data sources (e.g. Redshift, S3, MySQL)



S3

Hadoop AWS Jar

AWS Java SDK Jar

* Note: These AWS jars should not be necessary if you’re using Amazon EMR.

Amazon Redshift

JDBC Driver

Spark-Redshift package *

* The spark-redshift package provided by Databricks is critical, particularly if you wish to WRITE to Redshift, because it performs bulk file operations instead of individual INSERT statements. If you only need to READ from Redshift, this package may be less helpful.


MySQL

MySQL JDBC Connector jar

Setting your password [relatively securely]

This is not extremely secure, but is much better than putting your password directly into code.

Use a properties file:

<code>echo "spark.jdbc.password=test_pass_prop" &gt;
spark-submit --properties-file</code>
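Since the properties file holds the password in plain text, it probably makes sense to keep it readable by your own user only. Below is a minimal sketch of one way to create such a file from Scala on a POSIX system. The object name `JdbcPasswordFile` and the file location are hypothetical, and the sketch assumes the password contains no characters that need escaping in a Java properties file (such as backslashes).

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.nio.file.attribute.PosixFilePermissions

// Hypothetical helper, not part of Spark.
object JdbcPasswordFile {
  /** Writes `spark.jdbc.password=<password>` to `path` with owner-only (rw-------) permissions. */
  def write(path: Path, password: String): Path = {
    val ownerOnly = PosixFilePermissions.asFileAttribute(
      PosixFilePermissions.fromString("rw-------"))
    Files.deleteIfExists(path)
    // Create the empty file with restrictive permissions BEFORE the secret is written to it.
    Files.createFile(path, ownerOnly)
    Files.write(path, s"spark.jdbc.password=$password\n".getBytes(StandardCharsets.UTF_8))
  }
}
```

The resulting file can then be passed to spark-submit or spark-shell with --properties-file, and the value read back with spark.conf.get("spark.jdbc.password", ...) as in the JDBC examples.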

Examples (in Scala unless otherwise noted)

S3 (using S3A)

<code class="bash plain">spark-shell --jars hadoop-aws-2.7.3.jar,aws-java-sdk-1.7.4.jar</code>
<code>spark.conf.set("fs.s3a.access.key", "&lt;ACCESS_KEY&gt;")
spark.conf.set("fs.s3a.secret.key", "&lt;SECRET_KEY&gt;")
val d = spark.read.parquet("s3a://parquet-lab/files")
d.select("device_id").distinct().count() // =&gt; 1337</code>

* On Amazon EMR, you may be able to skip the jars and key settings.
** You may also want to try the “s3” or “s3n” protocols if s3a doesn’t work.

MySQL


<code class="bash plain">spark-shell --jars mysql-connector-java-5.1.40-bin.jar</code>
<code>val properties = new java.util.Properties()
properties.put("driver", "com.mysql.jdbc.Driver")
properties.put("url", "jdbc:mysql://mysql-host:3306")
properties.put("user", "&lt;username&gt;")
properties.put("password", spark.conf.get("spark.jdbc.password", "&lt;default_pass&gt;"))
// This will form a SQL query like "SELECT model_id, prediction, actual_value FROM ml_models WHERE utc_start_time BETWEEN '2017-03-31' AND '2017-04-02'"
// Using .limit(INT) will NOT work as you might expect: it retrieves all the data first, THEN applies the limit when showing results
val models = spark.read.jdbc(properties.get("url").toString, "ml_models", Array("utc_start_time BETWEEN '2017-03-31' AND '2017-04-02'"), properties).select("model_id", "prediction", "actual_value")</code>
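Two details of the JDBC read above can be sketched in plain Scala. First, each entry of the predicates array becomes one partition of the resulting DataFrame, so splitting a date range into one predicate per day lets Spark read the days in parallel. Second, because .limit() is applied after the data is fetched, one common workaround is to pass a parenthesized subquery with its own LIMIT as the table argument. The object and method names below are hypothetical helpers, not part of Spark, and the daily ranges are half-open, so they do not match the inclusive BETWEEN in the example exactly.

```scala
import java.time.LocalDate

// Hypothetical helpers for building arguments to spark.read.jdbc.
object JdbcReadHelpers {
  /** One half-open predicate per day in [start, end); each predicate maps to one partition. */
  def dailyPredicates(column: String, start: LocalDate, end: LocalDate): Array[String] =
    Iterator.iterate(start)(_.plusDays(1))
      .takeWhile(_.isBefore(end))
      .map(day => s"$column >= '$day' AND $column < '${day.plusDays(1)}'")
      .toArray

  /** Wraps a query as an aliased derived table so the LIMIT runs inside the database. */
  def limitedTable(table: String, columns: Seq[String], limit: Int): String =
    s"(SELECT ${columns.mkString(", ")} FROM $table LIMIT $limit) AS limited_rows"
}
```

In spark-shell, these could be used as spark.read.jdbc(properties.get("url").toString, "ml_models", JdbcReadHelpers.dailyPredicates("utc_start_time", LocalDate.parse("2017-03-31"), LocalDate.parse("2017-04-02")), properties), or by passing JdbcReadHelpers.limitedTable("ml_models", Seq("model_id", "prediction"), 100) in place of the table name.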


Amazon Redshift

Recommended approach using Databricks’ spark-redshift:

spark-shell --packages com.databricks:spark-redshift_2.11:3.0.0-preview1 --jars RedshiftJDBC42-

Basic JDBC connection only:

<code class="bash plain">spark-shell --jars RedshiftJDBC42-</code>
<code>val properties = new java.util.Properties()
properties.put("driver", "")
properties.put("url", "jdbc:redshift://redshift-host:5439/")
properties.put("user", "&lt;username&gt;")
properties.put("password", spark.conf.get("spark.jdbc.password", "&lt;default_pass&gt;"))
val d_rs = spark.read.jdbc(properties.get("url").toString, "data_table", properties)</code>

Using the Databricks Redshift data source package – for Bulk Data WRITING to Redshift, use this package:

Reading from and writing to Redshift stages data [and doesn’t clean up after itself] in S3, so use object lifecycle management!

val devices = spark.read.format("com.databricks.spark.redshift").
option("forward_spark_s3_credentials", "true").
option("url", "jdbc:redshift://redshift-host:5439/?user=&lt;user&gt;&amp;password=&lt;password&gt;").
option("query", "SELECT * FROM devices").
option("tempdir", "s3://temporary-holding-bucket/").load()

Writing the dataframe to Redshift in the “public.temporary_devices” table:

devices_transformed.coalesce(64).write.
format("com.databricks.spark.redshift").
option("forward_spark_s3_credentials", "true").
option("url", "jdbc:redshift://redshift-host:5439/?user=&lt;user&gt;&amp;password=&lt;password&gt;").
option("dbtable", "public.temporary_devices").
option("tempdir", "s3a://temporary-holding-bucket/").
// EXPERIMENTAL, but CSV is higher performance than AVRO for loading into Redshift
option("tempformat", "CSV GZIP").
mode("error").
save()

* Note: coalesce(64) is called to reduce the number of output files written to the S3 staging directory, because renaming files from their temporary location in S3 can be slow. The S3Committer should help alleviate that issue.


Tips for using Apache Parquet with Spark 2.x

What is Apache Parquet?
It is a compressible binary columnar data format used in the Hadoop ecosystem. We’ll talk about it primarily in relation to the Hadoop Distributed File System (HDFS) and Spark 2.x contexts.

What role does it fill?
It is a fast and efficient data format well suited to scalable big data analytics: because values are stored column by column, a query can read only the columns it needs.

Optimization Tips

  • Aim for around 1GB parquet output files, but experiment with other sizes for your use case and cluster setup (source)
  • Ideally store on HDFS in file sizes of at least the HDFS block size (default 128MB)
  • Storing Parquet files on S3 is also possible (side note: if you want Presto SQL-like queries on demand at low cost, use Amazon Athena, which charges based on the amount of data read)
  • Use snappy compression if storage space is not a concern, because it is fast to compress and decompress; for much better compression at what should be a relatively small performance cost, use gzip. Parquet compresses data within each file, so the files remain splittable with either codec (source)
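As a rough illustration of the file-size tip, the number of output files can be derived from an estimate of the total output size and the target size per file. This is only a sketch: `ParquetSizing` is a hypothetical helper, and the total size has to be estimated by you (for example from a previous run), since Spark does not know the compressed output size in advance.

```scala
// Hypothetical helper for picking a partition count before writing Parquet.
object ParquetSizing {
  val OneGiB: Long = 1024L * 1024L * 1024L

  /** Number of files needed so that each is at most about `targetFileBytes`; never less than 1. */
  def targetFileCount(totalBytes: Long, targetFileBytes: Long = OneGiB): Int = {
    require(totalBytes >= 0 && targetFileBytes > 0, "sizes must be non-negative / positive")
    // Ceiling division, so a partial last file still gets its own partition.
    math.max(1L, (totalBytes + targetFileBytes - 1) / targetFileBytes).toInt
  }
}
```

In spark-shell, with a DataFrame df and an estimated size estimatedBytes (both assumed here), this might be used as df.repartition(ParquetSizing.targetFileCount(estimatedBytes)).write.option("compression", "snappy").parquet("&lt;output_path&gt;"), swapping "snappy" for "gzip" when storage space matters more than speed.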