Using Spark with AWS S3 buckets

AWS Access Keys

AWS Access Keys are needed to access S3 data. To learn how to setup a new keys, please review the AWS documentation: http://docs.aws.amazon.com/general/latest/gr/managing-aws-access-keys.html .We then pass the keys to R via Environment Variables:

Sys.setenv(AWS_ACCESS_KEY_ID="[Your access key]")
Sys.setenv(AWS_SECRET_ACCESS_KEY="[Your secret access key]")

Connecting to Spark

There are four key settings needed to connect to Spark and use S3:

  • A Hadoop-AWS package
  • Executor memory (key but not critical)
  • The master URL
  • The Spark Home

To connect to Spark, we first need to initialize a variable with the contents of sparklyr default config (spark_config) which we will then customize for our needs

library(sparklyr)

conf <- spark_config()

Hadoop-AWS package:

A Spark connection can be enhanced by using packages, please note that these are not R packages. For example, there are packages that tells Spark how to read CSV files, Hadoop or Hadoop in AWS.

In order to read S3 buckets, our Spark connection will need a package called hadoop-aws. If needed, multiple packages can be used. We experimented with many combinations of packages, and determined that for reading data in S3 we only need the one. The version we used, 2.7.3, refers to the latest Hadoop version, so as this article ages, please make sure to check this site to ensure that you are using the latest version: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3"

Executor Memory

As mentioned above this setting key but not critical. There are two points worth highlighting about it is:

  • The only performance related setting in a Spark Stand Alone cluster that can be tweaked, and in most cases because Spark defaults to a fraction of what is available, we then need to increase it by manually passing a value to that setting.

  • If more than the available RAM is requested, then Spark will set the Cores to 0, thus rendering the session unusable.

conf$spark.executor.memory <- "14g"

Master URL and Spark home

There are three important points to mention when executing the spark_connect command:

  1. The master will be the Spark Master’s URL. To find the URL, please see the Spark Cluster section.
  2. Point the Spark Home to the location where Spark was installed in this node
  3. Make sure to the conf variable as the value for the config argument
sc <- spark_connect(master = "spark://ip-172-30-1-5.us-west-2.compute.internal:7077", 
                    spark_home = "/home/ubuntu/spark-2.1.0-bin-hadoop2.7/",
                    config =  conf)

Data Import/Wrangle approach

We experimented with multiple approaches. Most of the factors for settling on a recommended approach were made based on the speed of each step. The premise is that we would rather wait longer during Data Import, if it meant that we can much faster Register and Cache our data subsets during Data Wrangling, especially since we would expect to end up with many subsets as we explore and model.The selected combination was the second slowest during the Import stage, but the fastest when caching a subset, by a lot.

In our tests, it took 72 seconds to read and cache the 29 columns of the 41 million rows of data, the slowest was 77 seconds. But when it comes to registering and caching a considerably sizable subset of 3 columns and almost all of the 41 million records, this approach was 17X faster than the second fastest approach. It took 1/3 of a second to register and cache the subset, the second fastest was 5 seconds.

To implement this approach, we need to set three arguments in the spark_csv_read() step:

  • memory
  • infer_schema
  • columns

Again, this is a recommended approach. The columns argument is needed only if infer_schema is set to FALSE. When memory is set to TRUE it makes Spark load the entire dataset into memory, and setting infer_schema to FALSE prevents Spark from trying to figure out what the schema of the files are. By trying different combinations the memory and infer_schema arguments you may be able to find an approach that may better fits your needs.

Reading the schema

Surprisingly, another critical detail that can easily be overlooked is choosing the right s3 URI scheme. There are two options: s3n and s3a. In most examples and tutorials I found, there was no reason give of why or when to use which one. The article the finally clarified it was this one: https://wiki.apache.org/hadoop/AmazonS3

The gist of it is that s3a is the recommended one going forward, especially for Hadoop versions 2.7 and above. This means that if we copy from older examples that used Hadoop 2.6 we would more likely also used s3n thus making data import much, much slower.

Data Import

After the long introduction in the previous section, there is only one point to add about the following code chunk. If there are any NA values in numeric fields, then define the column as character and then convert it on later subsets using dplyr. The data import will fail if it finds any NA values on numeric fields. This is a small trade off in this approach because the next fastest one does not have this issue but is 17X slower at caching subsets.

flights <- spark_read_csv(sc, "flights_spark", 
                          path =  "s3a://flights-data/full", 
                          memory = TRUE, 
                          columns = list(
                            Year = "character",
                            Month = "character",
                            DayofMonth = "character",
                            DayOfWeek = "character",
                            DepTime = "character",
                            CRSDepTime = "character",
                            ArrTime = "character",
                            CRSArrTime = "character",
                            UniqueCarrier = "character",
                            FlightNum = "character",
                            TailNum = "character",
                            ActualElapsedTime = "character",
                            CRSElapsedTime = "character",
                            AirTime = "character",
                            ArrDelay = "character",
                            DepDelay = "character",
                            Origin = "character",
                            Dest = "character",
                            Distance = "character",
                            TaxiIn = "character",
                            TaxiOut = "character",
                            Cancelled = "character",
                            CancellationCode = "character",
                            Diverted = "character",
                            CarrierDelay = "character",
                            WeatherDelay = "character",
                            NASDelay = "character",
                            SecurityDelay = "character",
                            LateAircraftDelay = "character"), 
                         infer_schema = FALSE)

Data Wrangle

There are a few points we need to highlight about the following simple dyplr code:

Because there were NAs in the original fields, we have to mutate them to a number. Try coercing any variable as integer instead of numeric, this will save a lot of space when cached to Spark memory. The sdf_register command can be piped at the end of the code. After running the code, a new table will appear in the RStudio IDE’s Spark tab

tidy_flights <- tbl(sc, "flights_spark") %>%
  mutate(ArrDelay = as.integer(ArrDelay),
         DepDelay = as.integer(DepDelay),
         Distance = as.integer(Distance)) %>%
  filter(!is.na(ArrDelay)) %>%
  select(DepDelay, ArrDelay, Distance) %>%
  sdf_register("tidy_spark")

After we use tbl_cache() to load the tidy_spark table into Spark memory. We can see the new table in the Storage page of our Spark session.

tbl_cache(sc, "tidy_spark")