Yet Another Blog in Statistical Computing

I can calculate the motion of heavenly bodies but not the madness of people. - Isaac Newton

Archive for the ‘Spark’ Category

H2O Benchmark for CSV Import

The importFile() function in H2O is extremely efficient because the file is read and parsed in parallel across the H2O cluster rather than in the R session. The benchmark below shows that it is comparable to read.df() in SparkR and significantly faster than the generic read.csv().

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local", sparkConfig = list(spark.driver.memory = "10g", spark.driver.cores = "4"))

library(h2o)
h2o.init(max_mem_size = "10g")

library(rbenchmark)

benchmark(replications = 5, order = "elapsed", relative = "elapsed",
   csv = {
          df <- read.csv("Documents/nycflights13.csv")
          print(nrow(df))
          rm(df)
         },
   spk = {
          df <- read.df("Documents/nycflights13.csv", source = "csv", header = "true", inferSchema = "true")
          print(nrow(df))
          rm(df)
         },
   h2o = {
          df <- h2o.importFile(path = "Documents/nycflights13.csv", header = TRUE, sep = ",")
          print(nrow(df))
          rm(df)
         }
 )

#   test replications elapsed relative user.self sys.self user.child sys.child
# 3  h2o            5   8.221    1.000     0.508    0.032          0         0
# 2  spk            5   9.822    1.195     0.008    0.004          0         0
# 1  csv            5  16.595    2.019    16.420    0.176          0         0
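
Note that the user.self times for both h2o and spk are near zero: the actual work happens inside the H2O and Spark JVMs rather than in the R process, so only the elapsed column is a fair comparison. After the benchmark, it is good practice to release both clusters. A minimal cleanup sketch, assuming the session objects created above:

# drop all frames held by the H2O cluster, then shut it down
h2o.removeAll()
h2o.shutdown(prompt = FALSE)

# stop the local SparkR session
sparkR.session.stop()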

Written by statcompute

June 26, 2017 at 1:24 am

Posted in Big Data, H2O, S+/R, Spark


Joining Tables in SparkR

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local")
df1 <- read.df("nycflights13.csv", source = "csv", header = "true", inferSchema = "true")

grp1 <- groupBy(filter(df1, "month in (1, 2, 3)"), "month")
sum1 <- withColumnRenamed(agg(grp1, min_dep = min(df1$dep_delay)), "month", "month1")

grp2 <- groupBy(filter(df1, "month in (2, 3, 4)"), "month")
sum2 <- withColumnRenamed(agg(grp2, max_dep = max(df1$dep_delay)), "month", "month2")

# INNER JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all = FALSE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "inner"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     3|    -25|     3|    911|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# LEFT JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all.x = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "left"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     1|    -30|  null|   null|
#|     3|    -25|     3|    911|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# RIGHT JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all.y = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "right"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     3|    -25|     3|    911|
#|  null|   null|     4|    960|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# FULL JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "full"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     1|    -30|  null|   null|
#|     3|    -25|     3|    911|
#|  null|   null|     4|    960|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+
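
One wrinkle in the equality joins above is that both key columns, month1 and month2, survive in the joined output. A minimal sketch of keeping a single key with SparkR's select(), where the intermediate variable j is mine:

# keep one copy of the join key after the inner join
j <- join(sum1, sum2, sum1$month1 == sum2$month2, "inner")
showDF(select(j, c("month1", "min_dep", "max_dep")))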

Written by statcompute

June 13, 2017 at 1:27 am

Posted in Big Data, S+/R, Spark


R Interface to Spark

SparkR

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local")
df1 <- read.df("nycflights13.csv", source = "csv", header = "true", inferSchema = "true")

### SUMMARY TABLE WITH SQL
createOrReplaceTempView(df1, "tbl1")
summ <- sql("select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month")
head(summ)
#   month  avg_dep  avg_arr
# 1     1 1347.210 1523.155
# 2     3 1359.500 1509.743
# 3     5 1351.168 1502.685

### SUMMARY TABLE WITH AGG()
grp <- groupBy(filter(df1, "month in (1, 3, 5)"), "month")
summ <- agg(grp, avg_dep = avg(df1$dep_time), avg_arr = avg(df1$arr_time))
head(summ)
#   month  avg_dep  avg_arr
# 1     1 1347.210 1523.155
# 2     3 1359.500 1509.743
# 3     5 1351.168 1502.685
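
head() above only pulls the first few rows into R; the summary itself is still a SparkDataFrame. To materialize the full result as a local data.frame, a sketch with SparkR's collect():

local_summ <- collect(summ)
class(local_summ)
# [1] "data.frame"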

sparklyr

library(sparklyr)
sc <- spark_connect(master = "local")
df1 <- spark_read_csv(sc, name = "tbl1", path = "nycflights13.csv", header = TRUE, infer_schema = TRUE)

### SUMMARY TABLE WITH SQL
library(DBI)
summ <- dbGetQuery(sc, "select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month")
head(summ)
#   month  avg_dep  avg_arr
# 1     5 1351.168 1502.685
# 2     1 1347.210 1523.155
# 3     3 1359.500 1509.743

### SUMMARY TABLE WITH DPLYR
library(dplyr)
summ <- df1 %>% 
        filter(month %in% c(1, 3, 5)) %>% 
        group_by(month) %>%
        summarize(avg_dep = mean(dep_time), avg_arr = mean(arr_time)) 
head(summ)        
#   month  avg_dep  avg_arr
#   <int>    <dbl>    <dbl>
# 1     5 1351.168 1502.685
# 2     1 1347.210 1523.155
# 3     3 1359.500 1509.743        
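
The dplyr pipeline above is not executed in R; sparklyr translates it to Spark SQL. A quick way to see the generated query, assuming the dbplyr backend that sparklyr relies on is installed:

# print the SQL that Spark will actually run for the pipeline above
show_query(summ)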

Written by statcompute

June 9, 2017 at 12:30 am

Posted in Big Data, S+/R, Spark


Data Aggregation with PySpark

Import CSV File into Spark Dataframe

import pyspark as spark

sc = spark.SQLContext(spark.SparkContext())

sdf1 = sc.read.csv("Documents/nycflights13.csv", header = True, inferSchema = True)
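
As an aside, the SQLContext entry point above comes from the Spark 1.x API. With Spark 2.x, the same import can be written against a SparkSession; a sketch, not what the original code used:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()
sdf1 = spark.read.csv("Documents/nycflights13.csv", header = True, inferSchema = True)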

Data Aggregation with Spark Dataframe

import pyspark.sql.functions as fn

sdf1.cache() \
    .filter("month in (1, 3, 5)") \
    .groupby("month") \
    .agg(fn.mean("dep_time").alias("avg_dep"), fn.mean("arr_time").alias("avg_arr")) \
    .show()

+-----+------------------+------------------+
|month|           avg_dep|           avg_arr|
+-----+------------------+------------------+
|    1| 1347.209530642299|1523.1545262203415|
|    3|1359.4997676330747|1509.7429767741473|
|    5|1351.1682074168525|1502.6846604007803|
+-----+------------------+------------------+

Data Aggregation with Spark SQL

sc.registerDataFrameAsTable(sdf1, "tbl1")

sc.sql("select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month").show()

sc.dropTempTable("tbl1")

+-----+------------------+------------------+
|month|           avg_dep|           avg_arr|
+-----+------------------+------------------+
|    1| 1347.209530642299|1523.1545262203415|
|    3|1359.4997676330747|1509.7429767741473|
|    5|1351.1682074168525|1502.6846604007803|
+-----+------------------+------------------+
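
registerDataFrameAsTable() and dropTempTable() also belong to the older SQLContext API. Their Spark 2.x equivalents, sketched against the hypothetical SparkSession named spark above:

sdf1.createOrReplaceTempView("tbl1")
spark.sql("select count(*) from tbl1").show()
spark.catalog.dropTempView("tbl1")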

Written by statcompute

June 1, 2017 at 1:02 am

Posted in Pyspark, PYTHON, Spark


Kick Off Spark

My first Spark session:

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sdf = spark.read.option("header", true).csv("Documents/spark/credit_count.txt")
sdf: org.apache.spark.sql.DataFrame = [CARDHLDR: string, DEFAULT: string ... 12 more fields]

scala> sdf.printSchema()
root
 |-- CARDHLDR: string (nullable = true)
 |-- DEFAULT: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- ACADMOS: string (nullable = true)
 |-- ADEPCNT: string (nullable = true)
 |-- MAJORDRG: string (nullable = true)
 |-- MINORDRG: string (nullable = true)
 |-- OWNRENT: string (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- SELFEMPL: string (nullable = true)
 |-- INCPER: string (nullable = true)
 |-- EXP_INC: string (nullable = true)
 |-- SPENDING: string (nullable = true)
 |-- LOGSPEND : string (nullable = true)

scala> sdf.createOrReplaceTempView("tmp1")

scala> spark.sql("select count(*) as obs from tmp1").show()
+-----+
|  obs|
+-----+
|13444|
+-----+

A PySpark session doing the same thing:

In [1]: import pyspark as spark

In [2]: sc = spark.SQLContext(spark.SparkContext())

In [3]: sdf = sc.read.csv("Documents/spark/credit_count.txt", header = True)

In [4]: sdf.printSchema()
root
 |-- CARDHLDR: string (nullable = true)
 |-- DEFAULT: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- ACADMOS: string (nullable = true)
 |-- ADEPCNT: string (nullable = true)
 |-- MAJORDRG: string (nullable = true)
 |-- MINORDRG: string (nullable = true)
 |-- OWNRENT: string (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- SELFEMPL: string (nullable = true)
 |-- INCPER: string (nullable = true)
 |-- EXP_INC: string (nullable = true)
 |-- SPENDING: string (nullable = true)
 |-- LOGSPEND : string (nullable = true)

In [5]: sdf.createOrReplaceTempView("tmp1")

In [6]: sc.sql("select count(*) as obs from tmp1").show()
+-----+
|  obs|
+-----+
|13444|
+-----+
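
Note that every column comes back as string in both sessions because inferSchema was never set. A sketch of the typed import in PySpark (the Scala call is analogous, with .option("inferSchema", true)):

sdf = sc.read.csv("Documents/spark/credit_count.txt", header = True, inferSchema = True)
sdf.printSchema()  # numeric columns now parse as int / double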

Written by statcompute

May 29, 2017 at 1:34 am

Posted in Pyspark, PYTHON, Spark
