Pre-requisites
AWS S3
AWS Java SDK Jar
* Note: These AWS jars should not be necessary if you’re using Amazon EMR.
Amazon Redshift
* The spark-redshift package provided by Databricks is particularly important if you want to WRITE to Redshift, because it performs bulk file operations instead of individual INSERT statements. If you only need to READ from Redshift, the package is less essential.
MySQL
Setting your password [relatively securely]
This is not extremely secure, but is much better than putting your password directly into code.
Use a properties file:
echo "spark.jdbc.password=test_pass_prop" > secret_credentials.properties
spark-submit --properties-file secret_credentials.properties
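Once spark-submit has loaded the properties file, the password can be read back out of the Spark configuration at runtime, as the later examples do. A minimal sketch (the fallback value is just a placeholder):
// Read the password that spark-submit loaded from secret_credentials.properties;
// the second argument is a default returned when the property is not set.
val jdbcPassword = spark.conf.get("spark.jdbc.password", "<default_pass>")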
Examples (in Scala unless otherwise noted)
S3 (using S3A)
spark-shell --jars hadoop-aws-2.7.3.jar,aws-java-sdk-1.7.4.jar
spark.conf.set("fs.s3a.access.key", "<ACCESS_KEY>") spark.conf.set("fs.s3a.secret.key", "<SECRET_KEY>") val d = spark.read.parquet("s3a://parquet-lab/files") d.select("device_id").distinct().count() // => 1337
* On Amazon EMR, you may be able to skip the jars and key settings.
** Also, you may want to try the “s3” or “s3n” protocols if s3a doesn’t work.
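If the runtime spark.conf.set calls above don’t take effect in your environment, one alternative sketch (same placeholder keys and bucket) sets the credentials on the underlying Hadoop configuration instead:
// Set the standard fs.s3a.* properties on the Hadoop configuration Spark passes to the S3A filesystem.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<ACCESS_KEY>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<SECRET_KEY>")
val d = spark.read.parquet("s3a://parquet-lab/files")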
MySQL
spark-shell --jars mysql-connector-java-5.1.40-bin.jar
val properties = new java.util.Properties()
properties.put("driver", "com.mysql.jdbc.Driver")
properties.put("url", "jdbc:mysql://mysql-host:3306")
properties.put("user", )
properties.put("password", spark.conf.get("spark.jdbc.password", "<default_pass>"))
// This will form a SQL query like "SELECT model_id, prediction, actual_value FROM ml_models WHERE utc_start_time BETWEEN '2017-03-31' AND '2017-04-02'"
// Using .limit(INT) will NOT work as you might expect - it retrieves all the data first and only applies the limit when results are displayed
val models = spark.read.jdbc(
    properties.get("url").toString,
    "ml_models",
    Array("utc_start_time BETWEEN '2017-03-31' AND '2017-04-02'"),
    properties)
  .select("model_id", "prediction", "actual_value")
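Writing a DataFrame back to MySQL reuses the same connection properties. A rough sketch, where the ml_models_backup table name is purely illustrative:
// Append the selected columns to a (hypothetical) backup table, reusing the driver/user/password properties above.
models.write
  .mode("append")
  .jdbc(properties.get("url").toString, "ml_models_backup", properties)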
Redshift
Recommended approach using Databricks’ spark-redshift:
spark-shell --packages com.databricks:spark-redshift_2.11:3.0.0-preview1 --jars RedshiftJDBC42-1.2.1.1001.jar
Basic JDBC connection only:
spark-shell --jars RedshiftJDBC42-1.2.1.1001.jar
val properties = new java.util.Properties()
properties.put("driver", "com.amazon.redshift.jdbc42.Driver")
properties.put("url", "jdbc:redshift://redshift-host:5439/")
properties.put("user", "<username>") properties.put("password",spark.conf.get("spark.jdbc.password", "<default_pass>"))
val d_rs = spark.read.jdbc(properties.get("url").toString, "data_table", properties)
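A plain JDBC read like the one above pulls the whole table, so it can help to push filtering down to Redshift by passing a parenthesized subquery, aliased as a table, in place of the table name. A sketch with illustrative column and filter values:
// Only the rows matched by the subquery travel over the JDBC connection.
val d_recent = spark.read.jdbc(
  properties.get("url").toString,
  "(SELECT * FROM data_table WHERE created_at > '2017-03-31') AS recent_data",
  properties)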
Using the Databricks Redshift data source package, which is the one to use for bulk data WRITES to Redshift:
Reading from and writing to Redshift stages data in S3 [and doesn’t clean up after itself], so use object lifecycle management!
val devices = spark.read.format("com.databricks.spark.redshift")
  .option("forward_spark_s3_credentials", "true")
  .option("url", "jdbc:redshift://redshift-host:5439/?user=<user>&password=<password>")
  .option("query", "SELECT * FROM devices")
  .option("tempdir", "s3://temporary-holding-bucket/")
  .load()
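To pull an entire table rather than an ad hoc query, the same data source also accepts a dbtable option; a sketch using the same placeholder URL and bucket:
// Same reader, pointed at a table name instead of a query.
val devices_all = spark.read.format("com.databricks.spark.redshift")
  .option("forward_spark_s3_credentials", "true")
  .option("url", "jdbc:redshift://redshift-host:5439/?user=<user>&password=<password>")
  .option("dbtable", "devices")
  .option("tempdir", "s3://temporary-holding-bucket/")
  .load()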
Writing the dataframe to Redshift in the “public.temporary_devices” table:
devices_transformed.coalesce(64).write
  .format("com.databricks.spark.redshift")
  .option("forward_spark_s3_credentials", "true")
  .option("url", "jdbc:redshift://redshift-host:5439/?user=<user>&password=<password>")
  .option("dbtable", "public.temporary_devices")
  .option("tempdir", "s3a://temporary-holding-bucket/")
  .option("tempformat", "CSV GZIP") // EXPERIMENTAL, but CSV is higher performance than AVRO for loading into Redshift
  .mode("error")
  .save()
* Note: coalesce(64) is called to reduce the number of output files to the s3 staging directory, because renaming files from their temporary location in S3 can be slow. This S3Committer should help alleviate that issue.
Resources
http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/