Configuring Spark Connections

Local mode

Local mode is an excellent way to learn and experiment with Spark. Local mode also provides a convenient development environment for analyses, reports, and applications that you plan to eventually deploy to a multi-node Spark cluster.

To work in local mode, you should first install a version of Spark for local use. You can do this using the spark_install() function, for example:

Connection example

conf$`sparklyr.cores.local` <- 4
conf$`sparklyr.shell.driver-memory` <- "16G"
conf$spark.memory.fraction <- 0.9

sc <- spark_connect(master = "local", 
                    version = "2.1.0",
                    config = conf)

Executors page

To see how the requested configuration affected the Spark connection, go to the Executors page in the Spark Web UI available in http://localhost:4040/storage/

Customizing connections

A connection to Spark can be customized by setting the values of certain Spark properties. In sparklyr, Spark properties can be set by using the config argument in the spark_connect() function.

By default, spark_connect() uses spark_config() as the default configuration. But that can be customized as shown in the example code below. Because of the unending number of possible combinations, spark_config() contains only a basic configuration, so it will be very likely that additional settings will be needed to properly connect to the cluster.

conf <- spark_config()   # Load variable with spark_config()

conf$spark.executor.memory <- "16G" # Use `$` to add or set values

sc <- spark_connect(master = "yarn-client", 
                    config = conf)  # Pass the conf variable 

Spark definitions

It may be useful to provide some simple definitions for the Spark nomenclature:

  • Node: A server

  • Worker Node: A server that is part of the cluster and are available to run Spark jobs

  • Master Node: The server that coordinates the Worker nodes.

  • Executor: A sort of virtual machine inside a node. One Node can have multiple Executors.

  • Driver Node: The Node that initiates the Spark session. Typically, this will be the server where sparklyr is located.

  • Driver (Executor): The Driver Node will also show up in the Executor list.

Useful concepts

  • Spark configuration properties passed by R are just requests - In most cases, the cluster has the final say regarding the resources apportioned to a given Spark session.

  • The cluster overrides ‘silently’ - Many times, no errors are returned when more resources than allowed are requested, or if an attempt is made to change a setting fixed by the cluster.

YARN

Background

Using Spark and R inside a Hadoop based Data Lake is becoming a common practice at companies. Currently, there is no good way to manage user connections to the Spark service centrally. There are some caps and settings that can be applied, but in most cases there are configurations that the R user will need to customize.

The Running on YARN page in Spark’s official website is the best place to start for configuration settings reference, please bookmark it. Cluster administrators and users can benefit from this document. If Spark is new to the company, the YARN tunning article, courtesy of Cloudera, does a great job at explaining how the Spark/YARN architecture works.

Client mode

Using yarn-client as the value for the master argument in spark_connect() will make the server in which R is running to be the Spark’s session driver. Here is a sample connection:

conf <- spark_config()

conf$spark.executor.memory <- "300M"
conf$spark.executor.cores <- 2
conf$spark.executor.instances <- 3
conf$spark.dynamicAllocation.enabled <- "false"

sc <- spark_connect(master = "yarn-client", 
                    spark_home = "/usr/lib/spark/",
                    version = "1.6.0",
                    config = conf)

Executors page

To see how the requested configuration affected the Spark connection, go to the Executors page in the Spark Web UI. Typically, the Spark Web UI can be found using the exact same URL used for RStudio but on port 4040.

Notice that 155.3MB per executor are assigned instead of the 300MB requested. This is because the spark.memory.fraction has been fixed by the cluster, plus, there is fixed amount of memory designated for overhead.

Cluster mode

Running in cluster mode means that YARN will choose where the driver of the Spark session will run. This means that the server where R is running may not necessarily be the driver for that session. Here is a good write-up explaining how running Spark applications work: Running Spark on YARN

The server will need to have copies of at least two files: yarn-site.xml and hive-site.xml. There may be other files needed based on your cluster’s individual setup.

This is an example of connecting to a Cloudera cluster:

library(sparklyr)

Sys.setenv(JAVA_HOME="/usr/lib/jvm/java-7-oracle-cloudera/")
Sys.setenv(SPARK_HOME = '/opt/cloudera/parcels/CDH/lib/spark')
Sys.setenv(YARN_CONF_DIR = '/opt/cloudera/parcels/CDH/lib/spark/conf/yarn-conf')

conf$spark.executor.memory <- "300M"
conf$spark.executor.cores <- 2
conf$spark.executor.instances <- 3
conf$spark.dynamicAllocation.enabled <- "false"
conf <- spark_config()

sc <- spark_connect(master = "yarn-cluster", 
                    config = conf)

Executor memory error

Requesting more memory or CPUs for Executors than allowed will return an error. This is one of the exceptions to the cluster’s ‘silent’ overrides. It will return a message similar to this:

    Failed during initialize_connection: java.lang.IllegalArgumentException: Required executor memory (16384+1638 MB) is above the max threshold (8192 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'

A cluster’s administrator is the only person who can make changes to the settings mentioned in the error. If the cluster is supported by a vendor, like Cloudera or Hortonworks, then the change can be made using the cluster’s web UI. Otherwise, changes to those settings are done directly in the yarn-default.xml file.

Kerberos

There are two options to access a “kerberized” data lake:

  • Use kinit to get and cache the ticket. After kinit is installed and configured. After kinit is setup, it can used in R via a system() call prior to connecting to the cluster:
system("echo '<password>' | kinit <username>")

For more information visit this site: Apache - Authenticate with kinit

  • A preferred option may be to use the out-of-the-box integration with Kerberos that the commercial version of RStudio Server offers.

Standalone mode

Connection example

conf <- spark_config()
conf$spark.executor.memory <- "7GB"
conf$spark.memory.fraction <- 0.9
conf$spark.executor.cores <- 2
conf$spark.dynamicAllocation.enabled <- "false"

sc <- spark_connect(master="spark://master-url:7077", 
              version = "2.1.0",
              config = conf,
              spark_home = "/home/ubuntu/spark-2.1.0-bin-hadoop2.7/")

Executors page

To see how the requested configuration affected the Spark connection, go to the Executors page in the Spark Web UI. Typically, the Spark Web UI can be found using the exact same URL used for RStudio but on port 4040: