Apply an R Function in Spark

R/spark_apply.R

spark_apply

Description

Applies an R function to a Spark object (typically, a Spark DataFrame).

Usage

 
spark_apply( 
  x, 
  f, 
  columns = NULL, 
  memory = TRUE, 
  group_by = NULL, 
  packages = NULL, 
  context = NULL, 
  name = NULL, 
  barrier = NULL, 
  fetch_result_as_sdf = TRUE, 
  partition_index_param = "", 
  arrow_max_records_per_batch = NULL, 
  auto_deps = FALSE, 
  ... 
) 

Arguments

x: An object (usually a spark_tbl) coercible to a Spark DataFrame.
f: A function that transforms a data frame partition into a data frame. The function f has signature f(df, context, group1, group2, ...), where df is a data frame with the data to be processed, context is an optional object passed as the context parameter, and group1 to groupN contain the values of the group_by columns. When group_by is not specified, f takes only one argument. f can also be an rlang anonymous function; for example, ~ .x + 1 defines an expression that adds one to the given .x data frame. A sketch of the grouped signature appears after this argument list.
columns: A vector of column names, or a named vector of column types, for the transformed object. When not specified, a sample of 10 rows is taken to infer the output columns automatically; to avoid this performance penalty, specify the column types. The sample size is configurable using the sparklyr.apply.schema.infer configuration option.
memory: Boolean; should the table be cached into memory?
group_by: Column name used to group by data frame partitions.
packages: Boolean to distribute .libPaths() packages to each node, a list of packages to distribute, or a package bundle created with spark_apply_bundle(). Defaults to TRUE or the sparklyr.apply.packages value set in spark_config(). For clusters using Yarn cluster mode, packages can point to a package bundle created using spark_apply_bundle() and made available as a Spark file using config$sparklyr.shell.files. For clusters using Livy, packages can be manually installed on the driver node. For offline clusters where available.packages() is not available, manually download the packages database from https://cran.r-project.org/web/packages/packages.rds and set Sys.setenv(sparklyr.apply.packagesdb = "<path-to-rds>"). Otherwise, all packages will be used by default. For clusters where R packages are already installed on every worker node, the spark.r.libpaths config entry can be set in spark_config() to the local packages library. To specify multiple paths, collapse them (without spaces) with a comma delimiter (e.g., "/lib/path/one,/lib/path/two").
context: Optional object to be serialized and passed back to f().
name: Optional table name used when registering the resulting data frame.
barrier: Optional; set to TRUE to run f using Barrier Execution Mode in the scheduler.
fetch_result_as_sdf: Whether to return the transformed results in a Spark DataFrame (defaults to TRUE). When set to FALSE, results are returned as a list of R objects instead. NOTE: fetch_result_as_sdf must be set to FALSE when the transformation function returns R objects that cannot be stored in a Spark DataFrame (e.g., complex numbers or any other R data type that does not have an equivalent representation among Spark SQL data types). A sketch appears at the end of the Examples section.
partition_index_param: Optional; if non-empty, then f also receives the index of the partition being processed as a named argument with this name, in addition to all positional arguments it receives. NOTE: when fetch_result_as_sdf is set to FALSE, objects returned from the transformation function must also be serializable by the base::serialize function in R.
arrow_max_records_per_batch: Maximum size of each Arrow record batch; ignored if Arrow serialization is not enabled.
auto_deps: [Experimental] Whether to infer all required R packages by examining the closure f() and only distribute the required R packages and their transitive dependencies to Spark worker nodes (default: FALSE). NOTE: this option only takes effect if packages is set to TRUE or is a character vector of R package names. If packages is a character vector of R package names, then both the set of packages specified by packages and the set of inferred packages are distributed to Spark workers.
...: Optional arguments; currently unused.
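
For illustration, the sketch below combines the grouped signature of f described above with the group_by and context arguments. It assumes the local connection sc created in the Examples section; the context value and computed columns are illustrative only, and the output schema is left to be inferred from a sample.

# apply f per group of "id", passing a serialized context object;
# the group value arrives after the context argument
sdf_len(sc, 10) %>%
  spark_apply(
    function(df, context, group) {
      # df holds the rows for one group, context the list supplied below,
      # and group the value of the group_by column for this group
      data.frame(n = nrow(df), scaled = nrow(df) * context$scale)
    },
    group_by = "id",
    context = list(scale = 10)
  )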

Configuration

spark_config() settings can be specified to change the workers' environment. For instance, to set additional environment variables on each worker node, use the sparklyr.apply.env.* config; to launch workers without --vanilla, set sparklyr.apply.options.vanilla to FALSE; to run a custom script before launching Rscript, use sparklyr.apply.options.rscript.before.
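
For example, a minimal sketch of such a configuration (the environment variable name EXAMPLE_SETTING is only a placeholder) might look like the following:

library(sparklyr)

config <- spark_config()
# export an additional environment variable to each worker node
config$sparklyr.apply.env.EXAMPLE_SETTING <- "some-value"
# launch worker R sessions without the --vanilla option
config$sparklyr.apply.options.vanilla <- FALSE

sc <- spark_connect(master = "local", config = config)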

Examples

 
 
library(sparklyr) 
sc <- spark_connect(master = "local[3]") 
 
# creates a Spark data frame with 10 elements, then multiplies each element by 10 in R
sdf_len(sc, 10) %>% spark_apply(function(df) df * 10) 
#> # Source: spark<?> [?? x 1]
#>       id
#>    <dbl>
#>  1    10
#>  2    20
#>  3    30
#>  4    40
#>  5    50
#>  6    60
#>  7    70
#>  8    80
#>  9    90
#> 10   100
 
# using barrier mode 
sdf_len(sc, 3, repartition = 3) %>% 
  spark_apply(nrow, barrier = TRUE, columns = c(id = "integer")) %>% 
  collect() 
#> # A tibble: 3 × 1
#>      id
#>   <int>
#> 1     1
#> 2     1
#> 3     1
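
# a sketch combining fetch_result_as_sdf = FALSE with partition_index_param;
# the parameter name "partition_idx" is only a placeholder
sdf_len(sc, 3, repartition = 3) %>%
  spark_apply(
    function(df, partition_idx) {
      # anything serializable by base::serialize can be returned here
      list(partition = partition_idx, rows = nrow(df))
    },
    fetch_result_as_sdf = FALSE,
    partition_index_param = "partition_idx"
  )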