Introduction

Pairing Spark with S3 is becoming an increasingly popular approach. Because it separates the data from the computation, it lets us tear down or the Spark cluster when we are done with the analysis without losing the source data.

We thought it would be a good idea to run some experiments to find a recommendation that may work for those who are currently or thinking about using this approach for their analyses.

The article will cover the following:

  • The packages and stetting needed to connect R to Spark in Standalone mode and S3
  • A recommended approach to work with data from S3

Summary

If you are looking for the key points, and would prefer to skip the full article. Here is a bulleted list separated by major function. Some of the bullets in the list are followed by snippets of the code we used:

  • spark_connect
    • Only one package is needed to read S3 data: org.apache.hadoop:hadoop-aws

      conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
    • spark.executor.memory is the only Spark parameter that we can tweak on a Standalone cluster
    • Do not request more spark.executor.memory than what is available in a single cluster

      conf$spark.executor.memory <- "14g"    # Each executor server had 14.7GB available
  • spark_read_csv
    • Try using s3a URI scheme first
    • Multiple smaller files with the same format are preferable than one large file in S3. Spark can read each file in parallel, and thus accelerating the data import considerably. The path passed to spark_read_csv would then be just the folder or bucket where the files are located:

      path =  "s3a://flights-data/full"
    • Load the full data set into Spark memory if there‚Äôs enough total memory in the cluster . This will make later queries run very fast:

      memory = TRUE
    • Defining the column names accelerates the initial data read. To do this you use the columns and the infer_schema arguments. If there are any NA values in numeric fields, define the column as character and then convert it on later subsets using dplyr

      columns = list(...ArrDelay = "character", DepDelay = "character",...),
      infer_schema = FALSE)
  • dplyr
    • It is better to use as.integer and not as.numeric if columns that are defined as character are really integer. The size of the resulting dataset in Spark memory is considerably smaller when using integers

        mutate(ArrDelay = as.integer(ArrDelay)
    • Load subsets of data into Spark memory that will be used for modeling. In R, this done with two sequenced operations: Register and then Cache

      sdf_register(tidy_flights, "tidy_spark") 
      tbl_cache(sc, "tidy_spark")

AWS

Spark Cluster (EC2)

We created a four node Spark 2.1.0 Standalone cluster in AWS EC2. R, RStudio and sparklyr are needed in only one node. Here is a conceptual layout of the cluster setup: