Read file(s) into a Spark DataFrame using a custom reader

R/data_interface.R

spark_read

Description

Run a custom R function on Spark workers to ingest data from one or more files into a Spark DataFrame, assuming all files follow the same schema.

Usage

spark_read(sc, paths, reader, columns, packages = TRUE, ...) 

Arguments

Arguments Description
sc A spark_connection.
paths A character vector of one or more file URIs (e.g., c(“hdfs://localhost:9000/file.txt”, “hdfs://localhost:9000/file2.txt”))
reader A self-contained R function that takes a single file URI as argument and returns the data read from that file as a data frame.
columns a named list of column names and column types of the resulting data frame (e.g., list(column_1 = “integer”, column_2 = “character”)), or a list of column names only if column types should be inferred from the data (e.g., list(“column_1”, “column_2”), or NULL if column types should be inferred and resulting data frame can have arbitrary column names
packages A list of R packages to distribute to Spark workers
Optional arguments; currently unused.

Examples

library(sparklyr) 
sc <- spark_connect( 
  master = "yarn", 
  spark_home = "~/spark/spark-2.4.5-bin-hadoop2.7" 
) 
# This is a contrived example to show reader tasks will be distributed across 
# all Spark worker nodes 
spark_read( 
  sc, 
  rep("/dev/null", 10), 
  reader = function(path) system("hostname", intern = TRUE), 
  columns = c(hostname = "string") 
) %>% sdf_collect() 

See Also

Other Spark serialization routines: collect_from_rds(), spark_insert_table(), spark_load_table(), spark_read_avro(), spark_read_binary(), spark_read_csv(), spark_read_delta(), spark_read_image(), spark_read_jdbc(), spark_read_json(), spark_read_libsvm(), spark_read_orc(), spark_read_parquet(), spark_read_source(), spark_read_table(), spark_read_text(), spark_save_table(), spark_write_avro(), spark_write_csv(), spark_write_delta(), spark_write_jdbc(), spark_write_json(), spark_write_orc(), spark_write_parquet(), spark_write_source(), spark_write_table(), spark_write_text()