By-Group Summary with SparkR – Follow-up to a Reader Comment

A reader of my previous post (https://statcompute.wordpress.com/2018/09/03/playing-map-and-reduce-in-r-by-group-calculation), Mr. Wayne Zhang, made a good comment: “Why not use directly either Spark or H2O to derive such computations without involving detailed map/reduce”.

Although Spark is, in my opinion, not as flexible as R for statistical computation, it does have advantages for munging large data sets, e.g. aggregating, selecting, and filtering. The demonstration below shows how to do the same by-group calculation with SparkR.

In SparkR, the most convenient way to do a by-group calculation is to group the Spark DataFrame by one or more columns with the groupBy() function and then call the agg() function on the result.

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local", sparkConfig = list(spark.driver.memory = "10g", spark.driver.cores = "4"))
df <- as.DataFrame(iris)
summ1 <- agg(
  groupBy(df, alias(df$Species, "species")), 
  sl_avg = avg(df$Sepal_Length), 
  sw_avg = avg(df$Sepal_Width)
)
showDF(summ1)
+----------+-----------------+------------------+
|   species|           sl_avg|            sw_avg|
+----------+-----------------+------------------+
| virginica|6.587999999999998|2.9739999999999998|
|versicolor|            5.936|2.7700000000000005|
|    setosa|5.005999999999999| 3.428000000000001|
+----------+-----------------+------------------+
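As a sanity check on the numbers above, the same group means can be computed locally in base R without Spark. This is only a minimal sketch: the name local_summ is hypothetical and not part of the original workflow, and stats::aggregate() is called with its namespace in case an attached package masks it. Note that the local iris data keeps the dotted column names (Sepal.Length), whereas as.DataFrame() replaces the dots with underscores.

```r
# Base R cross-check of the by-group means; no Spark session is needed here.
# `local_summ` is a hypothetical name used only for this illustration.
local_summ <- stats::aggregate(
  cbind(Sepal.Length, Sepal.Width) ~ Species,
  data = iris,
  FUN = mean
)

# Rename the columns to match the SparkR output above.
names(local_summ) <- c("species", "sl_avg", "sw_avg")
print(local_summ)
```

Up to floating-point noise, the three rows should agree with the SparkR result, although the row order may differ because Spark does not guarantee the order of grouped output.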

Alternatively, we can use the gapply() function to apply an anonymous function that calculates the statistics to each group of the Spark DataFrame. This approach is more flexible in that we define the schema of the output data ourselves, such as column names and data types.

summ2 <- gapply(
  df, 
  df$"Species", 
  function(key, x) {
    data.frame(key, mean(x$Sepal_Length), mean(x$Sepal_Width), stringsAsFactors = FALSE)
  }, 
  "species STRING, sl_avg DOUBLE, sw_avg DOUBLE"
)
showDF(summ2)
+----------+------+------+
|   species|sl_avg|sw_avg|
+----------+------+------+
| virginica| 6.588| 2.974|
|versicolor| 5.936|  2.77|
|    setosa| 5.006| 3.428|
+----------+------+------+
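The output schema passed to gapply() does not have to be a DDL string. As a sketch of an equivalent call, the same schema can be built with structType() and structField(), which may be easier to maintain when there are many output columns. The names schema and summ2b are hypothetical, the session settings are simplified relative to the ones used earlier, and the code assumes SPARK_HOME points to a working Spark installation.

```r
# Load SparkR from the Spark installation and start (or reuse) a local session.
library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sparkR.session(master = "local")
df <- as.DataFrame(iris)

# Output schema as a structType object instead of a DDL string.
# `schema` and `summ2b` are hypothetical names used only for this illustration.
schema <- structType(
  structField("species", "string"),
  structField("sl_avg", "double"),
  structField("sw_avg", "double")
)

summ2b <- gapply(
  df,
  "Species",
  function(key, x) {
    # `x` is a local R data.frame holding the rows of one group.
    data.frame(key, mean(x$Sepal_Length), mean(x$Sepal_Width), stringsAsFactors = FALSE)
  },
  schema
)
showDF(summ2b)
```

The result should have the same three columns as summ2 above. Which form to use is mostly a matter of taste: the DDL string is more compact, while the structType object can be assembled programmatically.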

Lastly, we can take advantage of the Spark SQL engine after registering the DataFrame as a temporary view.

createOrReplaceTempView(df, "tbl")
summ3 <- sql("select Species as species, avg(Sepal_Length) as sl_avg, avg(Sepal_Width) as sw_avg from tbl group by Species")
showDF(summ3)
+----------+-----------------+------------------+
|   species|           sl_avg|            sw_avg|
+----------+-----------------+------------------+
| virginica|6.587999999999998|2.9739999999999998|
|versicolor|            5.936|2.7700000000000005|
|    setosa|5.005999999999999| 3.428000000000001|
+----------+-----------------+------------------+