Introduction

Spark supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small dataset or running an iterative algorithm like random forests. Because operations in Spark are lazy, caching can also help force computation. sparklyr provides tools to cache and uncache DataFrames. The Spark UI shows which DataFrames are cached and what percentage of each is in memory.

Using a reproducible example, we will review the main configuration settings, commands and command arguments that can help you get the best out of Spark’s memory management options.

Preparation

Download Test Data

The 2008 and 2007 Flights data from the Statistical Computing site will be used for this exercise. The spark_read_csv function supports reading compressed CSV files in bz2 format, so no additional file preparation is needed.

# Download each year's file only if it is not already present
if (!file.exists("2008.csv.bz2")) {
  download.file("http://stat-computing.org/dataexpo/2009/2008.csv.bz2", "2008.csv.bz2")
}
if (!file.exists("2007.csv.bz2")) {
  download.file("http://stat-computing.org/dataexpo/2009/2007.csv.bz2", "2007.csv.bz2")
}

Start a Spark session

A local deployment will be used for this example.

library(sparklyr)
library(dplyr)
library(ggplot2)

# Install Spark version 2
spark_install(version = "2.0.0")

# Customize the connection configuration
conf <- spark_config()
conf$`sparklyr.shell.driver-memory` <- "16G"

# Connect to Spark
sc <- spark_connect(master = "local", config = conf, version = "2.0.0")

The Memory Argument

In the spark_read_… functions, the memory argument controls whether the data is loaded into memory as an RDD. Setting it to FALSE means that Spark essentially maps the file without making a copy of it in memory. This makes the spark_read_csv command run faster, but the trade-off is that any subsequent data transformation operations will take much longer.

spark_read_csv(sc, "flights_spark_2008", "2008.csv.bz2", memory = FALSE)

In the RStudio IDE, the flights_spark_2008 table now shows up in the Spark tab.

To access the Spark Web UI, click the SparkUI button in the RStudio Spark Tab. As expected, the Storage page shows no tables loaded into memory.
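The same check can be made from the R console. The following is a minimal sketch, assuming the sc connection opened earlier is still active; src_tbls lists the tables registered in the Spark session, and a table appearing in that list does not mean it is cached.

```r
library(sparklyr)
library(dplyr)

# Assumes `sc` is the Spark connection created in "Start a Spark session".
# Returns the names of the tables registered in the session;
# being listed here does not imply the table is held in memory.
src_tbls(sc)
```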

Loading Less Data into Memory

Using the pre-processing capabilities of Spark, the data will be transformed before being loaded into memory. In this section, we will continue to build on the example started in the previous section.

Lazy Transform

The following dplyr script is not run immediately, so the code is processed quickly. Some checks are made, but for the most part dplyr is only building a Spark SQL statement in the background.

flights_table <- tbl(sc, "flights_spark_2008") %>%
  # Cast the delay columns to numeric and derive the time gained in flight
  mutate(DepDelay = as.numeric(DepDelay),
         ArrDelay = as.numeric(ArrDelay),
         Gain = DepDelay - ArrDelay) %>%
  # Keep only flights within the delay ranges of interest
  filter(DepDelay > 15, DepDelay < 240,
         ArrDelay > -60, ArrDelay < 360,
         ArrDelay > 0) %>%
  select(Origin, Dest, UniqueCarrier, Distance, DepDelay, ArrDelay, Gain)
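To see the statement that has been built so far, the lazy table can be inspected without touching the data. This is a small sketch, assuming the flights_table object defined above; show_query is a dplyr function that prints the SQL translation.

```r
library(dplyr)

# Assumes `flights_table` is the lazy tbl defined above.
# Prints the generated Spark SQL; the query itself is not executed here.
show_query(flights_table)
```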

Register in Spark

sdf_register registers the resulting Spark SQL in Spark. The results show up as a table called flights_spark, but that table is still not loaded into memory in Spark.

sdf_register(flights_table, "flights_spark")

Cache into Memory

The tbl_cache command loads the results into a Spark RDD in memory, so any analysis from there on will not need to re-read and re-transform the original file. The resulting Spark RDD is smaller than the original file because the transformations produced a smaller data set.

tbl_cache(sc, "flights_spark")
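When the cached table is no longer needed, the memory can be released with the companion command tbl_uncache. A minimal sketch, assuming the sc connection from earlier and that flights_spark has been cached as shown above:

```r
library(sparklyr)

# Assumes `sc` is the active connection and "flights_spark" is currently cached.
# Removes the table from the in-memory cache; the registered table itself remains.
tbl_uncache(sc, "flights_spark")
```

After this call, the table should no longer appear on the Storage page of the Spark Web UI.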

Driver Memory

In the Executors page of the Spark Web UI, we can see that the Storage Memory is at about half of the 16 gigabytes requested. This is mainly because of a Spark setting called spark.memory.fraction. In Spark 2.0 it defaults to 0.6, so roughly 40% of the memory requested is set aside and is not available for execution or storage.
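The arithmetic can be sketched in plain R. The figures below are assumptions rather than values read from this session: the Spark 2.x unified memory model with about 300 MB reserved, and a spark.memory.fraction of 0.6. The number reported in the UI may be lower still, since the JVM typically exposes less usable heap than the amount requested.

```r
# Assumed inputs (not read from the running session)
driver_memory_mb <- 16 * 1024  # the 16G requested via sparklyr.shell.driver-memory
reserved_mb      <- 300        # assumed fixed reserve in the Spark 2.x memory model
memory_fraction  <- 0.6        # assumed value of spark.memory.fraction

# Memory shared by execution and storage under these assumptions
unified_mb <- (driver_memory_mb - reserved_mb) * memory_fraction
unified_gb <- unified_mb / 1024

round(unified_gb, 1)
```

If more of the heap should go to execution and storage, the property can be set on the configuration object before connecting, for example conf$spark.memory.fraction <- 0.8, where 0.8 is only an illustrative value.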

Process on the fly

The plan is to read the Flights 2007 file, combine it with the 2008 file and summarize the data without bringing either file fully into memory.

spark_read_csv(sc, "flights_spark_2007" , "2007.csv.bz2", memory = FALSE)

Union and Transform

The union command is akin to the bind_rows dplyr command. It allows us to append the 2007 file to the 2008 file and, as with the previous transform, this script will be evaluated lazily.

all_flights <- tbl(sc, "flights_spark_2008") %>%
  union(tbl(sc, "flights_spark_2007")) %>%
  group_by(Year, Month) %>%
  tally()

Collect into R

When receiving a collect command, Spark will execute the SQL statement and send the results back to R in a data frame. In this case, R only loads 24 observations into a data frame called all_flights.

all_flights <- all_flights %>%
  collect()

Plot in R

Now the smaller data set can be plotted.

ggplot(data = all_flights, aes(x = Month, y = n/1000, fill = factor(Year))) +
  geom_area(position = "dodge", alpha = 0.5) +
  geom_line(alpha = 0.4) +
  scale_fill_brewer(palette = "Dark2", name = "Year") +
  scale_x_continuous(breaks = 1:12, labels = c("J","F","M","A","M","J","J","A","S","O","N","D")) +
  theme_light() +
  labs(y="Number of Flights (Thousands)", title = "Number of Flights Year-Over-Year")

sparklyr is an RStudio project. © 2016 RStudio, Inc.