Configuring Spark Connections

Customizing connections

A connection to Spark can be customized by setting the values of certain Spark properties. In sparklyr, these properties are set through the config argument of the spark_connect() function.

By default, spark_connect() uses spark_config() as its configuration, but that can be customized as shown in the example below. Because the number of possible property combinations is vast, spark_config() contains only a basic configuration, so it is very likely that additional settings will be needed to connect properly to a given cluster.

conf <- spark_config()   # Create the config object with spark_config()

conf$spark.executor.memory <- "16G"   # Use `$` to add or set property values

sc <- spark_connect(master = "yarn-client",
                    config = conf)   # Pass the config object to spark_connect()

Spark definitions

It may be useful to provide some simple definitions for the Spark nomenclature:

  • Node: A server

  • Worker Node: A server that is part of the cluster and is available to run Spark jobs

  • Master Node: The server that coordinates the Worker nodes.

  • Executor: A sort of virtual machine inside a Node. One Node can have multiple Executors.

  • Driver Node: The Node that initiates the Spark session. Typically, this will be the server where sparklyr is located.

  • Driver (Executor): The Driver Node will also show up in the Executor list.

Useful concepts

  • Spark configuration properties passed by R are just requests - In most cases, the cluster has the final say regarding the resources apportioned to a given Spark session.

  • The cluster overrides ‘silently’ - Often, no error is returned when more resources than allowed are requested, or when an attempt is made to change a setting fixed by the cluster; a way to verify the values that were actually applied is sketched below.
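Because the overrides are silent, it helps to inspect the configuration the cluster actually applied after connecting. A minimal sketch using sparklyr's spark_context_config(), assuming an active connection sc; the property inspected is just an example:

ctx_conf <- spark_context_config(sc)  # Properties applied to the Spark context
ctx_conf[["spark.executor.memory"]]   # Compare against the value that was requested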

YARN cluster

Background

Using Spark and R inside a Hadoop-based data lake is becoming common practice at companies. Currently, there is no good way to centrally manage user connections to the Spark service. Some caps and settings can be applied, but in most cases there are configurations that the R user will need to customize.

The Running on YARN page on Spark’s official website is the best place to start for configuration-settings reference; please bookmark it. Both cluster administrators and users can benefit from this document. If Spark is new to the company, the YARN tuning article, courtesy of Cloudera, does a great job of explaining how the Spark/YARN architecture works.

Connection example

conf <- spark_config()

conf$spark.executor.memory <- "300M"             # Memory per executor
conf$spark.executor.cores <- 2                   # Cores per executor
conf$spark.executor.instances <- 3               # Number of executors to request
conf$spark.dynamicAllocation.enabled <- "false"  # Use the fixed number of executors above

sc <- spark_connect(master = "yarn-client", 
                    spark_home = "/usr/lib/spark/",
                    version = "1.6.0",
                    config = conf)

Executors page

To see how the requested configuration affected the Spark connection, go to the Executors page in the Spark Web UI. Typically, the Spark Web UI can be found using the exact same URL used for RStudio but on port 4040.

Notice that 155.3MB per executor are assigned instead of the 300MB requested. This is because spark.memory.fraction has been fixed by the cluster, and a fixed amount of memory is also set aside for overhead.
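Rather than building the URL by hand, the UI can also be opened directly from R with sparklyr's spark_web() function, assuming an active connection sc:

spark_web(sc)  # Opens the Spark Web UI in the default browser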

Executor memory error

Requesting more memory or CPUs for executors than the cluster allows is one of the exceptions to the cluster’s ‘silent’ overrides. Instead of adjusting the request, the connection fails with a message similar to this one:

    Failed during initialize_connection: java.lang.IllegalArgumentException: Required executor memory (16384+1638 MB) is above the max threshold (8192 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'

Only the cluster’s administrator can change the settings mentioned in the error. If the cluster is supported by a vendor, like Cloudera or Hortonworks, the change can be made using the cluster manager’s web UI. Otherwise, those settings are changed directly in the yarn-site.xml file.
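Until those limits are raised, the request has to fit under the cluster maximum. A minimal sketch, assuming the 8192 MB cap from the error above and the default overhead of 10% of executor memory (the 16384+1638 MB figure in the message is the request plus that overhead):

conf <- spark_config()
conf$spark.executor.memory <- "6G"  # 6144 MB + ~10% overhead stays below the 8192 MB cap
sc <- spark_connect(master = "yarn-client", config = conf)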

Kerberos

There are two options to access a “kerberized” data lake:

  • Use kinit to get and cache the ticket. Once kinit is installed and configured, it can be used in R via a system() call prior to connecting to the cluster (a sketch that avoids hard-coding the password follows this list):

    system("echo '<password>' | kinit <username>")

    For more information, visit this site: Apache - Authenticate with kinit

  • A preferred option may be to use the out-of-the-box integration with Kerberos that the commercial version of RStudio Server offers.
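Hard-coding the password as in the snippet above leaves it in the script and in the R history. A slightly safer sketch, assuming RStudio is available (for rstudioapi::askForPassword()) and kinit is on the system PATH; <username> remains a placeholder:

# Prompt interactively instead of hard-coding the password (RStudio only)
password <- rstudioapi::askForPassword("Kerberos password")
system(paste0("echo '", password, "' | kinit <username>"))
rm(password)  # Note: the password still passes through a shell command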

Standalone mode

Connection example

conf <- spark_config()
conf$spark.executor.memory <- "7GB"              # Memory per executor
conf$spark.memory.fraction <- 0.9                # Share of heap used for execution and storage
conf$spark.executor.cores <- 2                   # Cores per executor
conf$spark.dynamicAllocation.enabled <- "false"  # Disable dynamic allocation

sc <- spark_connect(master = "spark://master-url:7077",
                    version = "2.1.0",
                    config = conf,
                    spark_home = "/home/ubuntu/spark-2.1.0-bin-hadoop2.7/")

Executors page

To see how the requested configuration affected the Spark connection, go to the Executors page in the Spark Web UI. Typically, the Spark Web UI can be found using the exact same URL used for RStudio but on port 4040.

Local mode

Connection example

conf <- spark_config()

conf$`sparklyr.cores.local` <- 4              # Number of local cores to use
conf$`sparklyr.shell.driver-memory` <- "16G"  # Driver memory; in local mode this is the total memory available
conf$spark.memory.fraction <- 0.9

sc <- spark_connect(master = "local", 
                    version = "2.1.0",
                    config = conf)
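These properties are read only when the connection is created; to apply a different value, disconnect and reconnect. A minimal sketch, with the "8G" value purely illustrative:

spark_disconnect(sc)  # Connection-time properties cannot be changed on a live connection

conf$`sparklyr.shell.driver-memory` <- "8G"  # Revised, illustrative value
sc <- spark_connect(master = "local",
                    version = "2.1.0",
                    config = conf)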

Executors page

To see how the requested configuration affected the Spark connection, go to the Executors page in the Spark Web UI, available at http://localhost:4040/executors/.