Yet Another Blog in Statistical Computing

I can calculate the motion of heavenly bodies but not the madness of people. -Isaac Newton

Archive for the ‘Big Data’ Category

Query CSV Data with Apache Drill

The most interesting feature of Apache Drill (https://drill.apache.org) is its ability to query raw data in its original format, e.g. json, parquet, or even csv, directly from the file system through a so-called storage plugin. The demonstration below shows how to extract data from a csv file.

To test-drive Apache Drill without installing it, I pulled the docker image from https://hub.docker.com/r/harisekhon/apache-drill and then started a container with “docker run -it harisekhon/apache-drill”.

Once in the Drill shell, we can use “show schemas;” to list the storage plugins that have been configured in the system. As shown below, there are several default storage plugins; the one we will use is “dfs”, which points to the local file system.


0: jdbc:drill:zk=local> show schemas;
+---------------------+
|     SCHEMA_NAME     |
+---------------------+
| INFORMATION_SCHEMA  |
| cp.default          |
| dfs.default         |
| dfs.root            |
| dfs.tmp             |
| sys                 |
+---------------------+

We can explore the file formats supported by the dfs storage plugin with the command “describe schema dfs;”. As shown below, two entries in the pre-configured plugin relate to the csv data format that will be used later. The entry “csv” handles data files without headers, and the entry “csvh” handles data files with headers.


    "csv" : {
      "type" : "text",
      "extensions" : [ "csv" ],
      "delimiter" : ","
    },
    "csvh" : {
      "type" : "text",
      "extensions" : [ "csvh" ],
      "extractHeader" : true,
      "delimiter" : ","
    }

Next, we can use the command “show files in dfs;” to explore the file system. For instance, running “show files in dfs.`apache-drill/sample-data`;” lists all files in the folder “sample-data”. This folder includes a csv file named “nycflights.csv”, which was copied from the host with “docker cp nycflights.csv 1fc4a0087a3e:/apache-drill/sample-data”, where “1fc4a0087a3e” is the container ID.

The initial output below shows that the first row of this csv file contains the column headers, which the “csv” entry returns as ordinary data.


0: jdbc:drill:zk=local> select columns[0] as x1, columns[1] as x2 from dfs.`apache-drill/sample-data/nycflights.csv` limit 5;
+-------+--------+
|  x1   |   x2   |
+-------+--------+
| year  | month  |
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
+-------+--------+

As a result, we need to change the file extension from “csv” to “csvh” in the file system and then query the data again. This time the headers are recognized and the columns can be referenced by name.


0: jdbc:drill:zk=local> select `year`, `month` from dfs.`apache-drill/sample-data/nycflights.csvh` limit 5;
+-------+--------+
| year  | month  |
+-------+--------+
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
+-------+--------+
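
To make the difference between the two plugin entries concrete, here is a small Python sketch (not Drill itself) that reads the same text in both ways. The inline sample rows and the helper names are hypothetical; only the two column names come from the output above.

```python
import csv
import io

# Hypothetical two-column excerpt; only the header names come from the post.
SAMPLE = "year,month\n2013,1\n2013,1\n"

def read_like_csv(text):
    """Mimic the "csv" entry: every line is data, fields are positional."""
    return [row for row in csv.reader(io.StringIO(text))]

def read_like_csvh(text):
    """Mimic the "csvh" entry: the first line supplies the column names."""
    return [dict(row) for row in csv.DictReader(io.StringIO(text))]

positional = read_like_csv(SAMPLE)
named = read_like_csvh(SAMPLE)

print(positional[0])  # the header line shows up as an ordinary row
print(named[0])       # the header line is consumed; fields are addressed by name
```

In the positional case the header line has to be skipped by hand, which is what renaming the file to “csvh” avoids in Drill.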

From here, we can explore the data in the raw csv file directly. For instance, we can count the number of flights by origin airport.


0: jdbc:drill:zk=local> select origin, count(*) as cnts from dfs.`apache-drill/sample-data/nycflights.csvh` group by origin;
+---------+---------+
| origin  |  cnts   |
+---------+---------+
| JFK     | 111279  |
| EWR     | 120835  |
| LGA     | 104662  |
+---------+---------+

Or we can extract all flights departing from LGA and then export the data to a JSON file for future analysis.


0: jdbc:drill:zk=local> use dfs.tmp;
+-------+--------------------------------------+
|  ok   |               summary                |
+-------+--------------------------------------+
| true  | Default schema changed to [dfs.tmp]  |
+-------+--------------------------------------+

0: jdbc:drill:zk=local> alter session set `store.format` = 'json';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+

0: jdbc:drill:zk=local> create table dfs.tmp.lga as select * from dfs.`apache-drill/sample-data/nycflights.csvh` where origin = 'LGA';
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 104662                     |
+-----------+----------------------------+

0: jdbc:drill:zk=local> show files in dfs.tmp.`lga`;
+-------------+--------------+---------+-----------+--------+--------+
|    name     | isDirectory  | isFile  |  length   | owner  | group  |
+-------------+--------------+---------+-----------+--------+--------+
| 0_0_0.json  | false        | true    | 34156554  | root   | root   |
+-------------+--------------+---------+-----------+--------+--------+

Written by statcompute

December 17, 2017 at 6:01 pm

Posted in Apache, Big Data

Sparkling Water and Moving Data Around

Sparkling Water integrates H2O with Spark. The example below shows how to move data among a pandas DataFrame, an H2OFrame, and a Spark DataFrame.

1. Define Context

In [1]: from pandas import read_csv, DataFrame

In [2]: from pyspark import sql

In [3]: from pysparkling import H2OContext

In [4]: from h2o import import_file, H2OFrame

In [5]: ss = sql.SparkSession.builder.getOrCreate()

In [6]: hc = H2OContext.getOrCreate(ss)

2. Convert Pandas DataFrame to H2OFrame and Spark DataFrame

In [7]: p_df = read_csv("Documents/credit_count.txt")

In [8]: type(p_df)
Out[8]: pandas.core.frame.DataFrame

In [9]: p2s_df = ss.createDataFrame(p_df)

In [10]: type(p2s_df)
Out[10]: pyspark.sql.dataframe.DataFrame

In [11]: p2h_df = H2OFrame(p_df)

In [12]: type(p2h_df)
Out[12]: h2o.frame.H2OFrame

3. Convert Spark DataFrame to H2OFrame and Pandas DataFrame

In [13]: s_df = ss.read.csv("Documents/credit_count.txt", header = True, inferSchema = True)

In [14]: type(s_df)
Out[14]: pyspark.sql.dataframe.DataFrame

In [15]: s2p_df = s_df.toPandas()

In [16]: type(s2p_df)
Out[16]: pandas.core.frame.DataFrame

In [17]: s2h_df = hc.as_h2o_frame(s_df)

In [18]: type(s2h_df)
Out[18]: h2o.frame.H2OFrame

4. Convert H2OFrame to Pandas DataFrame and Spark DataFrame

In [19]: h_df = import_file("Documents/credit_count.txt", header = 1, sep = ",")

In [20]: type(h_df)
Out[20]: h2o.frame.H2OFrame

In [21]: h2p_df = h_df.as_data_frame()

In [22]: type(h2p_df)
Out[22]: pandas.core.frame.DataFrame

In [23]: h2s_df = hc.as_spark_frame(h_df)

In [24]: type(h2s_df)
Out[24]: pyspark.sql.dataframe.DataFrame

Written by statcompute

July 3, 2017 at 12:36 am

GLM with H2O in R

Below is an example showing how to fit a Generalized Linear Model with H2O in R. The output is considerably more comprehensive than that of the base R glm().

> library(h2o)

> h2o.init(max_mem_size = "12g")

> df1 <- h2o.uploadFile("Documents/credit_count.txt", header = TRUE, sep = ",", parse_type = "CSV")

> df2 <- h2o.assign(df1[df1$CARDHLDR == 1, ], "glm_df")

> h2o.colnames(df2)
 [1] "CARDHLDR" "DEFAULT"  "AGE"      "ACADMOS"  "ADEPCNT"  "MAJORDRG"
 [7] "MINORDRG" "OWNRENT"  "INCOME"   "SELFEMPL" "INCPER"   "EXP_INC"
[13] "SPENDING" "LOGSPEND"

> Y <- "DEFAULT"

> X <- c("MAJORDRG", "MINORDRG", "INCOME", "OWNRENT")

> dist <- "binomial"

> link <- "logit"

> id <- "h2o_mdl01"

> mdl <- h2o.glm(X, Y, training_frame = h2o.getFrame("glm_df"), model_id = id, family = dist, link = link, lambda = 0, compute_p_values = TRUE, standardize = FALSE)

> show(h2o.getModel(id)@model$coefficients_table)
Coefficients: glm coefficients
      names coefficients std_error    z_value  p_value
1 Intercept    -1.204439  0.090811 -13.263121 0.000000
2  MAJORDRG     0.203135  0.069250   2.933370 0.003353
3  MINORDRG     0.202727  0.047971   4.226014 0.000024
4   OWNRENT    -0.201223  0.071619  -2.809636 0.004960
5    INCOME    -0.000442  0.000040 -10.942350 0.000000
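
As a rough illustration of how these coefficients are used, the Python sketch below applies the inverse logit to the linear predictor. Because the model was fitted with standardize = FALSE, the coefficients apply to the raw variable values. The applicant record and the function name are hypothetical and only serve as an example.

```python
import math

# Coefficients copied from the table above.
COEF = {"Intercept": -1.204439, "MAJORDRG": 0.203135, "MINORDRG": 0.202727,
        "OWNRENT": -0.201223, "INCOME": -0.000442}

def predict_default_prob(row):
    """Inverse logit of the linear predictor for one applicant."""
    eta = COEF["Intercept"] + sum(COEF[name] * value for name, value in row.items())
    return 1.0 / (1.0 + math.exp(-eta))

# Hypothetical applicant, not taken from the data set.
applicant = {"MAJORDRG": 0, "MINORDRG": 1, "OWNRENT": 1, "INCOME": 2000}
p = predict_default_prob(applicant)
print(round(p, 4))
```

The negative signs on OWNRENT and INCOME mean that, under this model, either variable lowers the predicted probability of default as it increases.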

> h2o.performance(h2o.getModel(id))
H2OBinomialMetrics: glm
** Reported on training data. **

MSE:  0.08414496
RMSE:  0.2900775
LogLoss:  0.3036585
Mean Per-Class Error:  0.410972
AUC:  0.6432189
Gini:  0.2864378
R^2:  0.02005004
Residual Deviance:  6376.221
AIC:  6386.221

Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
          0    1    Error         Rate
0      7703 1800 0.189414   =1800/9503
1       630  366 0.632530     =630/996
Totals 8333 2166 0.231451  =2430/10499

Maximum Metrics: Maximum metrics at their respective thresholds
                        metric threshold    value idx
1                       max f1  0.126755 0.231499 142
2                       max f2  0.075073 0.376556 272
3                 max f0point5  0.138125 0.191828 115
4                 max accuracy  0.368431 0.905039   0
5                max precision  0.314224 0.250000   3
6                   max recall  0.006115 1.000000 399
7              max specificity  0.368431 0.999895   0
8             max absolute_mcc  0.126755 0.128940 142
9   max min_per_class_accuracy  0.106204 0.604546 196
10 max mean_per_class_accuracy  0.103730 0.605663 202
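
Several of the reported metrics can be cross-checked by hand from the confusion matrix. The Python sketch below is only a sanity check on the numbers printed above, not part of the H2O API; the variable names are my own.

```python
# Counts copied from the confusion matrix above (rows: actual, columns: predicted).
tn, fp = 7703, 1800   # actual 0
fn, tp = 630, 366     # actual 1

err0 = fp / (tn + fp)                     # error rate for class 0
err1 = fn / (fn + tp)                     # error rate for class 1
mean_per_class_error = (err0 + err1) / 2

precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

auc = 0.6432189                           # AUC reported above
gini = 2 * auc - 1                        # Gini coefficient derived from AUC

print(round(err0, 6), round(err1, 6))
print(round(mean_per_class_error, 6))
print(round(f1, 6))
print(round(gini, 7))
```

The results agree with the reported Mean Per-Class Error, the "max f1" value, and the Gini, which confirms that the confusion matrix is evaluated at the F1-optimal threshold.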

Written by statcompute

June 28, 2017 at 12:25 am

H2O Benchmark for CSV Import

The h2o.importFile() function in H2O is very efficient because it reads the file in parallel. The benchmark below shows that it is comparable to read.df() in SparkR and roughly twice as fast as the base R read.csv().

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local", sparkConfig = list(spark.driver.memory = "10g", spark.driver.cores = "4"))

library(h2o)
h2o.init(max_mem_size = "10g")

library(rbenchmark)

benchmark(replications = 5, order = "elapsed", relative = "elapsed",
   csv = {
          df <- read.csv("Documents/nycflights13.csv")
          print(nrow(df))
          rm(df)
         },
   spk = {
          df <- read.df("Documents/nycflights13.csv", source = "csv", header = "true", inferSchema = "true")
          print(nrow(df))
          rm(df)
         },
   h2o = {
          df <- h2o.importFile(path = "Documents/nycflights13.csv", header = TRUE, sep = ",")
          print(nrow(df))
          rm(df)
         }
 )

#   test replications elapsed relative user.self sys.self user.child sys.child
# 3  h2o            5   8.221    1.000     0.508    0.032          0         0
# 2  spk            5   9.822    1.195     0.008    0.004          0         0
# 1  csv            5  16.595    2.019    16.420    0.176          0         0
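
For readers unfamiliar with rbenchmark, the "relative" column is each elapsed time divided by the fastest one. A minimal Python sketch of that arithmetic, using the elapsed values printed above:

```python
# Elapsed seconds copied from the benchmark output above.
elapsed = {"h2o": 8.221, "spk": 9.822, "csv": 16.595}

fastest = min(elapsed.values())
relative = {name: round(t / fastest, 3) for name, t in elapsed.items()}

# Print from fastest to slowest, like order = "elapsed".
for name in sorted(relative, key=relative.get):
    print(name, relative[name])
```

The near-zero user.self values for h2o and spk are consistent with the parsing being done outside the R process, so elapsed is the column to compare.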

Written by statcompute

June 26, 2017 at 1:24 am

Posted in Big Data, H2O, S+/R, Spark

Joining Tables in SparkR

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local")
df1 <- read.df("nycflights13.csv", source = "csv", header = "true", inferSchema = "true")

grp1 <- groupBy(filter(df1, "month in (1, 2, 3)"), "month")
sum1 <- withColumnRenamed(agg(grp1, min_dep = min(df1$dep_delay)), "month", "month1")

grp2 <- groupBy(filter(df1, "month in (2, 3, 4)"), "month")
sum2 <- withColumnRenamed(agg(grp2, max_dep = max(df1$dep_delay)), "month", "month2")

# INNER JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all = FALSE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "inner"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     3|    -25|     3|    911|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# LEFT JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all.x = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "left"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     1|    -30|  null|   null|
#|     3|    -25|     3|    911|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# RIGHT JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all.y = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "right"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     3|    -25|     3|    911|
#|  null|   null|     4|    960|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# FULL JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "full"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     1|    -30|  null|   null|
#|     3|    -25|     3|    911|
#|  null|   null|     4|    960|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+
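
The four join types differ only in which keys are kept. The sketch below reproduces the outputs above in plain Python, with None standing in for null. It is not SparkR code, the join() helper is hypothetical, and it assumes each month appears at most once per table, as is the case for these two summaries.

```python
# Summary tables copied from the output above: month -> aggregated delay.
sum1 = {1: -30, 2: -33, 3: -25}   # min_dep for months 1 to 3
sum2 = {2: 853, 3: 911, 4: 960}   # max_dep for months 2 to 4

def join(left, right, how):
    """Return rows (month1, min_dep, month2, max_dep); None plays the role of null."""
    if how == "inner":
        keys = left.keys() & right.keys()
    elif how == "left":
        keys = left.keys()
    elif how == "right":
        keys = right.keys()
    elif how == "full":
        keys = left.keys() | right.keys()
    else:
        raise ValueError(f"unknown join type: {how}")
    rows = []
    for k in sorted(keys):
        rows.append((k if k in left else None, left.get(k),
                     k if k in right else None, right.get(k)))
    return rows

for how in ("inner", "left", "right", "full"):
    print(how, join(sum1, sum2, how))
```

Unlike Spark, this sketch sorts the rows by key, so the row order differs from the showDF() output while the content is the same.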

Written by statcompute

June 13, 2017 at 1:27 am

Posted in Big Data, S+/R, Spark

R Interface to Spark

SparkR

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local")
df1 <- read.df("nycflights13.csv", source = "csv", header = "true", inferSchema = "true")

### SUMMARY TABLE WITH SQL
createOrReplaceTempView(df1, "tbl1")
summ <- sql("select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month")
head(summ)
#   month  avg_dep  avg_arr
# 1     1 1347.210 1523.155
# 2     3 1359.500 1509.743
# 3     5 1351.168 1502.685

### SUMMARY TABLE WITH AGG()
grp <- groupBy(filter(df1, "month in (1, 3, 5)"), "month")
summ <- agg(grp, avg_dep = avg(df1$dep_time), avg_arr = avg(df1$arr_time))
head(summ)
#   month  avg_dep  avg_arr
# 1     1 1347.210 1523.155
# 2     3 1359.500 1509.743
# 3     5 1351.168 1502.685

sparklyr

library(sparklyr)
sc <- spark_connect(master = "local")
df1 <- spark_read_csv(sc, name = "tbl1", path = "nycflights13.csv", header = TRUE, infer_schema = TRUE)

### SUMMARY TABLE WITH SQL
library(DBI)
summ <- dbGetQuery(sc, "select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month")
head(summ)
#   month  avg_dep  avg_arr
# 1     5 1351.168 1502.685
# 2     1 1347.210 1523.155
# 3     3 1359.500 1509.743

### SUMMARY TABLE WITH DPLYR
library(dplyr)
summ <- df1 %>% 
        filter(month %in% c(1, 3, 5)) %>% 
        group_by(month) %>%
        summarize(avg_dep = mean(dep_time), avg_arr = mean(arr_time)) 
head(summ)        
#   month  avg_dep  avg_arr
#   <int>    <dbl>    <dbl>
# 1     5 1351.168 1502.685
# 2     1 1347.210 1523.155
# 3     3 1359.500 1509.743        

Written by statcompute

June 9, 2017 at 12:30 am

Posted in Big Data, S+/R, Spark

Random Search for Optimal Parameters

Manual search, grid search, or a combination of both have been successfully employed in machine learning to optimize hyper-parameters. However, in the arena of deep learning, these approaches can become impractical. For instance, the computing cost of a grid search over the hyper-parameters of a multi-layer deep neural network (DNN) can be prohibitively high.

In light of these hurdles, Bergstra and Bengio proposed random search in the paper http://www.jmlr.org/papers/volume13/bergstra12a/bergstra12a.pdf. Their study found that random search is more efficient than grid search for hyper-parameter optimization in terms of computing cost.

In the example below, grid search and random search reach similar results in SVM parameter optimization based on cross-validation.

import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV 
from sklearn.svm import SVC as svc 
from sklearn.metrics import make_scorer, roc_auc_score
from scipy import stats

# DATA PREPARATION
df = pd.read_csv("credit_count.txt")
y = df[df.CARDHLDR == 1].DEFAULT.values 
# .ix has been removed from pandas; .iloc selects the same columns by position
x = preprocessing.scale(df[df.CARDHLDR == 1].iloc[:, 2:12], axis = 0)

# DEFINE MODEL AND PERFORMANCE MEASURE
mdl = svc(probability = True, random_state = 1)
# "roc_auc" ranks by the decision function; make_scorer(roc_auc_score) alone would score hard 0/1 predictions
auc = "roc_auc"

# GRID SEARCH FOR 20 COMBINATIONS OF PARAMETERS
grid_list = {"C": np.arange(2, 10, 2),
             "gamma": np.arange(0.1, 1, 0.2)}

grid_search = GridSearchCV(mdl, param_grid = grid_list, n_jobs = 4, cv = 3, scoring = auc) 
grid_search.fit(x, y) 
grid_search.cv_results_

# RANDOM SEARCH FOR 20 COMBINATIONS OF PARAMETERS
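# stats.uniform(loc, scale) samples from [loc, loc + scale], i.e. C in [2, 10] and gamma in [0.1, 1]
rand_list = {"C": stats.uniform(2, 8),
             "gamma": stats.uniform(0.1, 0.9)}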
             
rand_search = RandomizedSearchCV(mdl, param_distributions = rand_list, n_iter = 20, n_jobs = 4, cv = 3, random_state = 2017, scoring = auc) 
rand_search.fit(x, y) 
rand_search.cv_results_
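
To show the mechanics of the two strategies without fitting an SVM, here is a standard-library sketch with the same budget of 20 evaluations each. The toy_score function is a hypothetical stand-in for a cross-validated score and is not related to the credit data. Note that random.uniform takes the two endpoints of the range, unlike scipy's stats.uniform, which takes loc and scale.

```python
import itertools
import random

def toy_score(c, gamma):
    """Hypothetical stand-in for a cross-validated score; peaks at C=5, gamma=0.4."""
    return -((c - 5.0) ** 2) / 10.0 - (gamma - 0.4) ** 2

# Grid search: 4 x 5 = 20 fixed combinations, matching the grid in the post.
c_grid = [2, 4, 6, 8]
gamma_grid = [0.1, 0.3, 0.5, 0.7, 0.9]
grid_trials = [(c, g, toy_score(c, g))
               for c, g in itertools.product(c_grid, gamma_grid)]

# Random search: 20 draws from continuous ranges covering the same region.
rng = random.Random(2017)
rand_trials = []
for _ in range(20):
    c = rng.uniform(2, 10)
    g = rng.uniform(0.1, 1.0)
    rand_trials.append((c, g, toy_score(c, g)))

best_grid = max(grid_trials, key=lambda t: t[2])
best_rand = max(rand_trials, key=lambda t: t[2])
print("grid  :", best_grid)
print("random:", best_rand)
```

The grid can only ever try four distinct values of C and five of gamma, whereas each random draw tests a new value of both parameters. That is the intuition behind the efficiency argument when only some of the parameters matter.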

Written by statcompute

April 10, 2017 at 12:07 am