Pairing Spark with S3 is becoming an increasingly popular approach. Because it separates the data from the computation, it lets us tear down the Spark cluster when we are done with the analysis without losing the source data.
We thought it would be a good idea to run some experiments and arrive at recommendations for those who are currently using, or thinking about using, this approach for their analyses.
If you are looking for the key points and would prefer to skip the full article, here is a bulleted list organized by major function. Some of the bullets in the list are followed by snippets of the code we used:
Only one package is needed to read S3 data: org.apache.hadoop:hadoop-aws
conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
Do not request more spark.executor.memory than what is available on a single node in the cluster
conf$spark.executor.memory <- "14g" # Each executor server had 14.7GB available
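Putting the two configuration bullets together, a minimal connection sketch might look like the following. The master URL is an illustrative assumption, and AWS credentials are assumed to be available to the cluster (for example via environment variables or an EC2 instance role):

```r
library(sparklyr)

# Build a config that pulls in the S3 connector and caps executor memory
conf <- spark_config()
conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
conf$spark.executor.memory <- "14g"  # stay below what a single node offers

# Connect to the cluster (master URL is a placeholder assumption)
sc <- spark_connect(master = "spark://master-node:7077", config = conf)
```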
Multiple smaller files with the same format are preferable to one large file in S3. Spark can read each file in parallel, accelerating the data import considerably. The path passed to
spark_read_csv would then be just the folder or bucket where the files are located:
path = "s3a://flights-data/full"
Load the full data set into Spark memory if there’s enough total memory in the cluster. This will make later queries run very fast:
memory = TRUE
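Combining the two points above, the read step might be sketched as follows; the bucket path comes from the snippet above, the table name is an assumption, and sc is an existing sparklyr connection:

```r
# Point Spark at the folder of CSVs rather than a single file, so the parts
# are read in parallel. memory = TRUE caches the result in Spark memory.
flights <- spark_read_csv(sc, name = "flights",
                          path = "s3a://flights-data/full",
                          memory = TRUE)
```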
Defining the column names accelerates the initial data read. To do this, use the columns and infer_schema arguments. If there are any NA values in numeric fields, define the column as character and then convert it in later subsets:
columns = list(...ArrDelay = "character", DepDelay = "character",...), infer_schema = FALSE)
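A fuller sketch of this call, with an assumed subset of column definitions; a real call would enumerate every field in the file, since supplying columns with infer_schema = FALSE requires the complete schema:

```r
flights <- spark_read_csv(sc, name = "flights",
                          path = "s3a://flights-data/full",
                          # Supplying the schema skips Spark's inference pass
                          columns = list(
                            Year = "integer",
                            ArrDelay = "character",  # may contain "NA" strings
                            DepDelay = "character"
                          ),
                          infer_schema = FALSE)
```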
It is better to use as.integer rather than as.numeric when columns defined as character are really integers. The size of the resulting dataset in Spark memory is considerably smaller when using integers:
mutate(ArrDelay = as.integer(ArrDelay))
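In context, the conversion would sit inside a dplyr chain over the Spark table. This is a sketch: flights is the Spark DataFrame from the earlier read, and the filter step is an illustrative way to drop the rows whose delay fields were "NA" strings:

```r
library(dplyr)

tidy_flights <- flights %>%
  # Convert the character columns to integers; Spark stores integers
  # more compactly than doubles or strings
  mutate(ArrDelay = as.integer(ArrDelay),
         DepDelay = as.integer(DepDelay)) %>%
  # Non-numeric values become NA on conversion; drop them here
  filter(!is.na(ArrDelay), !is.na(DepDelay))
```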
Load subsets of data into Spark memory that will be used for modeling. In R, this is done with two sequenced operations: register and then cache
sdf_register(tidy_flights, "tidy_spark")
tbl_cache(sc, "tidy_spark")
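For example, the register-then-cache sequence could be used like this; tidy_flights stands for any subset prepared with dplyr, and the final tbl() call is an assumption about how the cached table would then be queried:

```r
# Register the dplyr result as a named Spark table, then pull it into memory
sdf_register(tidy_flights, "tidy_spark")
tbl_cache(sc, "tidy_spark")

# Subsequent queries reference the cached table
tidy_spark <- tbl(sc, "tidy_spark")
```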