Introduction

Pairing a Standalone Spark cluster with S3 is becoming an increasingly popular approach. Unlike in a Hadoop cluster, the data and the computation remain separated, which allows us to repurpose or even tear down the cluster when we are done.

The article will cover the following:

  • The packages and settings needed to connect R to Spark in Standalone mode and S3
  • A recommended approach for working with data from S3

Summary

If you are looking for the key points and would prefer to skip the full article, here is a bulleted list organized by major function. Some of the bullets are followed by snippets of the code we used, and a consolidated sketch tying these steps together follows the list:

  • spark_connect
    • Only one package is needed to read S3 data: org.apache.hadoop:hadoop-aws

      conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
    • spark.executor.memory is the only Spark parameter that we can tweak on a Standalone cluster
    • Do not request more spark.executor.memory than what is available on a single node of the cluster

      conf$spark.executor.memory <- "14g"    # Each executor server had 14.7GB available
  • spark_read_csv
    • Try using the s3a URI scheme first
    • Multiple smaller files with the same format are preferable to one large file in S3. Spark can read each file in parallel, which accelerates the data import considerably. The path passed to spark_read_csv would then be just the folder or bucket where the files are located:

      path =  "s3a://flights-data/full"
    • Load the full data set into Spark memory if there's enough total memory in the cluster. This will make later queries run very fast:

      memory = TRUE
    • Defining the column names accelerates the initial data read. To do this, use the columns and infer_schema arguments. If there are any NA values in numeric fields, define those columns as character and then convert them in later subsets using dplyr:

      columns = list(...ArrDelay = "character", DepDelay = "character",...),
      infer_schema = FALSE)
  • dplyr
    • It is better to use as.integer rather than as.numeric when columns defined as character are really integers. The size of the resulting dataset in Spark memory is considerably smaller when using integers:

        mutate(ArrDelay = as.integer(ArrDelay))
    • Load the subsets of data that will be used for modeling into Spark memory. In R, this is done with two sequenced operations: Register and then Cache:

      sdf_register(tidy_flights, "tidy_spark") 
      tbl_cache(sc, "tidy_spark")
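
To show how these pieces fit together, here is a minimal end-to-end sketch of the workflow summarized above. The master URL, the DepDelay conversion, and the abbreviated column list are assumptions added for illustration; when infer_schema = FALSE, every column in the file must be listed, so a real call would include the full schema.

    library(sparklyr)
    library(dplyr)

    # Configure the connection: add the S3 package and set executor memory
    conf <- spark_config()
    conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"
    conf$spark.executor.memory <- "14g"   # each executor node had 14.7GB available

    # Connect to the Standalone cluster (placeholder master URL)
    sc <- spark_connect(master = "spark://master-node:7077", config = conf)

    # Read the folder of CSV files from S3 into Spark memory;
    # the column list is abbreviated here for illustration
    flights <- spark_read_csv(sc, "flights_spark",
                              path = "s3a://flights-data/full",
                              memory = TRUE,
                              columns = list(ArrDelay = "character",
                                             DepDelay = "character"),
                              infer_schema = FALSE)

    # Convert the character columns that are really integers and drop NAs
    tidy_flights <- flights %>%
      mutate(ArrDelay = as.integer(ArrDelay),
             DepDelay = as.integer(DepDelay)) %>%
      filter(!is.na(ArrDelay), !is.na(DepDelay))

    # Register the subset and cache it in Spark memory for modeling
    sdf_register(tidy_flights, "tidy_spark")
    tbl_cache(sc, "tidy_spark")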

AWS

Spark Cluster (EC2)

We created a four-node Spark 2.1.0 Standalone cluster in AWS EC2. R, RStudio, and sparklyr are needed on only one node. Here is a conceptual layout of the cluster setup: