Loading Libraries

suppressPackageStartupMessages({
  library(tidyverse)
  library(sparklyr)
  library(DBI)})

Connecting to Spark

# Start from sparklyr's default configuration and override the executor
# settings (cores and memory) used for this session.
conf <- spark_config()
conf$spark.executor.cores <- 4
conf$spark.executor.memory <- "2G"

# Connect in yarn-client mode using the Spark 1.6.2 installation found
# under /usr/lib/spark.
sc <- spark_connect(
  master = "yarn-client",
  version = "1.6.2",
  spark_home = "/usr/lib/spark",
  config = conf
)

Create a pointer to the Spark table

# Reference to the "taxi" table reachable through the Spark connection.
taxi <- sc %>%
  tbl("taxi")

Get column stats

# NOTE(review): `t` is never assigned in the visible part of this script, so
# the step that computed the column statistics appears to be missing here —
# TODO restore it. As written, `t` would resolve to base::t unless it is
# defined elsewhere.
t
Source:   query [1 x 5]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

    m_dist   m_pass    m_tip    m_tot    rows
     <dbl>    <dbl>    <dbl>    <dbl>   <dbl>
1 3.140056 5.116351 1.807587 16.86587 1159640

Register and Cache

Subset of ‘taxi’ table

# Keep only trips that have a pickup timestamp, and narrow the table to the
# seven columns used in the rest of the analysis.
taxi <- taxi %>%
  filter(!is.na(tpep_pickup_datetime)) %>%
  select(
    tpep_pickup_datetime, tpep_dropoff_datetime,
    trip_distance, passenger_count,
    tip_amount, total_amount, payment_type
  )

Register subset

# Register the subset in Spark under the name "spark_taxi"; the returned
# table reference is printed.
taxi %>%
  sdf_register("spark_taxi")
Source:   query [1.114e+07 x 7]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

    tpep_pickup_datetime tpep_dropoff_datetime trip_distance passenger_count tip_amount total_amount
                   <chr>                 <chr>         <dbl>           <int>      <dbl>        <dbl>
1  2016-06-09 21:06:36.0 2016-06-09 21:13:08.0          0.79               2       0.00         7.30
2  2016-06-09 21:06:36.0 2016-06-09 21:35:11.0          5.22               1       4.00        27.30
3  2016-06-09 21:06:36.0 2016-06-09 21:13:10.0          1.26               1       1.56         9.36
4  2016-06-09 21:06:36.0 2016-06-09 21:36:10.0          7.39               1       1.00        28.30
5  2016-06-09 21:06:36.0 2016-06-09 21:23:23.0          3.10               1       2.96        17.76
6  2016-06-09 21:06:36.0 2016-06-09 21:19:21.0          2.17               1       2.36        14.16
7  2016-06-09 21:06:36.0 2016-06-09 21:30:13.0          6.02               5       0.00        22.80
8  2016-06-09 21:06:37.0 2016-06-09 21:16:47.0          1.40               1       1.95        11.75
9  2016-06-09 21:06:37.0 2016-06-09 21:15:44.0          1.20               1       0.00         9.30
10 2016-06-09 21:06:37.0 2016-06-09 21:23:57.0          1.90               2       3.33        16.63
# ... with 1.114e+07 more rows, and 1 more variables: payment_type <int>

Cache subset into Spark Storage (memory)

# Ask Spark to cache the registered "spark_taxi" table.
sc %>%
  tbl_cache("spark_taxi")

Collect

# Count trips per passenger_count (rows with a missing count are excluded)
# and collect the aggregated result into the local R session.
collect_table <- taxi %>%
  filter(!is.na(passenger_count)) %>%
  group_by(passenger_count) %>%
  tally() %>%
  collect()

collect_table

Visualizations

# Horizontal bar chart of trip counts per passenger count, with the formatted
# count printed at the base of each bar. The shared x aesthetic (passenger
# counts ordered descending) is declared once in ggplot() instead of being
# repeated in every layer. The axis.text.x theme element is blanked,
# presumably because each bar already carries its own label.
collect_table %>%
  rename(trips = n) %>%
  ggplot(aes(x = reorder(passenger_count, -passenger_count))) +
    geom_bar(aes(y = trips), stat = "identity", fill = "lightgreen") +
    geom_text(
      aes(y = 0, label = prettyNum(trips, big.mark = ",")),
      hjust = 0
    ) +
    coord_flip() +
    # Axis label spelling fixed (was "Passangers").
    labs(x = "Passengers", title = "Trips by Passenger Count") +
    theme_light() +
    theme(axis.text.x = element_blank())

Hive UDFs

Full list: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

# unix_timestamp() is not an R function; it appears in the query as written
# so that Spark evaluates it as a Hive UDF. The difference between drop-off
# and pickup gives the trip duration in seconds.
taxi %>%
  mutate(
    seconds = unix_timestamp(tpep_dropoff_datetime) -
      unix_timestamp(tpep_pickup_datetime)
  ) %>%
  select(seconds)
Source:   query [1.114e+07 x 1]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

   seconds
     <dbl>
1      392
2     1715
3      394
4     1774
5     1007
6      765
7     1417
8      610
9      547
10    1040
# ... with 1.114e+07 more rows
         
# Derive the trip duration in hours from the two timestamps (seconds / 3600)
# and the speed as trip_distance divided by that duration; keep only the
# columns needed to inspect the result.
temp_table <- taxi %>%
  mutate(
    hrs = (unix_timestamp(tpep_dropoff_datetime) -
      unix_timestamp(tpep_pickup_datetime)) / 3600,
    speed = trip_distance / hrs
  ) %>%
  select(
    tpep_pickup_datetime, tpep_dropoff_datetime,
    hrs, trip_distance, speed
  )

temp_table
Source:   query [1.114e+07 x 5]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

    tpep_pickup_datetime tpep_dropoff_datetime       hrs trip_distance     speed
                   <chr>                 <chr>     <dbl>         <dbl>     <dbl>
1  2016-06-09 21:06:36.0 2016-06-09 21:13:08.0 0.1088889          0.79  7.255102
2  2016-06-09 21:06:36.0 2016-06-09 21:35:11.0 0.4763889          5.22 10.957434
3  2016-06-09 21:06:36.0 2016-06-09 21:13:10.0 0.1094444          1.26 11.512690
4  2016-06-09 21:06:36.0 2016-06-09 21:36:10.0 0.4927778          7.39 14.996618
5  2016-06-09 21:06:36.0 2016-06-09 21:23:23.0 0.2797222          3.10 11.082423
6  2016-06-09 21:06:36.0 2016-06-09 21:19:21.0 0.2125000          2.17 10.211765
7  2016-06-09 21:06:36.0 2016-06-09 21:30:13.0 0.3936111          6.02 15.294284
8  2016-06-09 21:06:37.0 2016-06-09 21:16:47.0 0.1694444          1.40  8.262295
9  2016-06-09 21:06:37.0 2016-06-09 21:15:44.0 0.1519444          1.20  7.897623
10 2016-06-09 21:06:37.0 2016-06-09 21:23:57.0 0.2888889          1.90  6.576923
# ... with 1.114e+07 more rows
# Bar chart of the number of trips for each rounded speed value between 1 and
# 50. The aggregation is expressed before collect(), so only the per-speed
# counts are brought into R for plotting.
temp_table %>%
  mutate(tr = round(speed, 0)) %>%
  filter(tr >= 1, tr <= 50) %>%
  group_by(tr) %>%
  tally() %>%
  arrange(tr) %>%
  collect() %>%
  ggplot(aes(x = tr, y = n, fill = n)) +
    geom_bar(stat = "identity")

Modeling

# Build the modeling table: flag trips with payment_type 1 as cc = "Yes"
# (everything else "No"), keep the response plus three predictors, take a
# seeded sample with fraction 0.1, and register it in Spark as "spark_tidy".
tidy_table <- taxi %>%
  mutate(cc = ifelse(payment_type == 1, "Yes", "No")) %>%
  select(tip_amount, trip_distance, passenger_count, cc) %>%
  sdf_sample(fraction = 0.1, seed = 100) %>%
  sdf_register("spark_tidy")

# Cache the registered sample before it is used for model fitting.
tbl_cache(sc, "spark_tidy")
# Fit a linear regression of tip_amount on all remaining columns of
# tidy_table (trip_distance, passenger_count and cc).
simple_model <- tidy_table %>%
  ml_linear_regression(tip_amount ~ .)
* No rows dropped by 'na.omit' call
# Print the fitted model's summary (residuals, coefficients, fit statistics).
simple_model %>%
  summary()
Call: ml_linear_regression(., tip_amount ~ .)

Deviance Residuals: (approximate):
       Min         1Q     Median         3Q        Max 
-57.101397  -0.666543  -0.009081   0.577392 168.602845 

Coefficients:
                   Estimate  Std. Error   t value  Pr(>|t|)    
(Intercept)     -0.91558210  0.00405384 -225.8553 < 2.2e-16 ***
trip_distance    0.34013177  0.00046783  727.0482 < 2.2e-16 ***
passenger_count -0.00950108  0.00137861   -6.8918 5.512e-12 ***
cc_Yes           2.64537564  0.00379350  697.3434 < 2.2e-16 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

R-Squared: 0.4916
Root Mean Squared Error: 1.895

SQL DBI

# The Spark connection can be passed to DBI directly to run raw SQL and
# return the result as a local data frame.
# NOTE(review): this queries `flights`, a table not referenced anywhere else
# in the visible script — confirm it exists on the cluster.
library(DBI)
dbGetQuery(
  conn = sc,
  statement = "Select * from flights limit 10"
)