Spark Connect, and Databricks Connect v2

Last updated: Wed Sep 27 08:58:48 2023

Intro

Spark Connect introduced a decoupled client-server architecture that allows remote connectivity to Spark clusters using the DataFrame API. The separation between client and server allows Spark to be leveraged from anywhere, letting R users interact with a cluster from the comfort of their preferred environment, laptop or otherwise. Databricks Connect, available in Databricks Runtime version 13 and above, is based on this new architecture.

We are working on enhancing sparklyr so that it can bring the benefits of Spark Connect to the R user community.

The Solution

Spark Connect requires the use of a remote procedure call framework called gRPC. At this time, the Spark team provides two higher level APIs that interact with gRPC. One is Scala based, and the other Python based.

In the development version of sparklyr, we are using reticulate to interact with the Python API. sparklyr extends the functionality, and user experience, by providing the dplyr back-end, the DBI back-end, and RStudio’s Connection pane integration.

In order to quickly iterate on enhancements and bug fixes, we have decided to isolate the Python integration into its own package. The new package, called pysparklyr, is an extension of sparklyr.

Package Installation

As mentioned above, we are still working on integrating sparklyr with Connect. To access the new capabilities, you will need sparklyr version 1.8.3 or higher, and the GitHub version of pysparklyr. To install, use:

install.packages("sparklyr")
remotes::install_github("mlverse/pysparklyr")

Initial Setup

To communicate with Connect, sparklyr needs specific Python components. We provide a helper function that installs those components. You should only need to run this command the first time you install pysparklyr:

pysparklyr::install_pyspark()

Additional setup details

The function will attempt to install Python if it is not found. It will also create a new Python virtual environment, which will contain the necessary Python libraries that sparklyr uses to interact with Connect. In addition, install_pyspark() installs specific versions of the libraries in order to ensure compatibility. It does this by taking advantage of the snapshot capabilities of Posit’s Package Manager.

By default, sparklyr will use whatever virtual environment is currently loaded in your session. If none is loaded, then sparklyr will load the one installed by install_pyspark(). The default name of the Virtual Environment that sparklyr uses is r-sparklyr.

If you wish to use your own Python environment, just make sure to load it before calling spark_connect(); a sketch of this workflow follows the lists below. If there is a Python environment already loaded when you connect to your Spark cluster, then sparklyr will use that environment instead. If you use your own Python environment, you will need the following libraries installed:

  • pyspark
  • pandas
  • PyArrow
  • grpcio
  • google-api-python-client
  • grpcio_status
  • torch (Spark 3.5+)
  • torcheval (Spark 3.5+)

If connecting to Databricks Connect, you will also need:

  • databricks-connect
  • delta-spark
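
If you prefer to manage the Python environment yourself, here is a minimal, hedged sketch of that workflow using reticulate; the environment name my-spark-env and the connection address are placeholders, and the package list mirrors the lists above:

library(reticulate)
library(sparklyr)

# Create a virtual environment and install the libraries listed above
# ("my-spark-env" is a placeholder name; add torch/torcheval for Spark 3.5+,
# and databricks-connect/delta-spark for Databricks Connect)
virtualenv_create("my-spark-env")
virtualenv_install("my-spark-env", packages = c(
  "pyspark", "pandas", "PyArrow", "grpcio",
  "google-api-python-client", "grpcio_status"
))

# Load the environment before connecting so that sparklyr uses it
use_virtualenv("my-spark-env")

sc <- spark_connect(
  master = "sc://localhost",  # placeholder address
  method = "spark_connect"
)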

Databricks Connect (ML 13+)

Getting started

To use Databricks Connect, with runtime 13 or above, you will need three configuration items:

  • Your Workspace Instance URL
  • Your personal access token (PAT)
  • Your cluster ID

In spark_connect(), those items can be passed as function arguments. Also note that, to let sparklyr know you are attempting to use Databricks Connect, set method = "databricks_connect":

library(sparklyr)

sc <- spark_connect(
  master = "[Your Workspace Instance URL]",
  cluster_id = "[Cluster ID]",
  # Do not include your PAT as plain text here; see the next section
  token = "[Your PAT]",
  method = "databricks_connect"
)

Safely connect

For your safety and convenience, you can save your authentication token (PAT) and your Workspace Instance URL in environment variables. The two variables are:

  • DATABRICKS_TOKEN
  • DATABRICKS_HOST

This will prevent your token from being shown in plain text in your code. You can set the environment variables at the beginning of your R session with Sys.setenv().
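
For example, here is a minimal sketch that sets both variables for the current session only; the values shown are placeholders:

Sys.setenv(
  DATABRICKS_HOST = "[Your Workspace Instance URL]",
  DATABRICKS_TOKEN = "[Your PAT]"
)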

Preferably, the two variables can be set for all R sessions by saving them to the .Renviron file. The usethis package has a handy function that opens the file so you can edit it: usethis::edit_r_environ(). Then simply append the two entries to your .Renviron file. Once you are done, save the .Renviron file, and restart R.
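
For reference, the two appended entries in .Renviron would look something like this (placeholder values):

DATABRICKS_HOST=[Your Workspace Instance URL]
DATABRICKS_TOKEN=[Your PAT]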

After that, use spark_connect() to open the connection to Databricks. You will only need to pass the cluster_id of your cluster, and tell sparklyr that you are connecting to a Spark cluster with runtime 13 or above:

library(sparklyr)

sc <- spark_connect(
  cluster_id = "[Cluster ID]",
  method = "databricks_connect"
)

If you wish to further remove identifying values from your connection code, you can also save the cluster_id in the DATABRICKS_CLUSTER_ID environment variable. After you save it using Sys.setenv(), or by updating your .Renviron file, your connection code will look like this:

library(sparklyr)

sc <- spark_connect(
  method = "databricks_connect"
)
#> ✔ Using the 'r-sparklyr' Python environment (/Users/edgar/.virtualenvs/r-sparklyr/bin/python)

RStudio’s Connection pane

Thanks to the new way we are integrating with Spark, it is now possible to display the same structure shown in the Databricks Data Explorer page. In Databricks, the current data structure levels are:

  • Catalog
    • Database
      • Table

In RStudio, you can navigate the data structure by expanding from the top level, all the way down to the table you wish to explore. Once expanded, the table’s fields, and their types are displayed.

In the Connection pane, you can click on the table icon, situated to the right of the table name, to preview the first 1,000 rows.

Using the Connection to Access Data

After connecting, you can use dbplyr’s in_catalog() function to access any table in your data catalog. Simply pass the names of the three levels to in_catalog() as individual character entries, in this order: Catalog, Database, and Table.

Here is an example of using tbl() and in_catalog() to point to the trips table, which is inside the nyctaxi database, inside the samples catalog:

library(dplyr)
library(dbplyr)

trips <- tbl(sc, in_catalog("samples", "nyctaxi", "trips"))

trips
#> # Source: spark<`samples`.`nyctaxi`.`trips`> [?? x 6]
#>    tpep_pickup_datetime tpep_dropoff_datetime trip_distance fare_amount
#>    <dttm>               <dttm>                        <dbl>       <dbl>
#>  1 2016-02-14 10:52:13  2016-02-14 11:16:04            4.94        19  
#>  2 2016-02-04 12:44:19  2016-02-04 12:46:00            0.28         3.5
#>  3 2016-02-17 11:13:57  2016-02-17 11:17:55            0.7          5  
#>  4 2016-02-18 04:36:07  2016-02-18 04:41:45            0.8          6  
#>  5 2016-02-22 08:14:41  2016-02-22 08:31:52            4.51        17  
#>  6 2016-02-05 00:45:02  2016-02-05 00:50:26            1.8          7  
#>  7 2016-02-15 09:03:28  2016-02-15 09:18:45            2.58        12  
#>  8 2016-02-25 13:09:26  2016-02-25 13:24:50            1.4         11  
#>  9 2016-02-13 10:28:18  2016-02-13 10:36:36            1.21         7.5
#> 10 2016-02-13 18:03:48  2016-02-13 18:10:24            0.6          6  
#> # ℹ more rows
#> # ℹ 2 more variables: pickup_zip <int>, dropoff_zip <int>

After pointing tbl() to that specific table, you can then use dplyr.

trips %>%
  group_by(pickup_zip) %>%
  summarise(
    count = n(),
    avg_distance = mean(trip_distance, na.rm = TRUE)
  )
#> # Source: spark<?> [?? x 3]
#>    pickup_zip count avg_distance
#>         <int> <dbl>        <dbl>
#>  1      10032    15         4.49
#>  2      10013   273         2.98
#>  3      10022   519         2.00
#>  4      10162   414         2.19
#>  5      10018  1012         2.60
#>  6      11106    39         2.03
#>  7      10011  1129         2.29
#>  8      11103    16         2.75
#>  9      11237    15         3.31
#> 10      11422   429        15.5 
#> # ℹ more rows

When you are done with your queries and computations, you should disconnect from the cluster.

spark_disconnect(sc)

Spark Connect

Connecting

To start a session with an open-source Spark cluster via Spark Connect, you will need to set master and method. For master, pass the host IP of the cluster and, optionally, its port; the protocol used to put together the proper connection URL is “sc://”. For method, use “spark_connect”. Here is an example:

library(sparklyr)

sc <- spark_connect(
  master = "sc://[Host IP(:Host Port - optional)]", 
  method = "spark_connect"
  )

Run locally

It is possible to run Spark Connect on your own machine. We provide helper functions that let you set up, start, and stop the service locally.

If you wish to try this out, first install Spark 3.4 or above:

spark_install("3.4")

After installing, start the Spark Connect service using:

pysparklyr::spark_connect_service_start()

To connect to your local Spark Connect, use localhost as the address for master:

sc <- spark_connect(
  master = "sc://localhost", 
  method = "spark_connect"
  )

Now, you are able to interact with your local Spark session:

library(dplyr)

tbl_mtcars <- copy_to(sc, mtcars)

tbl_mtcars %>% 
  group_by(am) %>% 
  summarise(mpg = mean(mpg, na.rm = TRUE))

When done, you can disconnect from Spark Connect:

spark_disconnect(sc)

The regular version of local Spark would terminate the local cluster when you call spark_disconnect(). For Spark Connect, the local service needs to be stopped independently.

pysparklyr::spark_connect_service_stop()

What is supported

Here is a list of what we currently support, and do not support, via sparklyr and Connect:

Supported:

  • Integration with most of the dplyr and DBI APIs
  • Integration with the invoke() command
  • RStudio Connections Pane navigation
  • Support for Personal Access Token security authentication for Databricks Connect
  • Support for most read and write commands (see the sketch after this list). These have only been tested in Spark Connect.
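
As a quick illustration of the read and write support, here is a hedged sketch using two standard sparklyr commands; the table name and paths are placeholders, and availability of a specific command over your Connect setup may vary:

# Read a CSV file into the Spark session, then write it back out as Parquet
# (the name and both paths are placeholders)
df <- spark_read_csv(sc, name = "my_data", path = "[path to CSV file]")
spark_write_parquet(df, path = "[output path]")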

Not supported:

  • ML functions - All functions in sparklyr that have the ml_ and ft_ prefixes are currently not supported. The reason is that Spark Connect in Spark 3.4 does not currently support MLlib. We expect that some ML support will be available in Spark 3.5. At that time, we will work on integrating the new ML routines from Connect into sparklyr.

  • SDF functions - Most of these functions require SparkSession, which is currently not supported in Spark 3.4.

  • tidyr - This is ongoing work that we are focusing on in sparklyr. We are implementing these functions using PySpark DataFrame commands, instead of depending on the Scala implementation.

A more detailed list of our progress, including individual functions that we have confirmed work, do not work, or will work differently, is available here: pysparklyr progress