Using Apache Arrow

Introduction

Apache Arrow is a cross-language development platform for in-memory data. Arrow is supported starting with sparklyr 1.0.0 to improve performance when transferring data between Spark and R. You can find performance benchmarks in the Apache Arrow blog post Speeding up R and Apache Spark using Apache Arrow.

Installation

Install the latest release of arrow from CRAN with:

install.packages("arrow")

Please see https://arrow.apache.org/docs/r if you have any questions about using arrow from R.
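
To confirm that the installation succeeded, you can print the package's build information; arrow_info() reports the installed version and which optional features were compiled in:

arrow::arrow_info()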

Use Cases

There are three main use cases for arrow in sparklyr:

  • Data Copying: Arrow is used when copying data into Spark with copy_to().
  • Data Collection: Arrow is also used when collecting data back into R, either implicitly by printing a dataset or explicitly by calling collect().
  • R Transformations: When using spark_apply(), data is transferred with Arrow whenever the column types allow it (all three paths are exercised in the sketch following the next code block).

To use arrow in sparklyr, you simply need to attach the package:

library(arrow)
Attaching package: ‘arrow’

The following object is masked from ‘package:utils’:

    timestamp

The following objects are masked from ‘package:base’:

    array, table
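
With arrow attached, no further configuration is needed: the three code paths above switch to Arrow automatically. A minimal sketch exercising each of them, assuming a local Spark installation:

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

# Data copying: copy_to() transfers the local data frame through Arrow.
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)

# Data collection: collect() (and printing) brings rows back through Arrow.
mtcars_local <- collect(mtcars_tbl)

# R transformations: spark_apply() uses Arrow for the round trip
# when all column types are supported.
spark_apply(mtcars_tbl, function(df) df * 2)

If arrow is not attached, the same calls still work; they simply fall back to sparklyr's default serialization.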

Considerations

Types

Some data types are mapped to slightly different, and arguably more correct, types when using Arrow. For instance, consider collecting 64-bit integers in sparklyr:

library(sparklyr)

# In a fresh R session, without arrow attached:
sc <- spark_connect(master = "local")

# Create a two-row Spark data frame backed by a 64-bit integer column.
integer64 <- sdf_len(sc, 2, type = "integer64")
integer64
# Source: spark<?> [?? x 1]
     id
  <dbl>
1     1
2     2

Notice that sparklyr collects 64-bit integers as doubles; however, with arrow attached:

library(arrow)

# The same Spark data frame, printed again with arrow attached:
integer64
# Source: spark<?> [?? x 1]
  id             
  <S3: integer64>
1 1              
2 2 

64-bit integers are now collected as proper 64-bit integers using the bit64 package.
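
You can verify the mapping by collecting explicitly and checking the column class on the R side; a quick check, reusing the session above:

library(dplyr)

collected <- collect(integer64)
class(collected$id)
# With arrow attached this should be "integer64" (from the bit64 package);
# without it, the column would come back as "numeric".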

Fallback

The Arrow R package supports many data types; however, when a type is unsupported, sparklyr falls back to a non-Arrow transfer and prints a warning.

library(sparklyr.nested)
library(sparklyr)
library(dplyr)
library(arrow)

sc <- spark_connect(master = "local")
cars <- copy_to(sc, mtcars)

# Nest hp into a struct column, then aggregate the structs per group
# into a list column, a type the Arrow R package cannot transfer.
sdf_nest(cars, hp) %>%
  group_by(cyl) %>%
  summarize(data = collect_list(data))
# Source: spark<?> [?? x 2]
    cyl data       
  <dbl> <list>     
1     6 <list [7]> 
2     4 <list [11]>
3     8 <list [14]>
Warning message:
In arrow_enabled_object.spark_jobj(sdf) :
  Arrow disabled due to columns: data
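
The fallback affects only how the rows are transferred, not whether they can be collected: the nested result can still be brought into R and inspected locally. For example, reusing the objects above:

nested <- sdf_nest(cars, hp) %>%
  group_by(cyl) %>%
  summarize(data = collect_list(data)) %>%
  collect()
# (this collect triggers the same fallback warning)

# Each element of the list column holds the nested records for one group.
str(nested$data[[1]])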