Partial Dependence Plot (PDP) of GRNN

The function grnn.margin() (https://github.com/statcompute/yager/blob/master/code/grnn.margin.R) was my first attempt to explore the relationship between each predictor and the response in a General Regression Neural Network, which is usually considered a black-box model. The idea is described below:

  1. First, train a GRNN with the original training dataset.
  2. Create an artificial dataset from the training data by keeping the distinct values of the variable of interest but replacing all values of the other variables with their means. For instance, given a dataset with three variables X1, X2, and X3, if we are interested in the marginal effect of X1 with three distinct values, e.g. [X11 X12 X13], then the constructed dataset should look like {[X11 mean(X2) mean(X3)], [X12 mean(X2) mean(X3)], [X13 mean(X2) mean(X3)]}.
  3. Calculate predicted values, namely [Pred1 Pred2 Pred3], from the constructed dataset by using the GRNN trained in the first step.
  4. At last, the relationship between [X11 X12 X13] and [Pred1 Pred2 Pred3] is the marginal effect we are looking for, as sketched in the code right after this list.
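
Below is a minimal sketch of the marginal approach described above, assuming a GRNN already trained with grnn.fit() and the grnn.predict() interface shown later in these posts; the actual grnn.margin() implementation may differ in details.

### SKETCH OF THE MARGINAL APPROACH; NOT THE ACTUAL grnn.margin() CODE
margin_sketch <- function(net, x, i) {
  # x: the training input matrix; i: the column index of the variable of interest
  vals <- sort(unique(x[, i]))
  # fix all other columns at their means and vary only column i
  newx <- matrix(rep(colMeans(x), each = length(vals)), nrow = length(vals))
  colnames(newx) <- colnames(x)
  newx[, i] <- vals
  data.frame(x = vals, pred = grnn.predict(net, newx))
}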

The above-mentioned approach is computationally efficient but might be somewhat “brutal” in the sense that it doesn’t consider the variation in other variables.

By the end of Friday, my boss pointed me to a paper describing the partial dependence plot (Yes! In 53, we also have an SVP who is technically savvy). The idea is very intriguing, albeit computationally expensive, and is delineated below:

  1. First, train a GRNN with the original training dataset.
  2. Based on the training dataset, get the list of distinct values of the variable of interest, e.g. [X11 X12 X13]. In this particular example, create three separate datasets from the training data by keeping the other variables as they are but replacing all values of X1 with each of [X11 X12 X13] respectively.
  3. With each of the three constructed datasets above, calculate predicted values and then average them out such that we have an average of predicted values for each of [X11 X12 X13], namely [Pavg1 Pavg2 Pavg3].
  4. The relationship between [X11 X12 X13] and [Pavg1 Pavg2 Pavg3] is the so-called Partial Dependence, as sketched in the code right after this list.
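
Below is a minimal sketch of the partial dependence approach described above, again assuming a pre-trained GRNN and the grnn.predict() interface shown later; the actual grnn.partial() implementation may differ in details.

### SKETCH OF THE PARTIAL DEPENDENCE APPROACH; NOT THE ACTUAL grnn.partial() CODE
partial_sketch <- function(net, x, i) {
  # x: the training input matrix; i: the column index of the variable of interest
  vals <- sort(unique(x[, i]))
  pavg <- sapply(vals, function(v) {
    newx <- x
    newx[, i] <- v                   # replace column i with one distinct value
    mean(grnn.predict(net, newx))    # average predictions over all training rows
  })
  data.frame(x = vals, pavg = pavg)
}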

The idea of PDP has been embedded in the YAGeR project (https://github.com/statcompute/yager/blob/master/code/grnn.partial.R). In the chart below, I compared outcomes of grnn.partial() and grnn.margin() side by side for two variables, the first not very predictive and the second highly predictive. In this particular comparison, both appeared almost identical.

[Chart: grnn.partial() vs. grnn.margin() for the two variables]

Improve GRNN Efficiency by Weighting

In the post (https://statcompute.wordpress.com/2019/07/14/yet-another-r-package-for-general-regression-neural-network), several advantages of General Regression Neural Network (GRNN) have been discussed. However, as pointed out by Specht, a major weakness of GRNN is the high computational cost required for a GRNN to generate predicted values based on a new input matrix due to its unique network structure, in which the number of neurons equals the number of training samples.

For practical purposes, however, there is no need to assign a neuron to each training sample, given the data duplication in real-world model development samples. Instead, a weighting scheme can be employed to reflect the frequency count of each unique training sample. A major benefit of the weight assignment is the ability to improve the efficiency of calculating predicted values, which depends on the extent of data duplication. More attractively, the weighting application opens up the possibility of using clustering or binning techniques to preprocess the training data so as to overcome the aforementioned weakness to a large degree.
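
Formally, with a frequency weight w_j attached to each unique training sample, the GRNN projection becomes the weighted kernel average sketched below, which is algebraically identical to the unweighted projection applied to the duplicated data; this is my own shorthand for the idea described above rather than a quote from Specht's paper.

$$\hat{y}(x) = \frac{\sum_{j} w_j \, y_j \, \exp\left(-\frac{\lVert x - x_j \rVert^2}{2\sigma^2}\right)}{\sum_{j} w_j \, \exp\left(-\frac{\lVert x - x_j \rVert^2}{2\sigma^2}\right)}$$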

Below is a demonstration showing the efficiency gain by using the weighting scheme in GRNN.

  1. First of all, I constructed a sample dataset with duplicates to roughly double the size of the original Boston dataset. Based on the constructed data, a GRNN named “N1” was trained.
  2. Secondly, I generated another sample dataset by aggregating the above constructed data into unique samples and calculating the weight of each unique data point based on its frequency. Based on the aggregated data, another GRNN named “N2” was trained.

As shown in the output, predicted vectors from both “N1” and “N2” are identical. However, the computing time is cut by more than half by applying the weighting. All R functions used in the example can be found at https://github.com/statcompute/GRnnet/blob/master/code/grnnet.R.

For people interested in the SAS implementation of GRNN, two SAS macros are also available in https://github.com/statcompute/GRnnet/blob/master/code/grnn_learn.SAS and https://github.com/statcompute/GRnnet/blob/master/code/grnn_pred.SAS.

data(Boston, package = "MASS")
### CONSTRUCT THE UNWEIGHTED DATA.FRAME WITH DUPLICATES
df1 <- rbind(Boston[rep(seq(100), 5), ], Boston)
nrow(df1)
# 1006
X1 <- scale(df1[, 1:13])
Y1 <- df1[, 14]
N1 <- grnn.fit(X1, Y1)
### CONSTRUCT THE WEIGHTED DATA.FRAME WITHOUT DUPLICATES
XY <- data.frame(X1, Y1)
df2 <- Reduce(rbind, lapply(split(XY, XY[, colnames(XY)], drop = T),
                            function(x_) data.frame(x_[1, ], cnt = nrow(x_))))
nrow(df2)
# 506
sum(df2$cnt)
# 1006
X2 <- as.matrix(df2[, 1:13])
Y2 <- df2[, 14]
W2 <- df2[, 15]
N2 <- grnn.fit(X2, Y2, W2)
### IDENTICAL PREDICTED VALUES WITH UNWEIGHTED AND WEIGHTED DATA.FRAMES
grnn.predone(N1, X1[1, ])
# 24.69219
grnn.predone(N2, X1[1, ])
# 24.69219
all.equal(grnn.predict(N1, X1[1:100, ]), grnn.predict(N2, X1[1:100, ]))
# TRUE
### COMPUTING TIME ROUGHLY LINEAR WITH RESPECT TO SIZE OF UNIQUE TRAINING SAMPLE
rbenchmark::benchmark(replications = 10, order = "elapsed", relative = "elapsed",
                      columns = c("test", "replications", "elapsed", "relative"),
  " NO WEIGHT" = grnn.predict(N1, X1[1:100, ]),
  "USE WEIGHT" = grnn.predict(N2, X1[1:100, ])
)
# test replications elapsed relative
# 2 USE WEIGHT 10 2.157 1.000
# 1 NO WEIGHT 10 5.506 2.553


Yet Another R Package for General Regression Neural Network

Compared with other types of neural networks, General Regression Neural Network (Specht, 1991) is advantageous in several aspects.

  1. Being a universal approximation function, GRNN has only one tuning parameter to control the overall generalization.
  2. The network structure of GRNN is surprisingly simple, with only one hidden layer and the number of neurons equal to the number of training samples.
  3. GRNN always converges globally and won’t be trapped in local solutions.
  4. The training of GRNN is a simple one-pass, regardless of the sample size, and doesn’t require time-consuming iterations.
  5. Since any projected value of GRNN is the weighted average of training samples, predictions are bounded by the observed range, as the formula below makes explicit.
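
For reference, the GRNN projection described by Specht (1991) is a kernel-weighted average of the training outcomes, with the smoothing parameter sigma as the only tuning knob; the sketch below uses the Euclidean distance, and the Manhattan distance can be substituted in the exponent for computational simplicity.

$$\hat{y}(x) = \frac{\sum_{i=1}^{n} y_i \, \exp\left(-\frac{\lVert x - x_i \rVert^2}{2\sigma^2}\right)}{\sum_{i=1}^{n} \exp\left(-\frac{\lVert x - x_i \rVert^2}{2\sigma^2}\right)}$$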

The grnn package (https://cran.r-project.org/web/packages/grnn/index.html), which has not been updated since 2013, is the only implementation of GRNN on CRAN. It was designed elegantly with a parsimonious set of functions, leaving lots of opportunities for potential improvements.

The YAGeR project (https://github.com/statcompute/yager) is my attempt to provide an R implementation of GRNN with several enhancements.

  1. While the training function grnn.fit() is very similar to learn() and smooth() in the grnn package, three functions were designed to provide GRNN projections. The grnn.predone() function generates one projected value based on an input vector. Both the grnn.predict() and grnn.parpred() functions generate a vector of projected values based on an input matrix. The only difference is that grnn.parpred() runs in parallel and therefore can be 3 times faster than grnn.predict() on my 4-core workstation.
  2. While tuning the only hyper-parameter is the key in GRNN training, there are two functions in the GRnnet project to search for the optimal parameter through n-fold cross-validation, including grnn.cv_r2() for numeric outcomes and grnn.cv_auc() for binary outcomes.
  3. In the grnn.predone() function, while the default projection is based on the Euclidean distance, there is also an option to calculate the GRNN projection based on the Manhattan distance for the sake of computational simplicity (Specht, 1991). A short usage sketch follows this list.
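
Below is a quick usage sketch of the interface described above; the grnn.fit(), grnn.predone(), and grnn.predict() calls follow the signatures demonstrated elsewhere in these posts, while the grnn.parpred() call is assumed to mirror grnn.predict().

### USAGE SKETCH; grnn.parpred() IS ASSUMED TO TAKE THE SAME ARGUMENTS AS grnn.predict()
data(Boston, package = "MASS")
X <- scale(Boston[, -14])
Y <- Boston[, 14]
net <- grnn.fit(X, Y)         # train with the default smoothing parameter
grnn.predone(net, X[1, ])     # one projected value from an input vector
grnn.predict(net, X[1:10, ])  # a vector of projected values, computed sequentially
grnn.parpred(net, X[1:10, ])  # the same projections, computed in parallel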

In the banking industry, GRNN can be useful in several areas. First of all, it can be employed as a replacement for splines to approximate the term structure of interest rates. Secondly, like other neural networks, it can be used in fraud detection and anti-money laundering given its flexibility. At last, in credit risk modeling, it can also be used to develop performance benchmarks and rapid prototypes for scorecards or Expected Loss models due to its simplicity.

Chunk Averaging of GLM

Chunk Averaging (CA) is an interesting concept proposed by Matloff in chapter 13 of his book “Parallel Computing for Data Science”. The basic idea is to partition the entire model estimation sample into chunks and then to estimate a GLM for each chunk. Under the i.i.d. assumption, the CA estimator based on the chunked data is asymptotically equivalent to the estimator based on the full data. The possibility of converting the full model estimation with an excessively large dataset into chunked estimations with small pieces is particularly attractive in real-world model developments, where the model convergence could be challenging given the data size.

The ca_glm() function below is my attempt to implement the Chunk Averaging of GLM. As shown, CA estimations with various numbers of chunks are consistent with the estimation based on the full data.
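
For clarity, the combination rule, consistent with the ca_glm() code below, is simply a weighted average of chunk-level estimates, where n_k denotes the size of chunk k out of n observations in total:

$$\hat{\beta}_{CA} = \sum_{k=1}^{K} \frac{n_k}{n}\,\hat{\beta}_k, \qquad \widehat{\mathrm{se}}(\hat{\beta}_{CA}) = \sqrt{\sum_{k=1}^{K} \left(\frac{n_k}{n}\right)^2 \widehat{\mathrm{se}}(\hat{\beta}_k)^2}$$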

df1 <- read.csv("/mnt/d/projects/data/credit_count.txt")
df2 <- df1[which(df1$CARDHLDR == 1), ]
ca_glm <- function(fml, data, family, nchunk) {
  cls <- parallel::makeCluster(nchunk, type = "PSOCK")
  df1 <- parallel::parLapplyLB(cls, parallel::clusterSplit(cls, seq(nrow(data))),
                               function(c_) data[c_, ])
  parallel::clusterExport(cls, c("fml", "family", "data"), envir = environment())
  est <- parallel::parLapplyLB(cls, df1,
                               function(d_) cbind(coef(summary(glm(fml, data = d_, family = family)))[, 1:2],
                                                  nrow(d_) / nrow(data)))
  parallel::stopCluster(cls)
  df2 <- Reduce(rbind,
                lapply(est,
                       function(e_) data.frame(name = format(rownames(e_), justify = "left"),
                                               beta = e_[, 1] * e_[, 3],
                                               var  = (e_[, 2] * e_[, 3]) ^ 2)))
  df3 <- Reduce(rbind,
                lapply(split(df2, df2$name),
                       function(d_) data.frame(name = d_$name[1],
                                               beta = sum(d_$beta),
                                               stder = sum(d_$var) ^ 0.5)))
  return(cbind(df3, zvalue = df3$beta / df3$stder, pvalue = 2 * pnorm(-abs(df3$beta / df3$stder))))
}
y <- "DEFAULT"
x <- c("MAJORDRG", "MINORDRG", "INCOME")
f <- as.formula(paste(y, paste(x, collapse = " + "), sep = " ~ "))
summary(glm(f, data = df2, family = "binomial"))$coef
# Estimate Std. Error z value Pr(>|z|)
# (Intercept) -1.2215970658 9.076358e-02 -13.459110 2.721743e-41
# MAJORDRG 0.2030503715 6.921101e-02 2.933787 3.348538e-03
# MINORDRG 0.1919770456 4.783751e-02 4.013107 5.992472e-05
# INCOME -0.0004705599 3.918955e-05 -12.007282 3.253645e-33
ca_glm(f, df2, "binomial", 2)[rank(rownames(summary(glm(f, data = df2, family = "binomial"))$coef)), ]
# name beta stder zvalue pvalue
# (Intercept) -1.2001768403 9.161584e-02 -13.100102 3.288167e-39
# MAJORDRG 0.2024462446 6.936634e-02 2.918508 3.517103e-03
# MINORDRG 0.1928945270 4.799079e-02 4.019407 5.834476e-05
# INCOME -0.0004811946 3.974214e-05 -12.107919 9.589651e-34
ca_glm(f, df2, "binomial", 4)[rank(rownames(summary(glm(f, data = df2, family = "binomial"))$coef)), ]
# name beta stder zvalue pvalue
# (Intercept) -1.1891569565 9.257056e-02 -12.845952 9.063064e-38
# MAJORDRG 0.2008495039 7.084338e-02 2.835120 4.580846e-03
# MINORDRG 0.2075713235 4.883860e-02 4.250149 2.136283e-05
# INCOME -0.0004902169 4.032866e-05 -12.155548 5.360122e-34
y <- "MAJORDRG"
x <- c("ADEPCNT", "MINORDRG", "INCPER")
f <- as.formula(paste(y, paste(x, collapse = " + "), sep = " ~ "))
summary(glm(f, data = df2, family = "poisson"))$coef
# Estimate Std. Error z value Pr(>|z|)
# (Intercept) -2.875143e+00 6.565395e-02 -43.792384 0.000000e+00
# ADEPCNT 2.091082e-01 2.137937e-02 9.780844 1.360717e-22
# MINORDRG 5.249111e-01 1.775197e-02 29.569171 3.723779e-192
# INCPER 2.018335e-05 1.683187e-06 11.991151 3.953735e-33
ca_glm(f, df2, "poisson", 2)[rank(rownames(summary(glm(f, data = df2, family = "poisson"))$coef)), ]
# name beta stder zvalue pvalue
# (Intercept) -2.876932e+00 6.589413e-02 -43.659914 0.000000e+00
# ADEPCNT 2.072821e-01 2.151670e-02 9.633546 5.770316e-22
# MINORDRG 5.269996e-01 1.791464e-02 29.417248 3.304900e-190
# INCPER 2.015435e-05 1.692556e-06 11.907644 1.079855e-32
ca_glm(f, df2, "poisson", 4)[rank(rownames(summary(glm(f, data = df2, family = "poisson"))$coef)), ]
# name beta stder zvalue pvalue
# (Intercept) -2.890965e+00 6.723771e-02 -42.996187 0.000000e+00
# ADEPCNT 2.112105e-01 2.179890e-02 9.689044 3.356557e-22
# MINORDRG 5.334541e-01 1.848846e-02 28.853359 4.598288e-183
# INCPER 2.012836e-05 1.744654e-06 11.537165 8.570364e-31

view raw
ca_glm.R
hosted with ❤ by GitHub

Faster Way to Slice Dataframe by Row

When we’d like to slice a dataframe by row, we can employ the split() function or the iter() function in the iterators package.

By leveraging the power of parallelism, I wrote a utility function slice() to slice the dataframe faster. In the example shown below, slice() is roughly 3 times more efficient than split() or iter() when selecting 2 records out of 5,960 rows.


df <- read.csv("hmeq.csv")

nrow(df)
# [1] 5960

slice <- function(df) {
  return(parallel::mcMap(function(i) df[i, ], seq(nrow(df)), mc.cores = parallel::detectCores()))
}

Reduce(rbind, Filter(function(x) x$DEROG == 10, slice(df)))
#     BAD  LOAN MORTDUE VALUE  REASON   JOB YOJ DEROG DELINQ     CLAGE NINQ CLNO  DEBTINC
#3094   1 16800   16204 27781 HomeImp Other   1    10      0 190.57710    0    9 27.14689
#3280   1 17500   76100 98500 DebtCon Other   5    10      1  59.83333    5   16       NA

rbenchmark::benchmark(replications = 10, order = "elapsed", relative = "elapsed",
                        columns = c("test", "replications", "elapsed", "relative"),
  "SPLIT" = Reduce(rbind, Filter(Negate(function(x) x$DEROG != 10), split(df, seq(nrow(df))))),
  "ITER " = Reduce(rbind, Filter(Negate(function(x) x$DEROG != 10), as.list(iterators::iter(df, by = "row")))),
  "SLICE" = Reduce(rbind, Filter(Negate(function(x) x$DEROG != 10), slice(df)))
)
#  test replications elapsed relative
# SLICE           10   2.224    1.000
# SPLIT           10   7.185    3.231
# ITER            10   7.375    3.316

Direct Optimization of Hyper-Parameter

In the previous post (https://statcompute.wordpress.com/2019/02/03/sobol-sequence-vs-uniform-random-in-hyper-parameter-optimization), it is shown how to identify the optimal hyper-parameter in a General Regression Neural Network by using the Sobol sequence and the uniform random generator respectively through the N-fold cross validation. While the Sobol sequence yields a slightly better performance, outcomes from both approaches are very similar, as shown below based upon five trials with 20 samples in each. Both approaches can be generalized from one-dimensional to multi-dimensional domains, e.g. boosting or deep learning.

net <- grnn.fit(scale(Boston[, -14]), Boston[, 14], sigma = 1)
                        
sb_out <- Reduce(rbind, Map(function(x) grnn.cv(net, gen_sobol(0.1, 1.0, 20, x), 4, 2019), seq(1, 5)))

uf_out <- Reduce(rbind, Map(function(x) grnn.cv(net, gen_unifm(0.1, 1.0, 20, x), 4, 2019), seq(1, 5)))

Map(function(x) x[x$R2 == max(x$R2), ], list(sobol = sb_out, uniform = uf_out))
# $sobol
#  sigma        R2
# 0.5568 0.8019342
# $uniform
#  sigma        R2
# 0.5608 0.8019327

Other than the random search, another way to locate the optimal hyper-parameter is to apply general optimization routines. As shown in the demonstration below, we first need to define an objective function, e.g. grnn.optim(), to maximize the cross-validation R^2; since these routines are minimizers, the negative of the objective is passed in. In addition, depending on the optimization algorithm, upper and lower bounds of the parameter to be optimized should also be provided. Three optimization algorithms are employed in the example, including unconstrained non-linear optimization, particle swarm optimization, and Nelder–Mead simplex optimization, with all showing outcomes comparable to the ones achieved by the random search.

net <- grnn.fit(scale(Boston[, -14]), Boston[, 14], sigma = 1)
grnn.optim <- function(sigma, nn, nfolds, seed) {
  dt <- nn$set
  set.seed(seed)
  folds <- caret::createFolds(1:nrow(dt), k = nfolds, list = FALSE)
  r <- do.call(rbind,
               lapply(1:nfolds,
                      function(i) data.frame(Ya = nn$Ya[folds == i],
                                             Yp = grnn.predict(grnn.fit(nn$Xa[folds != i, ], nn$Ya[folds != i], sigma),
                                                               data.frame(nn$Xa[folds == i, ])))))
  return(r2(r$Ya, r$Yp))
}
### General-Purpose Unconstrained Non-Linear Optimization ###
op_out <- ucminf::ucminf(par = 0.5, fn = function(x) -grnn.optim(x, net, 4, 2019))
# $par
# [1] 0.5611872
# $value
# [1] -0.8019319
### Particle Swarm Optimization ###
set.seed(1)
ps_out <- pso::psoptim(par = 0.5, upper = 1.0, lower = 0.1,
                       fn = function(x) -grnn.optim(x, net, 4, 2019),
                       control = list(maxit = 20))
# $par
# [1] 0.5583358
# $value
# [1] -0.8019351
### Nelder–Mead Optimization ###
nm_out <- optim(par = 0.5, fn = function(x) -grnn.optim(x, net, 4, 2019),
                method = "Nelder-Mead", control = list(warn.1d.NelderMead = FALSE))
# $par
# [1] 0.5582031
# $value
# [1] -0.8019351


Sobol Sequence vs. Uniform Random in Hyper-Parameter Optimization

Tuning hyper-parameters might be the most tedious yet crucial task in various machine learning algorithms, such as neural networks, svm, or boosting. The configuration of hyper-parameters not only impacts the computational efficiency of a learning algorithm but also determines its prediction accuracy.

Thus far, manual tuning and grid searching are still the most prevailing strategies. In the paper http://www.jmlr.org/papers/volume13/bergstra12a/bergstra12a.pdf, Bergstra and Bengio showed that the random search is more efficient in hyper-parameter optimization than both the grid search and the manual tuning. Following a similar logic to the random search, a Sobol sequence is a series of quasi-random numbers designed to cover the space more evenly than uniform random numbers.

The demonstration below compares the Sobol sequence and the uniform random number generator in the hyper-parameter tuning of a General Regression Neural Network (GRNN). In this particular example, the Sobol sequence outperforms the uniform random number generator in two respects. First of all, it picks the hyper-parameter that yields a better performance, e.g. R^2, in the cross-validation. Secondly, the performance is more consistent across multiple trials, with a lower variance.

data(Boston, package = "MASS")
grnn.fit <- function(x, y, sigma) {
  return(grnn::smooth(grnn::learn(data.frame(y, x)), sigma))
}
grnn.predict <- function(nn, x) {
  c <- parallel::detectCores() - 1
  return(do.call(rbind,
                 parallel::mcMap(function(i) grnn::guess(nn, as.matrix(x[i, ])),
                                 1:nrow(x), mc.cores = c))[, 1])
}
r2 <- function(act, pre) {
  rss <- sum((pre - act) ^ 2)
  tss <- sum((act - mean(act)) ^ 2)
  return(1 - rss / tss)
}
grnn.cv <- function(nn, sigmas, nfolds, seed) {
  dt <- nn$set
  set.seed(seed)
  folds <- caret::createFolds(1:nrow(dt), k = nfolds, list = FALSE)
  cv <- function(s) {
    r <- do.call(rbind,
                 lapply(1:nfolds,
                        function(i) data.frame(Ya = nn$Ya[folds == i],
                                               Yp = grnn.predict(grnn.fit(nn$Xa[folds != i, ], nn$Ya[folds != i], s),
                                                                 data.frame(nn$Xa[folds == i, ])))))
    return(data.frame(sigma = s, R2 = r2(r$Ya, r$Yp)))
  }
  r2_lst <- Reduce(rbind, Map(cv, sigmas))
  return(r2_lst[r2_lst$R2 == max(r2_lst$R2), ])
}
gen_sobol <- function(min, max, n, seed) {
  return(round(min + (max - min) * randtoolbox::sobol(n, dim = 1, scrambling = 1, seed = seed), 4))
}
gen_unifm <- function(min, max, n, seed) {
  set.seed(seed)
  return(round(min + (max - min) * runif(n), 4))
}
net <- grnn.fit(Boston[, -14], Boston[, 14], sigma = 2)
sobol_out <- Reduce(rbind, Map(function(x) grnn.cv(net, gen_sobol(5, 10, 10, x), 4, 2019), seq(1, 10)))
unifm_out <- Reduce(rbind, Map(function(x) grnn.cv(net, gen_unifm(5, 10, 10, x), 4, 2019), seq(1, 10)))
out <- rbind(cbind(type = rep("sobol", 10), sobol_out),
             cbind(type = rep("unifm", 10), unifm_out))
boxplot(R2 ~ type, data = out, main = "Sobol Sequence vs. Uniform Random",
        ylab = "CV RSquare", xlab = "Sequence Type")


[Boxplot: Sobol Sequence vs. Uniform Random, CV RSquare by sequence type]

More Robust Monotonic Binning Based on Isotonic Regression

Since publishing the monotonic binning function based upon the isotonic regression (https://statcompute.wordpress.com/2017/06/15/finer-monotonic-binning-based-on-isotonic-regression), I’ve received some feedback from peers. A potential concern is that, albeit improving the granularity and predictability, the binning might be too fine and therefore not generalize well in new data.

In light of the concern, I revised the function by imposing two thresholds, including a minimum sample size and a minimum number of bads for each bin. Both thresholds can be adjusted based on the specific use case. For instance, I set the minimum sample size equal to 50 and the minimum number of bads (and goods) equal to 10 in the example below.

isoreg_bin <- function(data, y, x) {
  n1 <- 50
  n2 <- 10
  yname <- deparse(substitute(y))
  xname <- deparse(substitute(x))
  df1 <- data[, c(yname, xname)]
  df2 <- df1[!is.na(df1[, xname]), c(xname, yname)]
  cor <- cor(df2[, 2], df2[, 1], method = "spearman", use = "complete.obs")
  reg <- isoreg(df2[, 1], cor / abs(cor) * df2[, 2])
  cut <- knots(as.stepfun(reg))
  df2$cut <- cut(df2[[xname]], breaks = unique(cut), include.lowest = T)
  df3 <- Reduce(rbind,
                lapply(split(df2, df2$cut),
                       function(x) data.frame(n = nrow(x),
                                              b = sum(x[[yname]]),
                                              g = sum(1 - x[[yname]]),
                                              maxx = max(x[[xname]]),
                                              minx = min(x[[xname]]))))
  df4 <- df3[which(df3[["n"]] > n1 & df3[["b"]] > n2 & df3[["g"]] > n2), ]
  df1$good <- 1 - df1[[yname]]
  return(smbinning::smbinning.custom(df1, "good", xname, cuts = df4$maxx[-nrow(df4)])$ivtable)
}
df <- sas7bdat::read.sas7bdat("Downloads/accepts.sas7bdat")
isoreg_bin(df, bad, bureau_score)


As shown in the output below, the number of generated bins and the information value fall between the result in (https://statcompute.wordpress.com/2017/06/15/finer-monotonic-binning-based-on-isotonic-regression) and the result in (https://statcompute.wordpress.com/2017/01/22/monotonic-binning-with-smbinning-package). More importantly, given a larger sample size for each bin, the binning algorithm is more robust and generalizable.

   Cutpoint CntRec CntGood CntBad CntCumRec CntCumGood CntCumBad PctRec GoodRate BadRate    Odds  LnOdds     WoE     IV
1    <= 559     79      34     45        79         34        45 0.0135   0.4304  0.5696  0.7556 -0.2803 -1.6362 0.0496
2    <= 602    189     102     87       268        136       132 0.0324   0.5397  0.4603  1.1724  0.1591 -1.1969 0.0608
3    <= 605     56      31     25       324        167       157 0.0096   0.5536  0.4464  1.2400  0.2151 -1.1408 0.0162
4    <= 632    468     279    189       792        446       346 0.0802   0.5962  0.4038  1.4762  0.3895 -0.9665 0.0946
5    <= 639    150      95     55       942        541       401 0.0257   0.6333  0.3667  1.7273  0.5465 -0.8094 0.0207
6    <= 653    451     300    151      1393        841       552 0.0773   0.6652  0.3348  1.9868  0.6865 -0.6694 0.0412
7    <= 662    295     213     82      1688       1054       634 0.0505   0.7220  0.2780  2.5976  0.9546 -0.4014 0.0091
8    <= 665    100      77     23      1788       1131       657 0.0171   0.7700  0.2300  3.3478  1.2083 -0.1476 0.0004
9    <= 667     57      44     13      1845       1175       670 0.0098   0.7719  0.2281  3.3846  1.2192 -0.1367 0.0002
10   <= 677    381     300     81      2226       1475       751 0.0653   0.7874  0.2126  3.7037  1.3093 -0.0466 0.0001
11   <= 679     66      53     13      2292       1528       764 0.0113   0.8030  0.1970  4.0769  1.4053  0.0494 0.0000
12   <= 683    160     129     31      2452       1657       795 0.0274   0.8062  0.1938  4.1613  1.4258  0.0699 0.0001
13   <= 689    203     164     39      2655       1821       834 0.0348   0.8079  0.1921  4.2051  1.4363  0.0804 0.0002
14   <= 699    304     249     55      2959       2070       889 0.0521   0.8191  0.1809  4.5273  1.5101  0.1542 0.0012
15   <= 707    312     268     44      3271       2338       933 0.0535   0.8590  0.1410  6.0909  1.8068  0.4509 0.0094
16   <= 717    368     318     50      3639       2656       983 0.0630   0.8641  0.1359  6.3600  1.8500  0.4941 0.0132
17   <= 721    134     119     15      3773       2775       998 0.0230   0.8881  0.1119  7.9333  2.0711  0.7151 0.0094
18   <= 739    474     438     36      4247       3213      1034 0.0812   0.9241  0.0759 12.1667  2.4987  1.1428 0.0735
19   <= 746    166     154     12      4413       3367      1046 0.0284   0.9277  0.0723 12.8333  2.5520  1.1961 0.0277
20      746   1109    1064     45      5522       4431      1091 0.1900   0.9594  0.0406 23.6444  3.1631  1.8072 0.3463
21  Missing    315     210    105      5837       4641      1196 0.0540   0.6667  0.3333  2.0000  0.6931 -0.6628 0.0282
22    Total   5837    4641   1196        NA         NA        NA 1.0000   0.7951  0.2049  3.8804  1.3559  0.0000 0.8021

Creating List with Iterator

In the post (https://statcompute.wordpress.com/2018/11/17/growing-list-vs-growing-queue), it is shown how to grow a list or a list-like queue based upon a dataframe. In that example, the code snippet relied heavily on the FOR loop to do the assignment item by item, which left me thinking about potential alternatives afterwards. For instance, is there an implementation that would enable us to traverse a dataframe without knowing its dimension in advance or even without using the loop?

In the previous example, if we’d want to fetch rows from a dataframe, we need to know the number of rows in advance by using the nrow() function. As shown below, we need to generate a sequence of row indices and then fetch rows by indexing.


lapply(seq(nrow(iris)), function(idx) as.list(iris[idx, ]))

If we’d rather not fetch rows from a dataframe by indexing, a workaround would be the split() function, splitting the dataframe into rows. The additional unname() function does nothing but remove redundant list names. However, we still need to know the number of rows in this solution.


unname(lapply(split(iris, seq(nrow(iris))), function(row) as.list(row)))

With the iterators package, the coding logic can be slightly cleaner and more generic by wrapping the dataframe into a row-wise iterator object, as demonstrated below.


lapply(iterators::iter(iris, by = 'row'), function(row) as.list(row))

In addition, the iterator object is customizable. For instance, we can easily apply a filter function to the iterator.


lapply(iterators::iter(iris, by = 'row', checkFunc = function(x) x$Species == "setosa" & x$Petal.Width > 0.4), function(x) as.list(x))

If the use case is not creating a list, as discussed above, but growing an empty list by inserting, then a simple iterator might not be sufficient. In such a case, we might need to tweak it a little by enumerating the iterator with the ienum() function in the itertools2 package. Alternatively, we can also use the itertools2::izip() function to construct the enumeration manually. It is noted that, because we need to assign values from a function call within the lapply() to a list in the parent environment, the scoping assignment operator <<- should be used.


with(l1 <- list(), 
     invisible(lapply(itertools2::ienum(iterators::iter(iris, by = 'row')), function(x) l1[[x$index]] <<- as.list(x$value))))

### CHECK THE EQUALITY ###
identical(l1, lapply(iterators::iter(iris, by = 'row'), function(row) as.list(row)))
# TRUE

with(l2 <- list(), 
     invisible(lapply(itertools2::izip(i = itertools2::icount(start = 1), v = iterators::iter(iris, by = 'row')), function(x) l2[[x$i]] <<- as.list(x$v))))

### CHECK THE EQUALITY ###
identical(l2, lapply(iterators::iter(iris, by = 'row'), function(row) as.list(row)))
# TRUE

XFrames: Another Convenient Python Interface to Spark

Currently, pyspark might be the most popular python interface to Apache Spark. However, the xframes package (https://github.com/cchayden/xframes) is definitely an alternative worth trying.

As shown in the code snippet below, the XFrame, which is the dataframe object in the xframes package, interacts well with other python data structures and numpy functions. To me, the XFrame is easier to work with than the pyspark dataframe and has a more “authentic” python flavor.

from xframes import XFrame, aggregate

df = XFrame.read_csv("Downloads/nycflights.csv", header = True, nrows = 11)

### SUBSETTING
sel_cols = ["origin", "dest", "distance", "dep_delay", "carrier"]

df2 = df[sel_cols]
# OR:
# df.sql("select " + ", ".join(sel_cols) + " from df")

### FILTERING ###
print df2[(df2["origin"] == 'EWR') & (df2["carrier"] == "UA")]
# OR:
# print df2.filterby("EWR", "origin").filterby("UA", "carrier")

### AGGREGATING ###
from numpy import median

grp1 = df2.groupby("origin", {"dist": aggregate.CONCAT("distance")})

agg1 = XFrame({"origin": grp1["origin"], "med_dist": map(median, grp1["dist"])})
# OR:
# grp1["med_dist"] = grp1.apply(lambda row: median(row["dist"]))
# agg1 = grp1[["origin", "med_dist"]]
# USING SQL:
# df2.sql("select origin, percentile_approx(distance, 0.5) as med_dist from df2 group by origin")

for row in agg1:
  print row
# {'origin': u'LGA', 'med_dist': 747.5}
# {'origin': u'JFK', 'med_dist': 1089.0}
# {'origin': u'EWR', 'med_dist': 1065.0}

agg2 = df2.groupby("origin", {"avg_delay": aggregate.MEAN("dep_delay")})
# USING SQL:
# df2.sql("select origin, mean(dep_delay) as avg_delay from df2 group by origin")

for row in agg2:
  print row
# {'origin': u'LGA', 'avg_delay': -1.75}
# {'origin': u'JFK', 'avg_delay': -0.6666666666666666}
# {'origin': u'EWR', 'avg_delay': -2.3333333333333335}

### JOINING ###
for row in  agg1.join(agg2, on = {"origin": "origin"}, how = "inner"):
    print row
# {'origin': u'LGA', 'med_dist': 747.5, 'avg_delay': -1.75}
# {'origin': u'JFK', 'med_dist': 1089.0, 'avg_delay': -0.6666666666666666}
# {'origin': u'EWR', 'med_dist': 1065.0, 'avg_delay': -2.3333333333333335}

Growing List vs Growing Queue

### GROWING LIST ###
base_lst1 <- function(df) {
  l <- list()
  for (i in seq(nrow(df))) l[[i]] <- as.list(df[i, ])
  return(l)
}

### PRE-ALLOCATING LIST ###
base_lst2 <- function(df) {
  l <- vector(mode = "list", length = nrow(df))
  for (i in seq(nrow(df))) l[[i]] <- as.list(df[i, ])
  return(l)
}

### DEQUER PACKAGE ###
dequer_queue <- function(df) {
  q <- dequer::queue()
  for (i in seq(nrow(df))) dequer::pushback(q, as.list(df[i, ]))
  return(as.list(q))
}

### LIQUEUER PACKAGE ###
liqueuer_queue <- function(df) {
  q <- liqueueR::Queue$new()
  for (i in seq(nrow(df))) q$push(as.list(df[i, ]))
  return(q$data)
}

### COLLECTIONS PACKAGE ###
collections_queue <- function(df) {
  q <- collections::Queue$new()
  for (i in seq(nrow(df))) q$push(as.list(df[i, ]))
  return(q$as_list())
}

### RSTACKDEQUE PACKAGE ###
rstackdeque_queue <- function(df) {
  q <- rstackdeque::rpqueue()
  for (i in seq(nrow(df))) q <- rstackdeque::insert_back(q, as.list(df[i, ]))
  return(as.list(q))
}

nyc <- read.csv("Downloads/nycflights.csv")

compare <- function(ds) {
  tests <- c("dequer_queue(ds)",
             "base_lst2(ds)",
             "liqueuer_queue(ds)",
             "collections_queue(ds)",
             "rstackdeque_queue(ds)")
  for (t in tests) print(identical(base_lst1(ds), eval(parse(text = t))))
}

compare(nyc[1:10, ])
#[1] TRUE
#[1] TRUE
#[1] TRUE
#[1] TRUE
#[1] TRUE

### BENCHMARKS ###
bm <- function(ds) {
  rbenchmark::benchmark(replications = 5, order = "elapsed", relative = "elapsed",
                        columns = c("test", "replications", "elapsed", "relative"),
  "GROWING LIST"         = base_lst1(ds),
  "PRE-ALLOCATING LIST"  = base_lst2(ds),
  "DEQUER::QUEUE"        = dequer_queue(ds),
  "LIQUEUER::QUEUE"      = liqueuer_queue(ds),
  "COLLECTIONS::QUEUE"   = collections_queue(ds),
  "RSTACKDEQUE::RPQUEUE" = rstackdeque_queue(ds)
  )
}

bm(nyc[1:1000, ])
#                  test replications elapsed relative
#1         GROWING LIST            5   0.808    1.000
#2  PRE-ALLOCATING LIST            5   0.839    1.038
#5   COLLECTIONS::QUEUE            5   0.842    1.042
#4      LIQUEUER::QUEUE            5   1.091    1.350
#3        DEQUER::QUEUE            5   1.375    1.702
#6 RSTACKDEQUE::RPQUEUE            5   1.901    2.353

bm(nyc[1:10000, ])
#                  test replications elapsed relative
#5   COLLECTIONS::QUEUE            5   8.175    1.000
#1         GROWING LIST            5   8.505    1.040
#2  PRE-ALLOCATING LIST            5  12.554    1.536
#4      LIQUEUER::QUEUE            5  17.325    2.119
#6 RSTACKDEQUE::RPQUEUE            5  21.785    2.665
#3        DEQUER::QUEUE            5  22.030    2.695

bm(nyc[1:20000, ])
#                  test replications elapsed relative
#5   COLLECTIONS::QUEUE            5  16.730    1.000
#2  PRE-ALLOCATING LIST            5  17.134    1.024
#1         GROWING LIST            5  17.342    1.037
#4      LIQUEUER::QUEUE            5  48.359    2.891
#6 RSTACKDEQUE::RPQUEUE            5  52.420    3.133
#3        DEQUER::QUEUE            5  79.938    4.778

bm(nyc[1:30000, ])
#                  test replications elapsed relative
#2  PRE-ALLOCATING LIST            5  24.600    1.000
#5   COLLECTIONS::QUEUE            5  24.797    1.008
#1         GROWING LIST            5  25.600    1.041
#6 RSTACKDEQUE::RPQUEUE            5  60.908    2.476
#4      LIQUEUER::QUEUE            5 102.482    4.166
#3        DEQUER::QUEUE            5 182.039    7.400

Convert Data Frame to Dictionary List in R

In R, there are a couple of ways to convert a column-oriented data frame into a row-oriented dictionary list or alike, e.g. a list of lists.

In the code snippet below, I show each approach and how to extract keys and values from the dictionary. As shown in the benchmark, it appears that the generic R data structure is still the most efficient.

### LIST() FUNCTION IN BASE PACKAGE ###
x1 <- as.list(iris[1, ])
names(x1)
# [1] "Sepal.Length" "Sepal.Width"  "Petal.Length" "Petal.Width"  "Species"
x1[["Sepal.Length"]]
# [1] 5.1

### ENVIRONMENT-BASED SOLUTION ###
envn_dict <- function(x) {
  e <- new.env(hash = TRUE)
  for (name in names(x)) assign(name, x[, name], e)
  return(e)
}

x2 <- envn_dict(iris[1, ])
ls(x2)
# [1] "Petal.Length" "Petal.Width"  "Sepal.Length" "Sepal.Width"  "Species"
x2[["Sepal.Length"]]
# [1] 5.1

### COLLECTIONS PACKAGE ###
coll_dict <-  function(x) {
  d <- collections::Dict$new()
  for (name in names(x)) d$set(name, x[, name])
  return(d)
}

x3 <- coll_dict(iris[1, ])
x3$keys()
# [1] "Petal.Length" "Petal.Width"  "Sepal.Length" "Sepal.Width"  "Species"
x3$get("Sepal.Length")
# [1] 5.1

### HASH PACKAGE ###
hash_dict <- function(x) {
  d <- hash::hash()
  for (name in names(x)) d[[name]] <- x[, name]
  return(d)
}

x4 <- hash_dict(iris[1, ])
hash::keys(x4)
# [1] "Petal.Length" "Petal.Width"  "Sepal.Length" "Sepal.Width"  "Species"
hash::values(x4, "Sepal.Length")
# Sepal.Length
#          5.1

### DATASTRUCTURES PACKAGE ###
data_dict <- function(x) {
  d <- datastructures::hashmap()
  for (name in names(x)) d[name] <- x[, name]
  return(d)
}

x5 <- data_dict(iris[1, ])
datastructures::keys(x5)
# [1] "Species"      "Sepal.Width"  "Petal.Length" "Sepal.Length" "Petal.Width"
datastructures::get(x5, "Sepal.Length")
# [1] 5.1

### FROM PYTHON ###
py2r_dict <- function(x) {
  return(reticulate::py_dict(names(x), x, TRUE))
}

x6 <- py2r_dict(iris[1, ])
x6$keys()
# [1] "Petal.Length" "Sepal.Length" "Petal.Width"  "Sepal.Width"  "Species"
x6["Sepal.Length"]
# [1] 5.1

### CONVERT DATAFRAME TO DICTIONARY LIST ###
to_list <- function(df, fn) {
  l <- list()
  for (i in seq(nrow(df))) l[[i]] <- fn(df[i, ])
  return(l)
}

rbenchmark::benchmark(replications = 100, order = "elapsed", relative = "elapsed",
                      columns = c("test", "replications", "elapsed", "relative", "user.self", "sys.self"),
  "BASE::LIST"              = to_list(iris, as.list),
  "BASE::ENVIRONMENT"       = to_list(iris, envn_dict),
  "COLLECTIONS::DICT"       = to_list(iris, coll_dict),
  "HASH::HASH"              = to_list(iris, hash_dict),
  "DATASTRUCTURES::HASHMAP" = to_list(iris, data_dict),
  "RETICULATE::PY_DICT"     = to_list(iris, py2r_dict)
)
#                     test replications elapsed relative user.self sys.self
#1              BASE::LIST          100   0.857    1.000     0.857    0.000
#2       BASE::ENVIRONMENT          100   1.607    1.875     1.607    0.000
#4              HASH::HASH          100   2.600    3.034     2.600    0.000
#3       COLLECTIONS::DICT          100   2.956    3.449     2.956    0.000
#5 DATASTRUCTURES::HASHMAP          100  16.070   18.751    16.071    0.000
#6     RETICULATE::PY_DICT          100  18.030   21.039    18.023    0.008

Data Wrangling with Astropy Package

from astropy.io import ascii

from astropy.table import Table, join, vstack

from numpy import nanmean, nanmedian, array, sort

tbl1 = ascii.read("Downloads/nycflights.csv", format = "csv")

### SUBSETTING
sel_cols = ["origin", "dest", "distance", "dep_delay", "carrier"]

tbl2 = tbl1[sel_cols][range(10)]

tbl2.info
#   name   dtype
#--------- -----
#   origin  str3
#     dest  str3
# distance int64
#dep_delay int64
#  carrier  str2

### FILTERING ###
tbl2[list(i["carrier"] == "UA" and i["origin"] == 'EWR' for i in tbl2)]
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

tbl2[(tbl2["carrier"] == "UA") & (tbl2["origin"] == "EWR")]
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

### FILTER BY GROUPS ###
vstack(map(lambda x: x[(x["carrier"] == "UA") & (x["origin"] == "EWR")],
           tbl2.group_by(sort(array(range(len(tbl2))) % 2)).groups))
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

### GROUPING ###
grp = tbl2.group_by("origin")

### AGGREGATING ###
agg1 = Table(grp['origin', 'distance'].groups.aggregate(nanmedian), names = ["origin", "med_dist"])
#origin med_dist
#------ --------
#   EWR   1065.0
#   JFK   1089.0
#   LGA    747.5

agg2 = Table(grp['origin', 'dep_delay'].groups.aggregate(nanmean), names = ["origin", "avg_delay"])
#origin      avg_delay
#------ -------------------
#   EWR -2.3333333333333335
#   JFK -0.6666666666666666
#   LGA               -1.75

### JOINING ###
join(agg1, agg2, join_type = "inner", keys = "origin")
#origin med_dist      avg_delay
#------ -------- -------------------
#   EWR   1065.0 -2.3333333333333335
#   JFK   1089.0 -0.6666666666666666
#   LGA    747.5               -1.75

Manipulating Dictionary List with SQLite Back-End


from astropy.io.ascii import read

selected = ["origin", "dep_delay", "distance"]

csv = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

lst = map(lambda x: dict(zip(x.colnames, x)), csv)

from dataset import connect

### CREATE IN-MEMORY SQLITE DB ###
db = connect('sqlite:///:memory:', row_type = dict)

tbl = db.create_table("tbl", primary_id = False)

tbl.insert_many(lst)

list(db.query("select * from tbl limit 3"))
# [{u'dep_delay': 2, u'distance': 1400, u'origin': u'EWR'},
#  {u'dep_delay': 4, u'distance': 1416, u'origin': u'LGA'},
#  {u'dep_delay': 2, u'distance': 1089, u'origin': u'JFK'}]

sum1 = db.create_table("sum1", primary_id = False)

from numpy import nanmedian

sum1.insert_many(
  map(lambda x: dict(origin = x,
                     med_dist = nanmedian([i["distance"] for i in
                       db.query("select distance from tbl where origin = :origin", {"origin": x})])),
      [i["origin"] for i in db.query("select distinct origin from tbl")]))

sum2 = db.create_table("sum2", primary_id = False)

sum2.insert_many(list(db.query("select origin, ROUND(AVG(dep_delay), 2) as avg_delay from tbl group by origin")))

list(db.query("select a.*, b.avg_delay from sum1 as a, sum2 as b where a.origin = b.origin"))
#[{u'avg_delay': -2.33, u'med_dist': 1065.0, u'origin': u'EWR'},
# {u'avg_delay': -1.75, u'med_dist': 747.5, u'origin': u'LGA'},
# {u'avg_delay': -0.67, u'med_dist': 1089.0, u'origin': u'JFK'}]

Joining Dictionary Lists by Key

### IMPORT AND PREPARE THE DATA ###
from astropy.io.ascii import read

selected = ["origin", "dep_delay", "distance"]

tbl = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

lst = map(lambda x: dict(zip(x.colnames, x)), tbl)

key = set(map(lambda x: x["origin"], lst))

grp = dict(map(lambda x: (x, [i for i in lst if i["origin"] == x]), key))

#   ALTERNATIVELY, USE CLJ.SEQS.GROUP_BY()
# from clj.seqs import group_by
# group_by(lambda x: x["origin"], lst)
#   OR TOOLZ.ITERTOOLZ.GROUPBY()
# from toolz import itertoolz
# itertoolz.groupby("origin", lst)
#   OR ITERTOOLS.GROUPBY()
# key_fn = lambda x: x["origin"]
# dict((k, list(g)) for k, g in groupby(sorted(lst, key = key_fn), key_fn))

### CREATE 2 DICTIONARY LISTS TO BE MERGED ###
from numpy import nanmean, nanmedian

l1 = list({"origin"   : k,
           "med_dist" : nanmedian([v["distance"] for v in grp[k]])
          } for k in grp.keys())
#[{'med_dist': 1089.0, 'origin': 'JFK'},
# {'med_dist': 1065.0, 'origin': 'EWR'},
# {'med_dist': 747.5, 'origin': 'LGA'}]

l2 = list({"origin"   : k,
           "avg_delay": round(nanmean(map(lambda v: v["dep_delay"], grp[k])), 2)
          } for k in grp.keys())
#[{'avg_delay': -0.67, 'origin': 'JFK'},
# {'avg_delay': -2.33, 'origin': 'EWR'},
# {'avg_delay': -1.75, 'origin': 'LGA'}]

### METHOD 1: LIST COMPREHENSION ###
join1 = list(dict(v1, **v2) for v2 in l2 for v1 in l1 if v1["origin"] == v2["origin"])

### METHOD 2: CARTESIAN PRODUCT WITH ITERTOOLS.PRODUCT() ###
from itertools import product

join2 = list(dict(d1, **d2) for d1, d2 in product(l1, l2) if d1["origin"] == d2["origin"])

### METHOD 3: AD-HOC FUNCTION WITH DICT COPY() AND UPDATE() METHODS ###
# REQUIRES GROUP_BY() FROM THE CLJ PACKAGE MENTIONED ABOVE
from clj.seqs import group_by

def join_fn(x, y, key):
  result = list()
  for i, j in group_by(lambda x: x[key], x + y).values():
    xy = i.copy()
    xy.update(j)
    result.append(xy)
  return result

join3 = join_fn(l1, l2, "origin")

### METHOD 4: DEFAULTDICT() ###
from collections import defaultdict

dd = defaultdict(dict)

for x in l1 + l2:
  dd[x["origin"]].update(x)

join4 = dd.values()

### METHOD 5: ORDEREDDICT() ###
from collections import OrderedDict

od = OrderedDict()

for x in l1 + l2:
  if x["origin"] in od:
    od[x["origin"]].update(x)
  else:
    od[x["origin"]] = x

join5 = od.values()

### METHOD 6: TOOLZ ITERTOOLZ.JOIN() WITH DICTTOOLZ.MERGE() ###
from toolz import itertoolz, dicttoolz

join6 = list(dicttoolz.merge(i, j) for i, j in  itertoolz.join("origin", l1, "origin", l2))

# EXPECTED OUTPUT:
# [{'avg_delay': -0.67, 'med_dist': 1089.0, 'origin': 'JFK'},
#  {'avg_delay': -2.33, 'med_dist': 1065.0, 'origin': 'EWR'},
#  {'avg_delay': -1.75, 'med_dist': 747.5, 'origin': 'LGA'}]

Aggregation of Dictionary List by Key

from astropy.io.ascii import read

from numpy import nanmean, nanmedian, vectorize

selected = ["origin", "dep_delay", "distance"]

tbl = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

lst = map(lambda x: dict(zip(x.colnames, x)), tbl)

key = set([x["origin"] for x in lst])

grp = map(lambda x: {x: list(i for i in lst if i["origin"] == x)}, key)

### USING LIST COMPREHENSION ###
list({"origin"    : x.keys()[0],
      "flight_nbr": len(x.values()[0]),
      "med_dist"  : nanmedian([i["distance"] for i in x.values()[0]]),
      "avg_delay" : round(nanmean([i["dep_delay"] for i in x.values()[0]]), 2)
      } for x in grp)

### USING MAP() ALTERNATIVELY ###
map(lambda x: {"origin"    : x.keys()[0],
               "flight_nbr": len(x.values()[0]),
               "med_dist"  : nanmedian([i["distance"] for i in x.values()[0]]),
               "avg_delay" : round(nanmean([i["dep_delay"] for i in x.values()[0]]), 2)}, grp)

### USING VECTORIZE() ###
def agg(x):
  return({"origin"    : x.keys()[0],
          "flight_nbr": len(x.values()[0]),
          "med_dist"  : nanmedian([i["distance"] for i in x.values()[0]]),
          "avg_delay" : round(nanmean([i["dep_delay"] for i in x.values()[0]]), 2)})

list(vectorize(agg)(grp))

# RESULT:
# [{'avg_delay': -0.67, 'flight_nbr': 3, 'med_dist': 1089.0, 'origin': 'JFK'},
#  {'avg_delay': -2.33, 'flight_nbr': 3, 'med_dist': 1065.0, 'origin': 'EWR'},
#  {'avg_delay': -1.75, 'flight_nbr': 4, 'med_dist': 747.5, 'origin': 'LGA'}]

Grouping Dictionary List by Key

It is shown in code snippets below how to group a dictionary list based on a specific key.

First of all, let’s import the data from a csv file.

from astropy.io.ascii import read

selected = ["origin", "dest", "distance", "carrier"]

### IMPORT CSV FILE INTO ASTROPY TABLE ###
tbl = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

### CONVERT ASTROPY TABLE TO DICTIONARY LIST ###
lst = map(lambda x: dict(zip(x.colnames, x)), tbl)

### DISPLAY DATA CONTENTS ###
from tabulate import tabulate

print tabulate([lst[i] for i in range(3)], headers = "keys", tablefmt = "fancy_grid")

╒══════════╤════════╤═══════════╤════════════╕
│ origin   │ dest   │ carrier   │   distance │
╞══════════╪════════╪═══════════╪════════════╡
│ EWR      │ IAH    │ UA        │       1400 │
├──────────┼────────┼───────────┼────────────┤
│ EWR      │ ORD    │ UA        │        719 │
├──────────┼────────┼───────────┼────────────┤
│ EWR      │ FLL    │ B6        │       1065 │
╘══════════╧════════╧═══════════╧════════════╛

In the first approach, only standard Python modules and data structures are used.

### APPROACH 1: HOMEBREW GROUPING ###

from operator import itemgetter

### GET UNIQUE VALUES OF GROUP KEY ###
g_key = set([x["origin"] for x in lst])

### GROUPING LIST BY GROUP KEY ###
g_lst1 = sorted(map(lambda x: (x, [i for i in lst if i["origin"] == x]), g_key), key = itemgetter(0))

for i in g_lst1:
  print tabulate(i[1], headers = "keys", tablefmt = "fancy_grid")

In the second approach, we first sort the list by the key and then group the list with the itertools.groupby() function.

### APPROACH 2: ITERTOOLS.GROUPBY  ###

### SORTING DICTIONARY BEFORE GROUPING ###
s_lst = sorted(lst, key = itemgetter('origin'))

### GROUPING DICTIONARY BY "ORIGIN" ###
from itertools import groupby

g_lst2 = [(k, list(g)) for k, g in groupby(s_lst, itemgetter("origin"))]

for i in g_lst2:
  print tabulate(i[1], headers = "keys", tablefmt = "fancy_grid")

In the third approach, we use the defaultdict class in the collections module.

### APPROACH 3: DEFAULTDICT ###

from collections import defaultdict

### CREATE KEY-VALUE PAIRS FROM LIST ###
ddata = [(x["origin"], x) for x in lst]

### CREATE DEFAULTDICT ###
ddict = defaultdict(list)

for key, value in ddata:
  ddict[key].append(value)

g_lst3 = sorted(ddict.items(), key = itemgetter(0))

for i in g_lst3:
  print tabulate(i[1], headers = "keys", tablefmt = "fancy_grid")

In the fourth approach, we use the OrderedDict class, also in the collections module.

### APPROACH 4: ORDEREDDICT ###
from collections import OrderedDict

odict = OrderedDict()

for key, value in ddata:
  if key in odict: odict[key].append(value)
  else: odict[key] = [value]

g_lst4 = sorted(odict.items(), key = itemgetter(0))

for i in g_lst4:
  print tabulate(i[1], headers = "keys", tablefmt = "fancy_grid")

In the output below, it is shown that the four grouped lists are identical.


g_lst1 == g_lst2 == g_lst3 == g_lst4
# True

╒══════════╤════════╤═══════════╤════════════╕
│ origin   │ dest   │ carrier   │   distance │
╞══════════╪════════╪═══════════╪════════════╡
│ EWR      │ IAH    │ UA        │       1400 │
├──────────┼────────┼───────────┼────────────┤
│ EWR      │ ORD    │ UA        │        719 │
├──────────┼────────┼───────────┼────────────┤
│ EWR      │ FLL    │ B6        │       1065 │
╘══════════╧════════╧═══════════╧════════════╛
╒══════════╤════════╤═══════════╤════════════╕
│ origin   │ dest   │ carrier   │   distance │
╞══════════╪════════╪═══════════╪════════════╡
│ JFK      │ MIA    │ AA        │       1089 │
├──────────┼────────┼───────────┼────────────┤
│ JFK      │ BQN    │ B6        │       1576 │
├──────────┼────────┼───────────┼────────────┤
│ JFK      │ MCO    │ B6        │        944 │
╘══════════╧════════╧═══════════╧════════════╛
╒══════════╤════════╤═══════════╤════════════╕
│ origin   │ dest   │ carrier   │   distance │
╞══════════╪════════╪═══════════╪════════════╡
│ LGA      │ IAH    │ UA        │       1416 │
├──────────┼────────┼───────────┼────────────┤
│ LGA      │ ATL    │ DL        │        762 │
├──────────┼────────┼───────────┼────────────┤
│ LGA      │ IAD    │ EV        │        229 │
├──────────┼────────┼───────────┼────────────┤
│ LGA      │ ORD    │ AA        │        733 │
╘══════════╧════════╧═══════════╧════════════╛

Subset Dictionary by Values

from csv import DictReader

selected = ["origin", "dest", "distance", "carrier"]

with open("Downloads/nycflights.csv") as f:
  d = DictReader(f)
  l = map(lambda d: {k: d[k] for k in selected}, [next(d) for i in xrange(10)])

from pandas import DataFrame

from sys import getsizeof

### COMPARING OBJECT SIZES BETWEEN A DATAFRAME AND A LIST ###
float(getsizeof(DataFrame(l))) / getsizeof(l)
# 13.282894736842104

from tabulate import tabulate

print tabulate([l[i] for i in range(3)], headers = "keys")
# origin    dest    carrier      distance
# --------  ------  ---------  ----------
# EWR       IAH     UA               1400
# LGA       IAH     UA               1416
# JFK       MIA     AA               1089

### SOLUTION 1: LIST COMPREHENSION ###
list(x for x in l if x["origin"] == 'JFK' and x["carrier"] == 'B6')

### SOLUTION 2: FILTER() FUNCTION ###
filter(lambda x: x["origin"] == 'JFK' and x["carrier"] == 'B6', l)

### SOLUTION 3: MAP() AND LAMBDA() FUNCTIONS ###
list(v for v, i in map(lambda x: (x, x['origin'] == 'JFK' and x["carrier"] == 'B6'), l) if i)

### SOLUTION 4: CREATE A CLASS ###
class subset:
  def __init__(self, lst, expr):
    self.result = []
    for x in lst:
      if eval(expr):
        self.result.append(x)

subset(l, 'x["origin"] == "JFK" and x["carrier"] == "B6"').result

### SOLUTION 5: LIST_DICT_DB MODULE ###
from list_dict_DB import list_dict_DB as dict2db
db = dict2db(l)
db.query(origin = 'JFK', carrier = 'B6')

# EXPECTED OUTCOME:
# [{'carrier': 'B6', 'dest': 'BQN', 'distance': '1576', 'origin': 'JFK'},
#  {'carrier': 'B6', 'dest': 'MCO', 'distance': '944', 'origin': 'JFK'}]

Subset Dictionary by Keys

### READ A DATA SAMPLE WITH THREE RECORDS FROM THE CSV FILE
from csv import DictReader

with open("Downloads/nycflights.csv") as f:
  d = DictReader(f)
  l = [next(d) for i in xrange(3)]

### GET A DICT FROM THE LIST
d = l[0]

### SHOW KEYS OF THE DICT
print d.keys()
#['origin', 'dep_time', 'flight', 'hour', 'dep_delay', 'distance', 'dest', 'month', 'air_time', 'carrier', 'year', 'arr_delay', 'tailnum', 'arr_time', 'day', 'minute']

### SELECTED KEYS
selected = ["origin", "dest", "distance", "carrier"]

# EXPECTED OUTPUT 
# {'carrier': 'UA', 'dest': 'IAH', 'distance': '1400', 'origin': 'EWR'}

### METHOD 1: DEFINE BY THE DICT COMPREHENSION

{k: d[k] for k in selected}

{k: d.get(k) for k in selected}

{k: v for k, v in d.items() if k in selected}

### METHOD 2: DEFINE BY THE DICT CONSTRUCTOR

dict((k, d[k]) for k in selected)

dict(map(lambda k: (k, d[k]), selected))

dict(filter(lambda i: i[0] in selected, d.items()))

### METHOD 3: DEFINE WITH THE ZIP() FUNCTION

dict(zip(selected, [d[k] for k in selected]))

# ITEMGETTER() FUNCTION WITH THE UNPACK OPERATOR (*)
from operator import itemgetter
dict(zip(selected, itemgetter(*selected)(d)))

# AT() FUNCTION WITH THE UNPACK OPERATOR (*)
from pydash import at
dict(zip(selected, at(d, *selected)))

### APPLY ABOVE LOGIC TO THE WHOLE LIST OF DICTIONARIES
### WITH THE MAP FUNCTION
map(lambda d: {k: d[k] for k in selected}, l)

### ALTERNATIVELY, WITH THE LIST COMPREHENSION
[(lambda x: {k: x[k] for k in selected})(d) for d in l]

### OR THE PARALLEL POOL.MAP() FUNCTION
# ALWAYS DEFINE THE FUNCTION FIRST
def sel(d):
  return({k: d[k] for k in selected})

# THE MULTIPROCESSING MODULE NEXT
from multiprocessing import Pool, cpu_count
from contextlib import closing

with closing(Pool(processes = cpu_count())) as pool:
  pool.map(sel, l)
  pool.terminate()

# OUTPUT:
# [{'carrier': 'UA', 'dest': 'IAH', 'distance': '1400', 'origin': 'EWR'},
#  {'carrier': 'UA', 'dest': 'IAH', 'distance': '1416', 'origin': 'LGA'},
#  {'carrier': 'AA', 'dest': 'MIA', 'distance': '1089', 'origin': 'JFK'}]

Import CSV as Dictionary List

When the DictReader() function in the csv module is used to read a csv file as a list of dictionaries in python, numbers are imported as strings, as shown below.


from csv import DictReader

from pprint import pprint

### EXAMINE 3 ROWS OF DATA
with open("Downloads/nycflights.csv") as f:
  d = DictReader(f)
  l = [next(d) for i in xrange(3)]

pprint(l[0])
#{'air_time': '227',
# 'arr_delay': '11',
# 'arr_time': '830',
# 'carrier': 'UA',
# 'day': '1',
# 'dep_delay': '2',
# 'dep_time': '517',
# 'dest': 'IAH',
# 'distance': '1400',
# 'flight': '1545',
# 'hour': '5',
# 'minute': '17',
# 'month': '1',
# 'origin': 'EWR',
# 'tailnum': 'N14228',
# 'year': '2013'}

A solution to address the aforementioned issue is first to import the csv file into a Pandas DataFrame and then to convert the DataFrame to the list of dictionaries, as shown in the code snippet below.


from pandas import read_csv

from numpy import array_split

from multiprocessing import Pool, cpu_count

n = 1000

d = read_csv("Downloads/nycflights.csv", nrows = n)

%time l1 = [dict(zip(d.iloc[i].index.values, d.iloc[i].values)) for i in range(len(d))]
#CPU times: user 396 ms, sys: 39.9 ms, total: 436 ms
#Wall time: 387 ms

pprint(l1[0])
#{'air_time': 227.0,
# 'arr_delay': 11.0,
# 'arr_time': 830.0,
# 'carrier': 'UA',
# 'day': 1,
# 'dep_delay': 2.0,
# 'dep_time': 517.0,
# 'dest': 'IAH',
# 'distance': 1400,
# 'flight': 1545,
# 'hour': 5.0,
# 'minute': 17.0,
# 'month': 1,
# 'origin': 'EWR',
# 'tailnum': 'N14228',
# 'year': 2013}

In addition to using the list comprehension as shown above, we can also split the DataFrame into pieces and then apply the MapReduce paradigm together with lambda functions.


d2 = array_split(d, 4, axis = 0)

%%time
l2 = reduce(lambda a, b: a + b,
       map(lambda df: [dict(zip(df.iloc[i].index.values, df.iloc[i].values)) for i in range(len(df))], d2))
#CPU times: user 513 ms, sys: 83.3 ms, total: 596 ms
#Wall time: 487 ms

Alternatively, we could also apply parallelism with the multiprocessing module. As shown in the benchmark, the parallel version is much more efficient than the sequential version.


def p2dict(df):
  return([dict(zip(df.iloc[i].index.values, df.iloc[i].values)) for i in range(len(df))])

pool = Pool(processes = cpu_count())

%time l3 = reduce(lambda a, b: a + b, pool.map(p2dict, d2))
#CPU times: user 12.5 ms, sys: 23 µs, total: 12.5 ms
#Wall time: 204 ms

pool.close()

In addition to using Pandas, we can also consider the astropy module. As shown in the code snippet, albeit slightly slower than Pandas, astropy is much cleaner and more intuitive.


from astropy.io.ascii import read

a = read("Downloads/nycflights.csv", format = 'csv', data_end = n + 1)

def a2dict(row):
  return(dict(zip(row.colnames, row)))

pool = Pool(processes = cpu_count())

%time l4 = pool.map(a2dict, a)
#CPU times: user 90.6 ms, sys: 4.47 ms, total: 95.1 ms
#Wall time: 590 ms

pool.close()

By-Group Summary with SparkR – Follow-up for A Reader Comment

A reader of my previous post (https://statcompute.wordpress.com/2018/09/03/playing-map-and-reduce-in-r-by-group-calculation), Mr. Wayne Zhang, made a good comment: “Why not use directly either Spark or H2O to derive such computations without involving detailed map/reduce”.

Although Spark is not as flexible as R in statistical computation (in my opinion), it does have advantages for munging large-size data sets, such as aggregating, selecting, filtering, and so on. In the demonstration below, it is shown how to do the same by-group calculation with SparkR.

In SparkR, the most convenient way to do the by-group calculation is to use the agg() function after grouping the Spark DataFrame based on the specific column (or columns) with the groupBy() function.

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local", sparkConfig = list(spark.driver.memory = "10g", spark.driver.cores = "4"))
df <- as.DataFrame(iris)
summ1 <- agg(
  groupBy(df, alias(df$Species, "species")), 
  sl_avg = avg(df$Sepal_Length), 
  sw_avg = avg(df$Sepal_Width)
)
showDF(summ1)
+----------+-----------------+------------------+
|   species|           sl_avg|            sw_avg|
+----------+-----------------+------------------+
| virginica|6.587999999999998|2.9739999999999998|
|versicolor|            5.936|2.7700000000000005|
|    setosa|5.005999999999999| 3.428000000000001|
+----------+-----------------+------------------+

Alternatively, we can also use the gapply() function to apply an anonymous function calculating statistics to each chunk of the grouped Spark DataFrame. What’s more flexible in this approach is that we can define the schema of the output data, such as names and formats.

summ2 <- gapply(
  df, 
  df$"Species", 
  function(key, x) {
    data.frame(key, mean(x$Sepal_Length), mean(x$Sepal_Width), stringsAsFactors = F)
  }, 
  "species STRING, sl_avg DOUBLE, sw_avg DOUBLE"
)
showDF(summ2)
+----------+------+------+
|   species|sl_avg|sw_avg|
+----------+------+------+
| virginica| 6.588| 2.974|
|versicolor| 5.936|  2.77|
|    setosa| 5.006| 3.428|
+----------+------+------+

At last, we can take advantage of the Spark SQL engine after registering the DataFrame as a temporary view.

createOrReplaceTempView(df, "tbl")
summ3 <- sql("select Species as species, avg(Sepal_Length) as sl_avg, avg(Sepal_Width) as sw_avg from tbl group by Species")
showDF(summ3)
+----------+-----------------+------------------+
|   species|           sl_avg|            sw_avg|
+----------+-----------------+------------------+
| virginica|6.587999999999998|2.9739999999999998|
|versicolor|            5.936|2.7700000000000005|
|    setosa|5.005999999999999| 3.428000000000001|
+----------+-----------------+------------------+

Union Multiple Data.Frames with Different Column Names

On Friday, while working on a project in which I needed to union multiple data.frames with different column names, I realized that the base::rbind() function doesn’t take data.frames with different column names. Therefore, I quickly drafted a rbind2() function on the fly to get the job done, based on the idea of MapReduce that I discussed before (https://statcompute.wordpress.com/2018/09/08/playing-map-and-reduce-in-r-subsetting).

rbind2 <- function(lst) {
  h <- unique(unlist(lapply(lst, names)))
  Reduce(rbind, parallel::mcMap(function(x) {x[, setdiff(h, names(x))] <- NA; return(x)}, lst, mc.cores = length(lst)))
}

On Saturday, when I revisited the problem, I found a very good StackOverflow thread (https://stackoverflow.com/questions/3402371/combine-two-data-frames-by-rows-rbind-when-they-have-different-sets-of-columns) discussing various approaches to the same problem. Out of curiosity, I did a comparison between rbind2() and the approaches discussed in the thread by combining 8 data.frames, each with a million records. As shown in the plot, my homebrew rbind2() function is only marginally faster than the gtools::smartbind() function, and the dplyr::bind_rows() function is the most efficient.

n <- 1000000
d1 <- data.frame(id = 1:n, x1 = 1)
d2 <- data.frame(id = 1:n, x2 = 2)
d3 <- data.frame(id = 1:n, x3 = 3)
d4 <- data.frame(id = 1:n, x4 = 4)
d5 <- data.frame(id = 1:n, x5 = 5)
d6 <- data.frame(id = 1:n, x6 = 6)
d7 <- data.frame(id = 1:n, x7 = 7)
d8 <- data.frame(id = 1:n, x8 = 8)
microbenchmark::microbenchmark(times = 10, 
  "homebrew::rbind2"      = {rbind2(list(d1, d2, d3, d4, d5, d6, d7, d8))},
  "gtools::smartbind"     = {gtools::smartbind(list = list(d1, d2, d3, d4, d5, d6, d7, d8))},
  "dplyr::bind_rows"      = {dplyr::bind_rows(d1, d2, d3, d4, d5, d6, d7, d8)},
  "plyr::rbind.fill"      = {plyr::rbind.fill(d1, d2, d3, d4, d5, d6, d7, d8)},
  "data.table::rbindlist" = {data.table::rbindlist(list(d1, d2, d3, d4, d5, d6, d7, d8), fill = T)}
)

Rplot

Why Vectorize?

In the post (https://statcompute.wordpress.com/2018/09/15/how-to-avoid-for-loop-in-r), I briefly introduced the idea of vectorization and potential use cases. One might wonder why we even need the Vectorize() function, given that it is just a wrapper, and whether there is any material efficiency gain from vectorizing a function. It is true that the Vectorize() function cannot improve the efficiency of the function it wraps, e.g. vectorizes. However, the vectorization changes the input format of a function that would otherwise consume only scalar inputs and therefore can improve the overall processing efficiency. An example is given below to demonstrate the value of vectorization.

When we want to locate the index of a value within the long vector with millions of rows, the which() function should be the fastest, e.g. "which((0:100) == 10)". When we want to locate indices of several values within the vector, the match() function might be the most intuitive, e.g. "match(c(10, 12), 0:100)". If we would like to take advantage of the speed offered by the which() function, then we might consider one of the following:
A. Using the “%in%” operator within the which() function such as "which(0:100 %in% c(10, 12))", where “%in%” is the shorthand of the match() function.
B. Parsing out each lookup value and then connecting them by “|” operators such as "which(eval(parse(text = paste('0:100 == ', c(10, 12), collapse= '|'))))".

Besides the two above, we can also leverage the idea of MapReduce discussed in https://statcompute.wordpress.com/2018/09/08/playing-map-and-reduce-in-r-subsetting such as "Reduce(c, Map(function(x) which((0:100) == x), c(10, 12)))".

However, since the Vectorize() function is able to change the input format from a scalar to a vector, we can now consider vectorizing the which() function, which would consume the vector directly such as "(Vectorize(function(s, l) which(l == s), 's')) (c(10, 12), 0:100)". In this newly defined function, there are two parameters, e.g. “s” and “l”, of which “s” is the input changing from a scalar to a vector after the vectorization.

With all ideas on the table, a benchmark comparison is presented below to show how fast each of the above-mentioned approaches can look up 5 values in a vector with a million rows. Additionally, since it is straightforward to extend the idea of parallelism to both MapReduce and vectorization, we also add two parallel solutions to the benchmark, including the parallel::pvec() function that executes the vectorization in parallel and the parallel::mcMap() function that is the parallelized version of Map().

tbl <- 0:1000000
lst <- 10 ** (1:5)

str_which <- function(s, l) which(eval(parse(text = paste('l == ', s, collapse=  '|'))))

map_which <- function(s, l) Reduce(c, Map(function(x) which(l == x), s))

vec_which <- Vectorize(function(s, l) which(l == s), 's')

mcmap_which <- function(s, l) Reduce(c, parallel::mcMap(function(x) which(l == x), s, mc.cores = length(s)))

mcvec_which <- function(s, l) parallel::pvec(s, mc.cores = length(s), function(x) which(l == x))

rbenchmark::benchmark(replications = 1000, order = "user.self", relative = "user.self",
  columns = c("test", "relative", "elapsed", "user.self", "sys.self", "user.child", "sys.child"),
  match = {match(lst, tbl)},
  which = {which(tbl %in% lst)},
  str_which = {str_which(lst, tbl)},
  vec_which = {vec_which(lst, tbl)},
  map_which = {map_which(lst, tbl)},
  mcvec_which = {mcvec_which(lst, tbl)},
  mcmap_which = {mcmap_which(lst, tbl)}
)
#        test relative elapsed user.self sys.self user.child sys.child
# mcvec_which    1.000  25.296     1.722   12.477     33.191    30.004
# mcmap_which    1.014  25.501     1.746   12.424     34.231    30.228
#   map_which    9.642  18.240    16.604    1.635      0.000     0.000
#   vec_which    9.777  18.413    16.836    1.576      0.000     0.000
#       which   12.130  22.060    20.888    1.171      0.000     0.000
#   str_which   13.467  25.355    23.191    2.164      0.000     0.000
#       match   36.659  64.114    63.126    0.986      0.000     0.000

With no surprise, both parallel solutions are at least 10 times faster than any single-core solution in terms of the user CPU time. It is also intriguing to see that the vectorization is as efficient as the MapReduce, whether with a single core or multiple cores, and is significantly faster than the first three approaches shown earlier, and that the match() function, albeit simple, is the slowest, which in turn justifies the effort of vectorizing the which() function.

How to Avoid For Loop in R

A FOR loop is the most intuitive way to apply an operation to a series by looping through each item one by one, which makes perfect sense logically but should be avoided by useRs given the low efficiency. In R, there are two ways to implement the same functionality of a FOR loop. The first option is the lapply() or sapply() function that applies a function to each item in the list, which is very similar to the Map() function that I showed in https://statcompute.wordpress.com/2018/09/08/playing-map-and-reduce-in-r-subsetting and https://statcompute.wordpress.com/2018/09/03/playing-map-and-reduce-in-r-by-group-calculation. The second option is to “vectorize” a function by using the Vectorize() function such that the newly vectorized function can consume the list directly.

Below is a quick demonstration showing how to recode a FOR loop by using the lapply() and Vectorize() functions. We first create a dummy loop that iterates 3 times and prints out the iteration number each time.

for (i in 1:3) {print(paste("iter", i))}
#[1] "iter 1"
#[1] "iter 2"
#[1] "iter 3"

To migrate the above FOR loop, we just need to wrap the operation “print(paste(“iter”, i))” into an anonymous function and then apply this anonymous function to each element in the series by using the lapply() function. Please note that the invisible() function used below doesn’t do anything material other than suppressing the printed output of the object value.

invisible(lapply(1:3, function(i) print(paste("iter", i))))
#[1] "iter 1"
#[1] "iter 2"
#[1] "iter 3"

The vectorization is a little tricky. It is noted that the anonymous function created above can only be applied to each item in the series. In order to have the anonymous function consume the whole series instead of a single item, we should create a so-called vectorized function by using the Vectorize() function and then apply this newly created function to the series directly, as shown below.

invisible(Vectorize(function(i) print(paste("iter", i)), SIMPLIFY = F) (1:3))
#[1] "iter 1"
#[1] "iter 2"
#[1] "iter 3"

From what has been shown so far, it appears that the solution with a FOR loop is most intuitive and easier to understand. One might wonder why we need to go through the hassle.

In the example below that is borrowed from https://statcompute.wordpress.com/2018/09/08/playing-map-and-reduce-in-r-subsetting, let’s see how to get the job done with the FOR loop. First of all, we need to get things ready by converting the data.frame into a list of 2 data.frames named “lst” and defining a subsetting function named “fn”, similar to what we did before.

data(iris)
expr = expression(Sepal.Length > 7 & Sepal.Width > 3)
lst <- split(iris, sort((1:nrow(iris)) %% 2))
fn <- function(x) x[with(x, which(eval(expr))), ]

The code snippet below shows how to loop through the list by using the FOR loop and then subset each data.frame, which seems more complicated than it should be.

LoopFn <- function(l) {
  result <- data.frame()
  for (i in l) {
    result <- rbind(result, fn(i))
  }
  row.names(result) <- NULL
  return(result)
}
LoopFn(lst)  
#  Sepal.Length Sepal.Width Petal.Length Petal.Width   Species
#1          7.2         3.6          6.1         2.5 virginica
#2          7.7         3.8          6.7         2.2 virginica
#3          7.2         3.2          6.0         1.8 virginica
#4          7.9         3.8          6.4         2.0 virginica

Let’s take a look at two other options, both of which require only one line as long as the setting is configured appropriately.

do.call(rbind, c(lapply(lst, fn), make.row.names = F))
#  Sepal.Length Sepal.Width Petal.Length Petal.Width   Species
#1          7.2         3.6          6.1         2.5 virginica
#2          7.7         3.8          6.7         2.2 virginica
#3          7.2         3.2          6.0         1.8 virginica
#4          7.9         3.8          6.4         2.0 virginica
do.call(rbind, c((Vectorize(fn, SIMPLIFY = F)) (lst), make.row.names = F))
#  Sepal.Length Sepal.Width Petal.Length Petal.Width   Species
#1          7.2         3.6          6.1         2.5 virginica
#2          7.7         3.8          6.7         2.2 virginica
#3          7.2         3.2          6.0         1.8 virginica
#4          7.9         3.8          6.4         2.0 virginica

Playing Map() and Reduce() in R – Subsetting

In the previous post (https://statcompute.wordpress.com/2018/09/03/playing-map-and-reduce-in-r-by-group-calculation), I’ve shown how to employ MapReduce when calculating by-group statistics. Actually, the same Divide-n-Conquer strategy is applicable to other use cases, one of which is the subsetting operation.

In the example below, let’s still use the same iris data for the demonstration purpose. In R, the most convenient way to perform the subsetting might be the subset() function, which searches the entire data.frame for rows meeting the condition described in the “expr” expression below.

data(iris)
expr = expression(Sepal.Length > 7 & Sepal.Width > 3)
subset(iris, eval(expr))

With the whole data.frame partitioned into multiple pieces, the row searching operation can perfectly fit into the MapReduce paradigm, as described in the logic flow below.
1. First of all, the iris data is divided into chunks with an equal number of rows, e.g. two chunks in the example.
2. Next, a Map() function is used to perform the row searching operation within each chunk.
3. Upon the return of rows meeting the criteria from each chunk, a Reduce() function is employed to combine all outcomes together.

n <- 2
lst <- split(iris, sort((1:nrow(iris)) %% n))
Reduce(rbind, Map(function(x) x[with(x, which(eval(expr))), ], lst))

It is noted that the above map operation is still performed sequentially without leveraging the computing power of multiple CPUs. From the CPU usage shown below, we can see that only one CPU is used while the rest are idle.

single_core

Similar to the by-group summary, the by-chunk operation of row searching doesn’t have to be in the sequential order and can be distributed simultaneously across multiple CPUs by using the mcMap() function, as outlined below.
1. Again, it starts with the data partition. However, there are two caveats in the example. Firstly, the data is split based upon the number of CPUs captured by the detectCores() function. Secondly, the partitioned data is NOT stored physically in the memory but reflected logically by a list of future abstractions, e.g. “flst” in the code snippet.
2. In the second step, the mcMap() function is used to evaluate each future abstraction, return the partitioned data, and then perform the row searching within each chunk.
3. At last, the Reduce() function collects and combines all outcomes.

pkgs <- c("parallel", "future")
mapply(function(x) require(x, character.only = T), pkgs)
n <- detectCores()
flst <- Map(function(x) future({x}), split(iris, sort((1:nrow(iris)) %% n)))
Reduce(rbind, mcMap(function(x) value(x)[with(value(x), which(eval(expr))), ], flst, mc.cores = n))

If we take a look at the CPU usage again, it is now shown that all CPUs are utilized.

multicore

Playing Map() and Reduce() in R – By-Group Calculation

Clojure is such an interesting programming language that it can not only enhance our skill set but also change the way we write programs. After learning Clojure, I can’t help thinking about how to employ functional programming and the MapReduce paradigm to improve our experience with other programming languages, e.g. R in my case.

When calculating a statistical summary in R, we would go straight to the aggregate() or sqldf() function without a second thought. Such by-group calculations seem so simple that we often don’t bother to think about the problem itself schematically. Let’s take a look at how to approach this problem in Clojure by using the code below that I copied from https://statcompute.wordpress.com/2018/03/18/do-we-really-need-dataframe-in-clojure.

(def country_sum
  (map (fn [[billingcountry total]]
    {:billiingcountry billingcountry :total (reduce + (map :total total))})
    (group-by :billingcountry inv)))

Although the code looks a little awkward with lots of parentheses, the idea is very clear and makes sense. We first partition the data into multiple pieces based on the groups that we’d like to summarize, and then define an anonymous function that uses a map() function to extract invoice totals from the original data, e.g. a list of maps in this case, and a reduce() function to sum them up. The whole calculation logic is a faithful reflection of MapReduce.

Now let’s come back to R and think about how to re-frame the solution for the by-group calculation. Using data(iris) as an example, we should first partition the data.frame by “Species” with split() so as to convert the data.frame into a list of data.frames by groups. If we apply the class() function to each item in the list “lst1”, we should be able to see three data.frames.

data(iris)
lst1 <- split(iris, iris$Species)
Map(class, lst1)
#$setosa
#[1] "data.frame"
#$versicolor
#[1] "data.frame"
#$virginica
#[1] "data.frame"

After the data partition, we can proceed to calculate the by-group summary with each data.frame in the list. Luckily enough, because the data.frame is generically constructed as a collection of columns instead of rows, we don’t need to use the map operation to extract values from corresponding rows. Instead, we can directly calculate the column summary based on each partitioned data.frame. As shown below, the code is straightforward yet flexible given the use of an anonymous function, which can be customized to accommodate any arbitrary calculation.

Map(function(x) data.frame(grp = unique(x$Species), sl_avg = mean(x$Sepal.Length), sw_avg = mean(x$Sepal.Width)), lst1)
#$setosa
#     grp sl_avg sw_avg
#1 setosa  5.006  3.428
#$versicolor
#         grp sl_avg sw_avg
#1 versicolor  5.936   2.77
#$virginica
#        grp sl_avg sw_avg
#1 virginica  6.588  2.974

Up to now, the problem has been successfully solved. However, if we have a closer look at the solution, it doesn’t take long for us to notice that the calculation in one group is completely orthogonal to the calculation in another group and therefore the by-group calculation doesn’t have to be in a sequential order. In addition, the partitioned data consumes significantly more memory than the original one, which is not an issue for small data sets but could be a potential concern for big data sets. After all, there is no need to have the data always stored in the memory, as long as it is available for us when needed.

To address the first observation, we would bring in the parallel computation by using the parallel::mcMap() function and kicking off multiple CPUs simultaneously. For the second concern, we can introduce the concept of Future, which is the abstraction for a data.frame instead of the data.frame physically stored in the memory. The future, once created with future::future() function, would remain unresolved until we want it to be resolved in the computation by using the future::value() function, at the computing cost for evaluating the future.

With everything put together, below is the final code with the parallel map and the future abstraction.

pkg <- list("parallel", "future")
mapply(function(x) require(x, character.only = T), pkg)
ft <- future({split(iris, iris$Species)})
mcMap(function(i) with(value(ft)[[i]], data.frame(grp = unique(Species), sl_avg = mean(Sepal.Length), sw_avg = mean(Sepal.Width))), 1:length(unique(iris$Species)), mc.cores = detectCores())
#[[1]]
#     grp sl_avg sw_avg
#1 setosa  5.006  3.428
#[[2]]
#         grp sl_avg sw_avg
#1 versicolor  5.936   2.77
#[[3]]
#        grp sl_avg sw_avg
#1 virginica  6.588  2.974

If we would like the output to be prettier, we could wrap the list into a nice-looking data.frame with a reduce operation by either Reduce(rbind, …) or do.call(rbind, …), where … is the final list from Map() or mcMap() shown above, as illustrated in the sketch below.
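
Below is a minimal sketch of the do.call(rbind, …) flavor, reusing the in-memory list “lst1” and the same anonymous function from the example above.

### A MINIMAL SKETCH: COLLAPSE THE BY-GROUP SUMMARIES INTO ONE DATA.FRAME
do.call(rbind, c(Map(function(x) data.frame(grp = unique(x$Species),
                                            sl_avg = mean(x$Sepal.Length),
                                            sw_avg = mean(x$Sepal.Width)), lst1),
                 make.row.names = F))
#         grp sl_avg sw_avg
#1     setosa  5.006  3.428
#2 versicolor  5.936  2.770
#3  virginica  6.588  2.974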

Mimicking SQLDF with MonetDBLite

Like many useRs, I am also a big fan of the sqldf package developed by Grothendieck, which uses SQL statements for data frame manipulations, with the embedded SQLite database as the default back-end.

In the examples below, I drafted a couple of R utility functions with the MonetDBLite back-end by mimicking the sqldf function. There are several interesting observations from the benchmark comparison:
– The data import for csv data files is more efficient with MonetDBLite than with the generic read.csv function or read.csv.sql function in the sqldf package.
– The data manipulation for a single data frame, such as selection, aggregation, and subquery, is also significantly faster with MonetDBLite than with the sqldf function.
– However, the sqldf function is extremely efficient in joining 2 data frames, e.g. inner join in the example.


# IMPORT
monet.read.csv <- function(file) {
  monet.con <- DBI::dbConnect(MonetDBLite::MonetDBLite(), ":memory:")
  suppressMessages(MonetDBLite::monetdb.read.csv(monet.con, file, "file", sep = ","))
  result <- DBI::dbReadTable(monet.con, "file")
  DBI::dbDisconnect(monet.con, shutdown = T)
  return(result)  
}

microbenchmark::microbenchmark(monet = {df <- monet.read.csv("Downloads/nycflights.csv")}, times = 10)
#Unit: milliseconds
#  expr      min       lq     mean   median       uq      max neval
# monet 528.5378 532.5463 539.2877 539.0902 542.4301 559.1191    10

microbenchmark::microbenchmark(read.csv = {df <- read.csv("Downloads/nycflights.csv")}, times = 10)
#Unit: seconds
#     expr      min       lq     mean   median       uq      max neval
# read.csv 2.310238 2.338134 2.360688 2.343313 2.373913 2.444814    10

# SELECTION AND AGGREGATION
monet.sql <- function(df, sql) {
  df_str <- deparse(substitute(df))
  monet.con <- DBI::dbConnect(MonetDBLite::MonetDBLite(), ":memory:")  
  suppressMessages(DBI::dbWriteTable(monet.con, df_str, df, overwrite = T))
  result <- DBI::dbGetQuery(monet.con, sql)
  DBI::dbDisconnect(monet.con, shutdown = T)
  return(result)
}

microbenchmark::microbenchmark(monet = {monet.sql(df, "select * from df sample 3")}, times = 10)
#Unit: milliseconds
#  expr     min      lq     mean   median       uq     max neval
# monet 422.761 429.428 439.0438 438.3503 447.3286 453.104    10

microbenchmark::microbenchmark(sqldf = {sqldf::sqldf("select * from df order by RANDOM() limit 3")}, times = 10)
#Unit: milliseconds
#  expr      min      lq     mean   median       uq      max neval
# sqldf 903.9982 908.256 925.4255 920.2692 930.0934 963.6983    10

microbenchmark::microbenchmark(monet = {monet.sql(df, "select origin, median(distance) as med_dist from df group by origin")}, times = 10)
#Unit: milliseconds
#  expr      min       lq     mean   median       uq      max neval
# monet 450.7862 456.9589 458.6389 458.9634 460.4402 465.2253    10

microbenchmark::microbenchmark(sqldf = {sqldf::sqldf("select origin, median(distance) as med_dist from df group by origin")}, times = 10)
#Unit: milliseconds
#  expr      min       lq    mean   median       uq      max neval
# sqldf 833.1494 836.6816 841.952 843.5569 846.8117 851.0771    10

microbenchmark::microbenchmark(monet = {monet.sql(df, "with df1 as (select dest, avg(distance) as dist from df group by dest), df2 as (select dest, count(*) as cnts from df group by dest) select * from df1 inner join df2 on (df1.dest = df2.dest)")}, times = 10)
#Unit: milliseconds
#  expr      min       lq    mean   median       uq     max neval
# monet 426.0248 431.2086 437.634 438.4718 442.8799 451.275    10

microbenchmark::microbenchmark(sqldf = {sqldf::sqldf("select * from (select dest, avg(distance) as dist from df group by dest) df1 inner join (select dest, count(*) as cnts from df group by dest) df2 on (df1.dest = df2.dest)")}, times = 10)
#Unit: seconds
#  expr      min       lq     mean   median       uq      max neval
# sqldf 1.013116 1.017248 1.024117 1.021555 1.025668 1.048133    10

# MERGE 
monet.sql2 <- function(df1, df2, sql) {
  df1_str <- deparse(substitute(df1))
  df2_str <- deparse(substitute(df2))
  monet.con <- DBI::dbConnect(MonetDBLite::MonetDBLite(), ":memory:")  
  suppressMessages(DBI::dbWriteTable(monet.con, df1_str, df1, overwrite = T))
  suppressMessages(DBI::dbWriteTable(monet.con, df2_str, df2, overwrite = T))
  result <- DBI::dbGetQuery(monet.con, sql)
  DBI::dbDisconnect(monet.con, shutdown = T)
  return(result)
}

tbl1 <- monet.sql(df, "select dest, avg(distance) as dist from df group by dest")
tbl2 <- monet.sql(df, "select dest, count(*) as cnts from df group by dest")

microbenchmark::microbenchmark(monet = {monet.sql2(tbl1, tbl2, "select * from tbl1 inner join tbl2 on (tbl1.dest = tbl2.dest)")}, times = 10)
#Unit: milliseconds
#  expr      min       lq     mean  median       uq      max neval
# monet 93.94973 174.2211 170.7771 178.487 182.4724 187.3155    10

microbenchmark::microbenchmark(sqldf = {sqldf::sqldf("select * from tbl1 inner join tbl2 on (tbl1.dest = tbl2.dest)")}, times = 10)
#Unit: milliseconds
#  expr      min       lq     mean median       uq      max neval
# sqldf 19.49334 19.60981 20.29535 20.001 20.93383 21.51837    10

Read Random Rows from A Huge CSV File

Since R data frames are stored in the memory, it is sometimes beneficial to sample and examine the data in a large-size csv file before importing it into a data frame. To the best of my knowledge, there is no off-the-shelf R function performing such data sampling with a relatively low computing cost. Therefore, I drafted two utility functions serving this particular purpose, one with the LaF library and the other with the reticulate library by leveraging the power of Python. While the first function is more efficient and samples 3 records out of 336,776 in about 100 milliseconds, the second one is more for fun and a showcase of the reticulate package.


library(LaF)

sample1 <- function(file, n) {
  lf <- laf_open(detect_dm_csv(file, sep = ",", header = TRUE, factor_fraction = -1))
  return(read_lines(lf, sample(1:nrow(lf), n)))
}

sample1("Downloads/nycflights.csv", 3)
#   year month day dep_time dep_delay arr_time arr_delay carrier tailnum flight
# 1 2013     9  15     1323        -6     1506       -23      MQ  N857MQ   3340
# 2 2013     3  18     1657        -4     2019         9      UA  N35271     80
# 3 2013     6   7     1325        -4     1515       -11      9E  N8477R   3867
#   origin dest air_time distance hour minute
# 1    LGA  DTW       82      502   13     23
# 2    EWR  MIA      157     1085   16     57
# 3    EWR  CVG       91      569   13     25

library(reticulate)

sample2 <- function(file, n) {
  rows <- py_eval(paste("sum(1 for line in open('", file, "'))", sep = '')) - 1
  return(import("pandas")$read_csv(file, skiprows = setdiff(1:rows, sample(1:rows, n))))
}

sample2("Downloads/nycflights.csv", 3)
#   year month day dep_time dep_delay arr_time arr_delay carrier tailnum flight
# 1 2013    10   9      812        12     1010       -16      9E  N902XJ   3507
# 2 2013     4  30     1218       -10     1407       -30      EV  N18557   4091
# 3 2013     8  25     1111        -4     1238       -27      MQ  N721MQ   3281
#   origin dest air_time distance hour minute
# 1    JFK  MSY      156     1182    8     12
# 2    EWR  IND       92      645   12     18
# 3    LGA  CMH       66      479   11     11

Updating Column Values in Clojure Map


(require '[huri.core :as h]
         '[clojure.core.matrix.dataset :as d]
         '[incanter.core :as i])

(def ds [{:id 1.0 :name "name1"}
         {:id 2.0 :name "name2"}
         {:id 3.0 :name "name3"}])

;; UPDATE THE :NAME COLUMN IN THE DATASET
;; - IF THE VALUE IS NOT "NAME2", THEN CHANGE TO "NOT 2"
;;
;; EXPECTED OUTPUT:
;; | :id | :name |
;; |-----+-------|
;; | 1.0 | not 2 |
;; | 2.0 | name2 |
;; | 3.0 | not 2 |

;; WITH CLOJURE.CORE/UPDATE
(def d1 (map (fn [x] (update x :name #(if (= "name2" %) % "not 2"))) ds))

;; WITH CLOJURE.CORE/UPDATE-IN
(def d2 (map (fn [x] (update-in x [:name] #(if (= "name2" %) % "not 2"))) ds))

;; WITH HURI/UPDATE-COLS
(def d3 (h/update-cols {:name #(if (= "name2" %) % "not 2")} ds))

;; WITH MATRIX.DATASET/EMAP-COLUMN
(def d4 (-> ds
            (d/dataset)
            (d/emap-column :name #(if (= "name2" %) % "not 2"))
            ((comp #(map into %) d/row-maps))))
   
;; WITH INCANTER/TRANSFORM-COL
(def d5 (-> ds
            (i/to-dataset)
            (i/transform-col :name #(if (= "name2" %) % "not 2"))
            ((comp #(map into %) second vals))))

Adding New Columns to Clojure Map


(require '[huri.core :as h]
         '[clojure.core.matrix.dataset :as d]
         '[incanter.core :as i])

(def ds [{:id 1.0 :name "name1"}
         {:id 2.0 :name "name2"}
         {:id 3.0 :name "name3"}])

;; ADD 2 COLUMNS TO THE DATASET
;; - ADD 2 TO :ID AND NAME THE NEW COLUMN :ADD2
;; - CHECK WHETHER :NAME = "name2" AND NAME THE NEW COLUMN :NAME2
;;
;; EXPECTED OUTPUT:
;;| :id | :name | :add2 | :name2 |
;;|-----+-------+-------+--------|
;;| 1.0 | name1 |   3.0 |      N |
;;| 2.0 | name2 |   4.0 |      Y |
;;| 3.0 | name3 |   5.0 |      N |

;; WITH PLAIN CLOJURE
;; #1 - MERGE
(def d1 (map #(merge % {:add2 (+ (:id %) 2) 
                        :name2 (if (= "name2" (:name %)) "Y" "N")}) ds))

;; #2 - MERGE-WITH
(def d2 (map #(merge-with into % {:add2 (+ (:id %) 2)
                                  :name2 (if (= "name2" (:name %)) "Y" "N")}) ds))

;; #3 - ASSOC
(def d3 (map #(assoc % :add2 (+ (:id %) 2) 
                       :name2 (if (= "name2" (:name %)) "Y" "N")) ds))

;; #4 - CONJ
(def d4 (map #(conj % {:add2 (+ (:id %) 2)
                       :name2 (if (= "name2" (:name %)) "Y" "N")}) ds))

;; #5 - CONCAT 
(def d5 (map #(into {} (concat % {:add2 (+ (:id %) 2)
                                  :name2 (if (= "name2" (:name %)) "Y" "N")})) ds))

;; WITH HURI 
(def d6 (h/derive-cols {:name2 [#(if (= "name2" %) "Y" "N") :name] 
                        :add2 [#(+ 2  %) :id]} ds))

;; WITH CORE.MATRIX API
(def d7 (-> ds
            (d/dataset)
            (d/add-column :add2 (map #(+ 2 %) (map :id ds)))
            (d/add-column :name2 (map #(if (= "name2" %) "Y" "N") (map :name ds)))
            (d/row-maps)))

;; WITH INCANTER API
(def d8 (->> ds
             (i/to-dataset)
             (i/add-derived-column :add2 [:id] #(+ 2 %))
             (i/add-derived-column :name2 [:name] #(if (= "name2" %) "Y" "N"))
             ((comp second vals))))

;; CHECK THE DATA EQUALITY
(= d1 d2 d3 d4 d5 d6 d7 d8)
;; true

Transpose in Clojure


(require '[huri.core :as h]
         '[clojure.core.matrix.dataset :as d]
         '[incanter.core :as i])

;; FROM MAP OF ROWS TO MAP OF COLUMNS

(def byRow [{:x 1 :y "a"}
            {:x 2 :y "b"}
            {:x 3 :y "c"}])

;; APPROACH #1 - PLAIN CLOJURE
(zipmap (keys (first byRow)) (apply map list (map vals byRow)))

; {:x (1 2 3), :y ("a" "b" "c")}

;; APPROACH #2 - HURI LIBRARY
(h/col-oriented byRow)

; {:x (1 2 3), :y ("a" "b" "c")}

;; APPROACH #3 - CORE.MATRIX LIBRARY
(d/to-map (d/dataset (keys (first byRow)) byRow))

; {:x [1 2 3], :y ["a" "b" "c"]}

;; APPROACH #4 - INCANTER LIBRARY
(i/to-map (i/to-dataset byRow))

; {:x (1 2 3), :y ("a" "b" "c")}

;; FROM MAP OF COLUMNS TO MAP OF ROWS

(def byCol {:x '(1 2 3)
            :y '("a" "b" "c")})

;; APPROACH #1 - PLAIN CLOJURE
(map #(zipmap (keys byCol) %) (apply map list (vals byCol)))

; ({:x 1, :y "a"} {:x 2, :y "b"} {:x 3, :y "c"})

;; APPROACH #2 - HURI LIBRARY
(h/row-oriented byCol)

; ({:x 1, :y "a"} {:x 2, :y "b"} {:x 3, :y "c"})

;; APPROACH #3 - CORE.MATRIX LIBRARY
(d/row-maps (d/dataset (keys byCol) byCol))

; [{:x 1, :y "a"} {:x 2, :y "b"} {:x 3, :y "c"}]

;; APPROACH #4 - INCANTER LIBRARY
(second (vals (i/dataset (keys byCol) (apply map list (vals byCol)))))

; ({:x 1, :y "a"} {:x 2, :y "b"} {:x 3, :y "c"})

Clojure Integration with R


(require '[tnoda.rashinban :as rr]
         '[tnoda.rashinban.core :as rc]
         '[clojure.core.matrix.dataset :as dt]
         '[clojure.core.matrix.impl.dataset :as id])

;; CREATE A TOY DATA
(def ds [{:id 1.0 :name "name1"}
         {:id 2.0 :name "name2"}
         {:id 3.0 :name "name3"}])

;; RUN THE FOLLOWING R CODE IN ADVANCE TO START THE RSERVE SERVER:
;;   R -e 'library(Rserve)' -e 'Rserve(args = "--vanilla")'
;; IF YOU HAVE LITTLER INSTALLED, BELOW ALSO WORKS:
;;   r -e 'library(Rserve); Rserve(args = "--vanilla")'  
(rr/init)

;; PASS THE DATA FROM CLOJURE INTO R
(map (fn [x] (rr/<- (name (key x)) (val x))) 
  (let [ks ((comp keys first) ds)] (zipmap ks (map #(map % ds) ks))))

(rr/<- 'header (map name ((comp keys first) ds)))
         
;; CREATE THE R DATA.FRAME         
(rc/eval "df = data.frame(lapply(header, as.name))")

;; TEST THE R DATA.FRAME
(rc/eval "df$id")
; [1.0 2.0 3.0]

(rc/eval "df$name")
; ["name1" "name2" "name3"]

;; CONVERT THE R DATA.FRAME BACK TO THE CLOJURE MAP
(def mp (into [] (map #(zipmap (map keyword (rr/colnames 'df)) %) 
                   (partition (count (rr/colnames 'df)) (apply interleave (rr/matrix 'df))))))

; [{:id 1.0, :name "name1"} {:id 2.0, :name "name2"} {:id 3.0, :name "name3"}]

;; TEST THE EQUALITY BETWEEN INPUT AND OUTPUT DATA
(= mp ds)
; true

;; ALTERNATIVELY, WE CAN ALSO CONVERT THE R DATA.FRAME TO A CLOJURE DATASET
(def dt (id/dataset-from-columns (map keyword (rr/colnames 'df)) (rr/matrix 'df)))

; #dataset/dataset {:column-names [:id :name], :columns [[1.0 2.0 3.0] ["name1" "name2" "name3"]], :shape [3 2]}

;; NEXT, CONVERT THE DATASET TO THE MAP
(def mp2 (dt/row-maps dt))

; [{:id 1.0, :name "name1"} {:id 2.0, :name "name2"} {:id 3.0, :name "name3"}]

(= ds mp2)
; true

Aggregation by Multiple Keys in Clojure


(require '[ultra-csv.core :refer [read-csv]]
         '[criterium.core :refer [quick-bench]]
         '[clojure.set :refer [index]])

(def ds (read-csv "/home/liuwensui/Downloads/nycflights.csv"))

;; FASTEST
(quick-bench
  (map
    (fn [x] {:year (first (key x))
             :month (last (key x))
             :flights (count (val x))})
      (group-by (juxt :year :month) ds)))      

;Evaluation count : 6 in 6 samples of 1 calls.
;             Execution time mean : 712.329182 ms
;    Execution time std-deviation : 3.832950 ms
;   Execution time lower quantile : 709.135737 ms ( 2.5%)
;   Execution time upper quantile : 718.651856 ms (97.5%)
;                   Overhead used : 11.694357 ns

;; WORKS FINE
(quick-bench
  (map
    (fn [x] {:year (:year (key x))
             :month (:month (key x))
             :flights (count (val x))})
      (group-by #(select-keys % [:year :month]) ds)))
      
;Evaluation count : 6 in 6 samples of 1 calls.
;             Execution time mean : 1.485215 sec
;    Execution time std-deviation : 9.832209 ms
;   Execution time lower quantile : 1.476116 sec ( 2.5%)
;   Execution time upper quantile : 1.500560 sec (97.5%)
;                   Overhead used : 11.694357 ns

;; SLOWEST
(quick-bench
  (map
    (fn [x] {:year (:year (key x))
             :month (:month (key x))
             :flights (count (val x))})
      (index ds [:year :month])))
      
;Evaluation count : 6 in 6 samples of 1 calls.
;             Execution time mean : 2.158245 sec
;    Execution time std-deviation : 11.208489 ms
;   Execution time lower quantile : 2.149538 sec ( 2.5%)
;   Execution time upper quantile : 2.175743 sec (97.5%)
;                   Overhead used : 11.694357 ns

Inner and Outer Joins in Clojure


(require '[clojure.pprint :refer [print-table] :rename {print-table p}]
         '[clojure.set :as s]
         '[clojure.core.reducers :as r])

;; CREATE TOY DATASETS                 
(def ds1 [{:id 1 :name "name1"}
          {:id 2 :name "name2"}
          {:id 3 :name "name3"}])
          
(def ds2 [{:id 2 :address "addr2"}
          {:id 3 :address "addr3"}
          {:id 4 :address "addr4"}])

;; GET THE HEADER
(def ks ((comp distinct flatten) (map #((comp keys first) %) [ds1 ds2])))

;; INNER JOIN WITH SET/JOIN
(p ks
  (s/join ds1 ds2))

;| :id | :name | :address |
;|-----+-------+----------|
;|   3 | name3 |    addr3 |
;|   2 | name2 |    addr2 |

;; OUTER JOIN #1
(p ks (map #(apply merge %) (vals (group-by :id (concat ds1 ds2)))))

;| :id | :name | :address |
;|-----+-------+----------|
;|   1 | name1 |          |
;|   2 | name2 |    addr2 |
;|   3 | name3 |    addr3 |
;|   4 |       |    addr4 |

;; OUTER JOIN #2 -- AN EXAMPLE OF USING REDUCERS
(p ks (into () (r/map #(r/reduce merge %) (vals (s/index (s/union ds2 ds1) [:id])))))

;| :id | :name | :address |
;|-----+-------+----------|
;|   1 | name1 |          |
;|   4 |       |    addr4 |
;|   3 | name3 |    addr3 |
;|   2 | name2 |    addr2 |
 
;; OUTER JOIN #3 -- USE LET TO CREATE LOCAL VARIABLES
(p ks (let [z1 (zipmap (map :id ds1) ds1) 
            z2 (zipmap (map :id ds2) ds2)]
        (vals (merge-with merge z1 z2))))

;| :id | :name | :address |
;|-----+-------+----------|
;|   1 | name1 |          |
;|   2 | name2 |    addr2 |
;|   3 | name3 |    addr3 |
;|   4 |       |    addr4 |

By-Group Statistical Summary in Clojure

In the previous post (https://statcompute.wordpress.com/2018/03/16/for-loop-and-map-in-clojure), I did a performance comparison between MAP and FOR loop in Clojure with a small dataset. It is interesting to see that the performance of PMAP, i.e. the parallel MAP, is considerably below the performance of MAP and the FOR loop, which is quite counter-intuitive.

Today, I employed a relatively large dataset with 0.3 million records to perform a by-group statistical summary. In the example, five different approaches were tested, including MAP, PMAP, Reducer MAP, the FOR list comprehension, and LOOP/RECUR. As shown below, PMAP is at least 30% more efficient than the rest in this particular case.


(require '[clojure.pprint :as p]
         '[ultra-csv.core :as u]
         '[clj-statistics-fns.core :as s]
         '[clojure.core.reducers :as r])


(def ds (u/read-csv "/home/liuwensui/Downloads/nycflights.csv"))


;; SHOW HEADERS OF THE DATA
(prn (keys (first ds)))

; (:day :hour :tailnum :arr_time :month :dep_time :carrier :arr_delay :year :dep_delay :origin :flight :distance :air_time :dest :minute)


;; PRINT A DATA SAMPLE
(p/print-table
  (map #(select-keys % [:origin :dep_delay]) (take 3 ds)))

; | :origin | :dep_delay |
; |---------+------------|
; |     EWR |          2 |
; |     LGA |          4 |
; |     JFK |          2 |


;; APPROACH #1: MAP()
(time
  (p/print-table
    (map
      (fn [x] {:origin (first x)
               :freq (format "%,8d" (count (second x)))
               :nmiss (format "%,8d" (count (filter nil? (map #(get % :dep_delay) (second x)))))
               :med_delay (format "%,8d" (s/median (remove nil? (map #(get % :dep_delay) (second x)))))
               :75q_delay (format "%,8d" (s/kth-percentile 75 (remove nil? (map #(get % :dep_delay) (second x)))))
               :max_delay (format "%,8d" (reduce max (remove nil? (map #(get % :dep_delay) (second x)))))})
        (group-by :origin ds))))

; | :origin |    :freq |   :nmiss | :med_delay | :75q_delay | :max_delay |
; |---------+----------+----------+------------+------------+------------|
; |     EWR |  120,835 |    3,239 |         -1 |         15 |      1,126 |
; |     LGA |  104,662 |    3,153 |         -3 |          7 |        911 |
; |     JFK |  111,279 |    1,863 |         -1 |         10 |      1,301 |
; "Elapsed time: 684.71396 msecs"


;; APPROACH #2: PMAP()
(time
  (p/print-table
    (pmap
      (fn [x] {:origin (first x)
               :freq (format "%,8d" (count (second x)))
               :nmiss (format "%,8d" (count (filter nil? (map #(get % :dep_delay) (second x)))))
               :med_delay (format "%,8d" (s/median (remove nil? (map #(get % :dep_delay) (second x)))))
               :75q_delay (format "%,8d" (s/kth-percentile 75 (remove nil? (map #(get % :dep_delay) (second x)))))
               :max_delay (format "%,8d" (reduce max (remove nil? (map #(get % :dep_delay) (second x)))))})
        (group-by :origin ds))))

; | :origin |    :freq |   :nmiss | :med_delay | :75q_delay | :max_delay |
; |---------+----------+----------+------------+------------+------------|
; |     EWR |  120,835 |    3,239 |         -1 |         15 |      1,126 |
; |     LGA |  104,662 |    3,153 |         -3 |          7 |        911 |
; |     JFK |  111,279 |    1,863 |         -1 |         10 |      1,301 |
; "Elapsed time: 487.065551 msecs"


;; APPROACH #3: REDUCER MAP()
(time
  (p/print-table
    (into ()
      (r/map
        (fn [x] {:origin (first x)
                 :freq (format "%,8d" (count (second x)))
                 :nmiss (format "%,8d" (count (filter nil? (pmap #(get % :dep_delay) (second x)))))
                 :med_delay (format "%,8d" (s/median (remove nil? (pmap #(get % :dep_delay) (second x)))))
                 :75q_delay (format "%,8d" (s/kth-percentile 75 (remove nil? (pmap #(get % :dep_delay) (second x)))))
                 :max_delay (format "%,8d" (reduce max (remove nil? (pmap #(get % :dep_delay) (second x)))))})
          (group-by :origin ds)))))

; | :origin |    :freq |   :nmiss | :med_delay | :75q_delay | :max_delay |
; |---------+----------+----------+------------+------------+------------|
; |     JFK |  111,279 |    1,863 |         -1 |         10 |      1,301 |
; |     LGA |  104,662 |    3,153 |         -3 |          7 |        911 |
; |     EWR |  120,835 |    3,239 |         -1 |         15 |      1,126 |
; "Elapsed time: 3734.039994 msecs"


;; APPROACH #4: LIST COMPREHENSION
(time
  (p/print-table
    (for [g (group-by :origin ds)]
      ((fn [x] {:origin (first x)
                :freq (format "%,8d" (count (second x)))
                :nmiss (format "%,8d" (count (filter nil? (map #(get % :dep_delay) (second x)))))
                :med_delay (format "%,8d" (s/median (remove nil? (map #(get % :dep_delay) (second x)))))
                :75q_delay (format "%,8d" (s/kth-percentile 75 (remove nil? (map #(get % :dep_delay) (second x)))))
                :max_delay (format "%,8d" (reduce max (remove nil? (map #(get % :dep_delay) (second x)))))})
        g))))

; | :origin |    :freq |   :nmiss | :med_delay | :75q_delay | :max_delay |
; |---------+----------+----------+------------+------------+------------|
; |     EWR |  120,835 |    3,239 |         -1 |         15 |      1,126 |
; |     LGA |  104,662 |    3,153 |         -3 |          7 |        911 |
; |     JFK |  111,279 |    1,863 |         -1 |         10 |      1,301 |
; "Elapsed time: 692.411023 msecs"


;; APPROACH #5: LOOP/RECUR
(time
  (p/print-table
    (loop [i (group-by :origin ds) result '()]
      (if ((complement empty?) i)
        (recur
          (rest i)
          (conj result {:origin (first (first i))
                        :freq (format "%,8d" (count (second (first i))))
                        :nmiss (format "%,8d" (count (filter nil? (map #(get % :dep_delay) (second (first i))))))
                        :med_delay (format "%,8d" (s/median (remove nil? (map #(get % :dep_delay) (second (first i))))))
                        :75q_delay (format "%,8d" (s/kth-percentile 75 (remove nil? (map #(get % :dep_delay) (second (first i))))))
                        :max_delay (format "%,8d" (reduce max (remove nil? (map #(get % :dep_delay) (second (first i))))))}))
        result))))

; | :origin |    :freq |   :nmiss | :med_delay | :75q_delay | :max_delay |
; |---------+----------+----------+------------+------------+------------|
; |     JFK |  111,279 |    1,863 |         -1 |         10 |      1,301 |
; |     LGA |  104,662 |    3,153 |         -3 |          7 |        911 |
; |     EWR |  120,835 |    3,239 |         -1 |         15 |      1,126 |
; "Elapsed time: 692.717104 msecs"

Subset by Values in Clojure


(require '[clojure.pprint :as p]
         '[clojure.java.jdbc :as j])

(def db
  {:classname   "org.sqlite.JDBC"
   :subprotocol "sqlite"
   :subname     "/home/liuwensui/Downloads/chinook.db"})

(def orders (j/query db "select billingcountry as country, count(*) as orders from invoices group by billingcountry;"))

(def clist '("USA" "India" "Canada" "France"))

;; APPROACH #1: LIST COMPREHENSION
(p/print-table 
  (flatten (for [c clist] (for [o orders :when (= c (:country o))] o))))

;| :country | :orders |
;|----------+---------|
;|      USA |      91 |
;|    India |      13 |
;|   Canada |      56 |
;|   France |      35 |

;; APPROACH #2: FILTER FUNCTION
(p/print-table 
  (flatten (map (fn [c] (filter #(= (:country %) c) orders)) clist)))

;| :country | :orders |
;|----------+---------|
;|      USA |      91 |
;|    India |      13 |
;|   Canada |      56 |
;|   France |      35 |

;; APPROACH #3: SET/JOIN FUNCTION
(p/print-table 
  (clojure.set/join orders (into () (for [c clist] {:country c}))))

;| :country | :orders |
;|----------+---------|
;|      USA |      91 |
;|   France |      35 |
;|    India |      13 |
;|   Canada |      56 |

;; APPROACH #4: SET/SELECT FUNCTION
(p/print-table
   (map (fn [c] (into {} (clojure.set/select #(= (:country %) c) (set orders)))) clist))

;| :country | :orders |
;|----------+---------|
;|      USA |      91 |
;|    India |      13 |
;|   Canada |      56 |
;|   France |      35 |

;; APPROACH #5: REDUCER FUNCTIONS
(require '[clojure.core.reducers :as r])

(p/print-table 
  (into () (r/mapcat (fn [c] (r/filter #(= (:country %) c) orders)) clist)))

;| :country | :orders |
;|----------+---------|
;|   France |      35 |
;|   Canada |      56 |
;|    India |      13 |
;|      USA |      91 |

Do We Really Need Dataframe in Clojure?

During my learning of Clojure, I was deeply impressed by the richness of its data structures and the built-in support for concurrency, which makes it a perfect programming language for data science. If you simply compare R base functions with Clojure core functions, Clojure wins hands down!

At the beginning, a question that I kept asking myself was why there isn’t something in Clojure similar to the R data frame or the Python DataFrame. I had experimented with the Incanter API, which is currently dying, and was disappointed by its performance. However, after diving deeper, I started wondering whether we really need any additional data structure, such as something similar to the dataframe, just for the purpose of data munging.

Below is an example showing how to aggregate and query data with generic Clojure data structures, e.g. lazyseq and map, and core functions. For the time being, I am somewhat convinced that life is still good, even without a dataframe.


(require '[clojure.pprint :as p]
         '[clojure.java.jdbc :as j])

(def db
  {:classname   "org.sqlite.JDBC"
   :subprotocol "sqlite"
   :subname     "/home/liuwensui/Downloads/chinook.db"})

(p/pprint (j/query db "select * from invoices limit 1;"))

; ({:invoiceid 1,
;  :customerid 2,
;  :invoicedate "2009-01-01 00:00:00",
;  :billingaddress "Theodor-Heuss-Straße 34",
;  :billingcity "Stuttgart",
;  :billingstate nil,
;  :billingcountry "Germany",
;  :billingpostalcode "70174",
;  :total 1.98})

(def inv (j/query db "select * from invoices;"))

;; AGGREGATE INVOICE TOTAL BY COUNTRIES
(def country_sum 
  (map (fn [[billingcountry total]]
    {:billiingcountry billingcountry :total (reduce + (map :total total))})
    (group-by :billingcountry inv)))

;; TOP 5 COUNTRIES BY INVOICE AMOUNTS 
(p/print-table (take 5 (reverse (sort-by :total country_sum))))

; | :billiingcountry |             :total |
; |------------------+--------------------|
; |              USA |  523.0600000000003 |
; |           Canada |  303.9599999999999 |
; |           France | 195.09999999999994 |
; |           Brazil | 190.09999999999997 |
; |          Germany |             156.48 |

;; SELECT ROWS BY CRITERIA, E.G. US ORDERS BETWEEN $10 AND $12
(def us_inv (filter #(and (= (:billingcountry %) "USA") (< 10 (:total %) 12)) inv))

;; LIST ORDERS MEETING CRITERIA
(p/pprint us_inv)

;({:invoiceid 298,
;  :customerid 17,
;  :invoicedate "2012-07-31 00:00:00",
;  :billingaddress "1 Microsoft Way",
;  :billingcity "Redmond",
;  :billingstate "WA",
;  :billingcountry "USA",
;  :billingpostalcode "98052-8300",
;  :total 10.91}
; {:invoiceid 311,
;  :customerid 28,
;  :invoicedate "2012-09-28 00:00:00",
;  :billingaddress "302 S 700 E",
;  :billingcity "Salt Lake City",
;  :billingstate "UT",
;  :billingcountry "USA",
;  :billingpostalcode "84102",
;  :total 11.94})

;; SELECT COLUMNS, E.G. STATES AND CITIES
(p/print-table (map #(select-keys % [:invoiceid :billingcountry :billingstate :billingcity]) us_inv))

; | :invoiceid | :billingcountry | :billingstate |   :billingcity |
; |------------+-----------------+---------------+----------------|
; |        298 |             USA |            WA |        Redmond |
; |        311 |             USA |            UT | Salt Lake City |

Parse CSV File with Headers in Clojure


; (defproject prj "0.1.0-SNAPSHOT"
;   :dependencies [[org.clojure/clojure "1.8.0"]
;                  [org.clojars.bmabey/csvlib "0.3.6"]
;                  [ultra-csv "0.2.1"]
;                  [incanter "1.5.7"]
;                  [semantic-csv "0.2.1-alpha1"]
;                  [org.clojure/data.csv "0.1.4"]])

(require '[csvlib :as csvlib]
         '[ultra-csv.core :as ultra]
         '[semantic-csv.core :as semantic]
         '[clojure-csv.core :as csv]
         '[clojure.java.io :as io]
         '[incanter.core :as ic]
         '[incanter.io :as ii])

;; INCANTER PACKAGE
(time
  (ic/$ (range 0 3) '(:dest :origin :hour)
    (ii/read-dataset "/home/liuwensui/Downloads/nycflights.csv" :header true :delim \,)))

; "Elapsed time: 14085.382359 msecs"
; | :dest | :origin | :hour |
; |-------+---------+-------|
; |   IAH |     EWR |     5 |
; |   IAH |     LGA |     5 |
; |   MIA |     JFK |     5 |

;; CSVLIB PACKAGE
(time
  (ic/$ (range 0 3) '(:dest :origin :hour)
    (ic/to-dataset
      (csvlib/read-csv "/home/liuwensui/Downloads/nycflights.csv" :headers? true))))

; "Elapsed time: 0.933955 msecs"
; | dest | origin | hour |
; |------+--------+------|
; |  IAH |    EWR |    5 |
; |  IAH |    LGA |    5 |
; |  MIA |    JFK |    5 |

;; ULTRA-CSV PACKAGE
(time
  (ic/$ (range 0 3) '(:dest :origin :hour)
    (ic/to-dataset
      (ultra/read-csv "/home/liuwensui/Downloads/nycflights.csv"))))

; "Elapsed time: 30.599593 msecs"
; | :dest | :origin | :hour |
; |-------+---------+-------|
; |   IAH |     EWR |     5 |
; |   IAH |     LGA |     5 |
; |   MIA |     JFK |     5 |

;; SEMANTIC-CSV PACKAGE
(time
  (ic/$ (range 0 3) '(:dest :origin :hour)
    (ic/to-dataset
      (with-open [in-file (io/reader "/home/liuwensui/Downloads/nycflights.csv")]
        (->> (csv/parse-csv in-file) semantic/mappify doall)))))

; "Elapsed time: 8338.366317 msecs"
; | :dest | :origin | :hour |
; |-------+---------+-------|
; |   IAH |     EWR |     5 |
; |   IAH |     LGA |     5 |
; |   MIA |     JFK |     5 |

For Loop and Map in Clojure


;; DEFINE THE DATABASE
(def db
  {:classname   "org.sqlite.JDBC"
   :subprotocol "sqlite"
   :subname     "/home/liuwensui/Downloads/chinook.db"})

;; CALL PACKAGES
(require '[clojure.java.jdbc :as j]
         '[criterium.core :as c]
         '[clojure.core.reducers :as r])

;; DEFINE THE QUERY TO PULL THE TABLE LIST
(def qry (str "select tbl_name as tbl from sqlite_master where type = 'table';"))

;; PULL THE TABLE LIST AND CONVERT IT TO A LIST
(def tbl (apply list (for [i (j/query db qry)] (get i :tbl))))

;; PRINT 5 RECORDS OF THE TABLE LIST
(doseq [i (take 5 tbl)] (prn i))
; "albums"
; "sqlite_sequence"
; "artists"
; "customers"
; "employees"

;; DEFINE A FUNCTION TO COUNT RECORDS IN A TABLE
(defn cnt [x] (j/query db (str "select '" x "' as tbl, count(*) as cnt from " x ";")))

;; TEST THE FUNCTION
(cnt "albums")
; ({:tbl "albums", :cnt 347})

;; FOR LOOP
(c/quick-bench (for [i tbl] (cnt i)))
; Execution time mean : 14.156879 ns

;; MAP FUNCTION
(c/quick-bench (map cnt tbl))
; Execution time mean : 15.623790 ns

;; PMAP FUNCTION - PARALLEL MAP
(c/quick-bench (pmap cnt tbl))
; Execution time mean : 456.780027 µs

;; REDUCERS MAP FUNCTION
(defn rmap [f l] (into () (r/map f l)))
(c/quick-bench (rmap cnt tbl))
; Execution time mean : 8.376753 ms

Clojure and SQLite

First of all, we need to define the database specification.


(def db
  {:classname   "org.sqlite.JDBC"
   :subprotocol "sqlite"
   :subname     "/home/liuwensui/Downloads/chinook.db"})

We can use JDBC to query data from the table, which is the same approach mentioned in “Clojure Data Analysis Cookbook”.


;; project.clj
;; (defproject prj "0.1.0-SNAPSHOT"
;;   :dependencies [[org.clojure/clojure "1.8.0"]
;;                  [org.clojure/java.jdbc "0.7.5"]
;;                  [org.xerial/sqlite-jdbc "3.7.2"]
;; ])

(require '[clojure.pprint :as p]
         '[clojure.java.jdbc :as j])

(p/print-table (j/query db (str "select tbl_name, type from sqlite_master where type = 'table' limit 3;")))

;; |       :tbl_name | :type |
;; |-----------------+-------|
;; |          albums | table |
;; | sqlite_sequence | table |
;; |         artists | table |

Alternatively, we can also use the ClojureQL package, as shown below.


;; project.clj
;; (defproject prj "0.1.0-SNAPSHOT"
;;   :dependencies [[org.clojure/clojure "1.8.0"]
;;                  [clojureql "1.0.5"]
;;                  [org.xerial/sqlite-jdbc "3.7.2"]
;; ])

(require '[clojure.pprint :as p]
         '[clojureql.core :as l])

(p/print-table @(-> (l/select (l/table db :sqlite_master) (l/where (= :type "table")))
                    (l/take 3)
                    (l/project [:tbl_name :type])))

;; | :type |       :tbl_name |
;; |-------+-----------------|
;; | table |          albums |
;; | table | sqlite_sequence |
;; | table |         artists |

After the data import, we can easily convert it to an incanter dataset or a clojure map for further data munging, as sketched below.
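
For instance, below is a minimal sketch (assuming incanter is also on the classpath, which is not listed in the project.clj snippets above): the JDBC query result is already a seq of Clojure maps, which incanter.core/to-dataset can wrap directly.

;; THE JDBC RESULT IS ALREADY A SEQ OF CLOJURE MAPS
(require '[incanter.core :as i])

(def tbls (j/query db (str "select tbl_name, type from sqlite_master where type = 'table' limit 3;")))

(first tbls)
;; e.g. {:tbl_name "albums", :type "table"}

;; WRAP IT INTO AN INCANTER DATASET
(i/to-dataset tbls)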

R Interfaces to Python Keras Package

Keras is a popular Python package to do the prototyping for deep neural networks with multiple backends, including TensorFlow, CNTK, and Theano. Currently, there are two R interfaces that allow us to use Keras from R through the reticulate package. While the keras R package is able to provide a flexible and feature-rich API, the kerasR R package is more convenient and computationally efficient. For instance, in the below example mimicking the Python code shown in https://statcompute.wordpress.com/2017/01/02/dropout-regularization-in-deep-neural-networks, the kerasR package is at least 10% faster than the keras package in terms of the computing time.


df <- read.csv("credit_count.txt")
Y <- matrix(df[df$CARDHLDR == 1, ]$DEFAULT)
X <- scale(df[df$CARDHLDR == 1, ][3:14])
set.seed(2018)
rows <- sample(1:nrow(Y), nrow(Y) - 2000)
Y1 <- Y[rows, ]
Y2 <- Y[-rows, ]
X1 <- X[rows, ]
X2 <- X[-rows, ]

### USE KERAS PACKAGE (https://keras.rstudio.com) ###

library(keras)
dnn1 <- keras_model_sequential() %>% 
  ### DEFINE THE INPUT LAYER ###
  layer_dense(units = 50, activation = 'relu', input_shape = ncol(X), kernel_constraint = constraint_maxnorm(4)) %>% 
  layer_dropout(rate = 0.2, seed = 1) %>% 
  ### DEFINE THE 1ST HIDDEN LAYER ###
  layer_dense(units = 20, activation = 'relu', kernel_constraint = constraint_maxnorm(4)) %>% 
  layer_dropout(rate = 0.2, seed = 1) %>% 
  ### DEFINE THE 2ND HIDDEN LAYER ###
  layer_dense(units = 20, activation = 'relu', kernel_constraint = constraint_maxnorm(4)) %>% 
  layer_dropout(rate = 0.2, seed = 1) %>% 
  layer_dense(units = 1, activation = 'sigmoid') %>% 
  compile(loss = 'binary_crossentropy', optimizer = 'sgd', metrics = c('accuracy'))

dnn1 %>% fit(X1, Y1, batch_size = 50, epochs = 20, verbose = 0, validation_split = 0.3)
pROC::roc(as.numeric(Y2), as.numeric(predict_proba(dnn1, X2)))

### USE KERASR PACKAGE (https://github.com/statsmaths/kerasR) ###

library(kerasR)
dnn2 <- Sequential()
### DEFINE THE INPUT LAYER ###
dnn2$add(Dense(units = 50, input_shape = ncol(X), activation = 'relu', kernel_constraint = max_norm(4)))
dnn2$add(Dropout(rate = 0.2, seed = 1))
### DEFINE THE 1ST HIDDEN LAYER ###
dnn2$add(Dense(units = 20, activation = 'relu', kernel_constraint = max_norm(4)))
dnn2$add(Dropout(rate = 0.2, seed = 1))
### DEFINE THE 2ND HIDDEN LAYER ###
dnn2$add(Dense(units = 20, activation = 'relu', kernel_constraint = max_norm(4)))
dnn2$add(Dropout(rate = 0.2, seed = 1))
dnn2$add(Dense(units = 1, activation = 'sigmoid'))
keras_compile(dnn2,  loss = 'binary_crossentropy', optimizer = 'sgd', metrics = 'accuracy')

keras_fit(dnn2, X1, Y1, batch_size = 50, epochs = 20, verbose = 0, validation_split = 0.3)
pROC::roc(as.numeric(Y2), as.numeric(keras_predict_proba(dnn2, X2)))

Query CSV Data with Apache Drill

The most interesting feature of Apache Drill (https://drill.apache.org) is the ability to query raw data in its original sources, e.g. json, parquet, or even csv, directly from the file system through the so-called storage plugin. In the demonstration below, I will show how to extract the data from a csv file.

To test drive Apache Drill without installing it, I downloaded the docker container from https://hub.docker.com/r/harisekhon/apache-drill and then started the container by “docker run -it harisekhon/apache-drill”.

Once in the Drill shell, we can use “show schemas;” to list storage plugins that have been configured in the system. As shown below, while there are several default storage plugins, the one that we will use is “dfs”, which points to the local file system.


0: jdbc:drill:zk=local> show schemas;
+---------------------+
|     SCHEMA_NAME     |
+---------------------+
| INFORMATION_SCHEMA  |
| cp.default          |
| dfs.default         |
| dfs.root            |
| dfs.tmp             |
| sys                 |
+---------------------+

We can explore file formats supported in the dfs local file system by the command “describe schema dfs;”. As shown below, in the pre-configured storage plugin, 2 entries are specifically related to the csv data format that will be used later. The entry “csv” supports data files without headers and the entry “csvh” supports data files with headers.


    "csv" : {
      "type" : "text",
      "extensions" : [ "csv" ],
      "delimiter" : ","
    },
    "csvh" : {
      "type" : "text",
      "extensions" : [ "csvh" ],
      "extractHeader" : true,
      "delimiter" : ","
    }

Next, we can use the command “show files in dfs;” to explore the file system. For instance, running the command “show files in dfs.`apache-drill/sample-data`;” will list all files in the folder “sample-data”, which includes a csv file named “nycflights.csv” that was copied from the host by “docker cp nycflights.csv 1fc4a0087a3e:/apache-drill/sample-data”, where “1fc4a0087a3e” is the container_id.

From the initial output below, it appears that this csv file has headers, which are being read as a data row under the “csv” format.


0: jdbc:drill:zk=local> select columns[0] as x1, columns[1] as x2 from dfs.`apache-drill/sample-data/nycflights.csv` limit 5;
+-------+--------+
|  x1   |   x2   |
+-------+--------+
| year  | month  |
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
+-------+--------+

As a result, we need to change the file extension from “csv” to “csvh” in the file system and then query the data again. The query is successful this time, with the header row parsed correctly.


0: jdbc:drill:zk=local> select `year`, `month` from dfs.`apache-drill/sample-data/nycflights.csvh` limit 5;
+-------+--------+
| year  | month  |
+-------+--------+
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
| 2013  | 1      |
+-------+--------+

From here, we can explore the data in the raw csv file directly. For instance, we can aggregate the number of flights by airport.


0: jdbc:drill:zk=local> select origin, count(*) as cnts from dfs.`apache-drill/sample-data/nycflights.csvh` group by origin;
+---------+---------+
| origin  |  cnts   |
+---------+---------+
| JFK     | 111279  |
| EWR     | 120835  |
| LGA     | 104662  |
+---------+---------+

Or we can extract all flights from the airport LGA and then export the data to a JSON file for the future analyses.


0: jdbc:drill:zk=local> use dfs.tmp;
+-------+--------------------------------------+
|  ok   |               summary                |
+-------+--------------------------------------+
| true  | Default schema changed to [dfs.tmp]  |
+-------+--------------------------------------+

0: jdbc:drill:zk=local> alter session set `store.format` = 'json';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+

0: jdbc:drill:zk=local> create table dfs.tmp.lga as select * from dfs.`apache-drill/sample-data/nycflights.csvh` where origin = 'LGA';
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 104662                     |
+-----------+----------------------------+

0: jdbc:drill:zk=local> show files in dfs.tmp.`lga`;
+-------------+--------------+---------+-----------+--------+--------+
|    name     | isDirectory  | isFile  |  length   | owner  | group  |       
+-------------+--------------+---------+-----------+--------+--------+
| 0_0_0.json  | false        | true    | 34156554  | root   | root   | 
+-------------+--------------+---------+-----------+--------+--------+

Sparkling Water and Moving Data Around

Sparkling Water is an application to integrate H2O with Spark. Below is an example showing how to move data around among a Pandas DataFrame, an H2OFrame, and a Spark DataFrame.

1. Define Context

In [1]: from pandas import read_csv, DataFrame

In [2]: from pyspark import sql

In [3]: from pysparkling import H2OContext

In [4]: from h2o import import_file, H2OFrame

In [5]: ss = sql.SparkSession.builder.getOrCreate()

In [6]: hc = H2OContext.getOrCreate(ss)

2. Convert Pandas Dataframe to H2OFrame and Spark DataFrame

In [7]: p_df = read_csv("Documents/credit_count.txt")

In [8]: type(p_df)
Out[8]: pandas.core.frame.DataFrame

In [9]: p2s_df = ss.createDataFrame(p_df)

In [10]: type(p2s_df)
Out[10]: pyspark.sql.dataframe.DataFrame

In [11]: p2h_df = H2OFrame(p_df)

In [12]: type(p2h_df)
Out[12]: h2o.frame.H2OFrame

3. Convert Spark Dataframe to H2OFrame and Pandas DataFrame

In [13]: s_df = ss.read.csv("Documents/credit_count.txt", header = True, inferSchema = True)

In [14]: type(s_df)
Out[14]: pyspark.sql.dataframe.DataFrame

In [15]: s2p_df = s_df.toPandas()

In [16]: type(s2p_df)
Out[16]: pandas.core.frame.DataFrame

In [17]: s2h_df = hc.as_h2o_frame(s_df)

In [18]: type(s2h_df)
Out[18]: h2o.frame.H2OFrame

4. Convert H2OFrame to Pandas Dataframe and Spark DataFrame

In [19]: h_df = import_file("Documents/credit_count.txt", header = 1, sep = ",")

In [20]: type(h_df)
Out[20]: h2o.frame.H2OFrame

In [21]: h2p_df = h_df.as_data_frame()

In [22]: type(h2p_df)
Out[22]: pandas.core.frame.DataFrame

In [23]: h2s_df = hc.as_spark_frame(h_df)

In [24]: type(h2s_df)
Out[24]: pyspark.sql.dataframe.DataFrame

GLM with H2O in R

Below is an example showing how to fit a Generalized Linear Model with H2O in R. The output is much more comprehensive than the one generated by the generic R glm().

> library(h2o)

> h2o.init(max_mem_size = "12g")

> df1 <- h2o.uploadFile("Documents/credit_count.txt", header = TRUE, sep = ",", parse_type = "CSV")

> df2 <- h2o.assign(df1[df1$CARDHLDR == 1, ], "glm_df")

> h2o.colnames(df2)
 [1] "CARDHLDR" "DEFAULT"  "AGE"      "ACADMOS"  "ADEPCNT"  "MAJORDRG"
 [7] "MINORDRG" "OWNRENT"  "INCOME"   "SELFEMPL" "INCPER"   "EXP_INC"
[13] "SPENDING" "LOGSPEND"

> Y <- "DEFAULT"

> X <- c("MAJORDRG", "MINORDRG", "INCOME", "OWNRENT")

> dist <- "binomial"

> link <- "logit"

> id <- "h2o_mdl01"

> mdl <- h2o.glm(X, Y, training_frame = h2o.getFrame("glm_df"), model_id = id, family = dist, link = link, lambda = 0, compute_p_values = TRUE, standardize = FALSE)

> show(h2o.getModel(id)@model$coefficients_table)
Coefficients: glm coefficients
      names coefficients std_error    z_value  p_value
1 Intercept    -1.204439  0.090811 -13.263121 0.000000
2  MAJORDRG     0.203135  0.069250   2.933370 0.003353
3  MINORDRG     0.202727  0.047971   4.226014 0.000024
4   OWNRENT    -0.201223  0.071619  -2.809636 0.004960
5    INCOME    -0.000442  0.000040 -10.942350 0.000000

> h2o.performance(h2o.getModel(id))
H2OBinomialMetrics: glm
** Reported on training data. **

MSE:  0.08414496
RMSE:  0.2900775
LogLoss:  0.3036585
Mean Per-Class Error:  0.410972
AUC:  0.6432189
Gini:  0.2864378
R^2:  0.02005004
Residual Deviance:  6376.221
AIC:  6386.221

Confusion Matrix (vertical: actual; across: predicted) for F1-optimal threshold:
          0    1    Error         Rate
0      7703 1800 0.189414   =1800/9503
1       630  366 0.632530     =630/996
Totals 8333 2166 0.231451  =2430/10499

Maximum Metrics: Maximum metrics at their respective thresholds
                        metric threshold    value idx
1                       max f1  0.126755 0.231499 142
2                       max f2  0.075073 0.376556 272
3                 max f0point5  0.138125 0.191828 115
4                 max accuracy  0.368431 0.905039   0
5                max precision  0.314224 0.250000   3
6                   max recall  0.006115 1.000000 399
7              max specificity  0.368431 0.999895   0
8             max absolute_mcc  0.126755 0.128940 142
9   max min_per_class_accuracy  0.106204 0.604546 196
10 max mean_per_class_accuracy  0.103730 0.605663 202

H2O Benchmark for CSV Import

The h2o.importFile() function in H2O is extremely efficient due to its parallel reading. The benchmark comparison below shows that it is comparable to read.df() in SparkR and significantly faster than the generic read.csv().

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local", sparkConfig = list(spark.driver.memory = "10g", spark.driver.cores = "4"))

library(h2o)
h2o.init(max_mem_size = "10g")

library(rbenchmark)

benchmark(replications = 5, order = "elapsed", relative = "elapsed",
   csv = {
          df <- read.csv("Documents/nycflights13.csv")
          print(nrow(df))
          rm(df)
         },
   spk = {
          df <- read.df("Documents/nycflights13.csv", source = "csv", header = "true", inferSchema = "true")
          print(nrow(df))
          rm(df)
         },
   h2o = {
          df <- h2o.importFile(path = "Documents/nycflights13.csv", header = TRUE, sep = ",")
          print(nrow(df))
          rm(df)
         }
 )

#   test replications elapsed relative user.self sys.self user.child sys.child
# 3  h2o            5   8.221    1.000     0.508    0.032          0         0
# 2  spk            5   9.822    1.195     0.008    0.004          0         0
# 1  csv            5  16.595    2.019    16.420    0.176          0         0

Joining Tables in SparkR

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local")
df1 <- read.df("nycflights13.csv", source = "csv", header = "true", inferSchema = "true")

grp1 <- groupBy(filter(df1, "month in (1, 2, 3)"), "month")
sum1 <- withColumnRenamed(agg(grp1, min_dep = min(df1$dep_delay)), "month", "month1")

grp2 <- groupBy(filter(df1, "month in (2, 3, 4)"), "month")
sum2 <- withColumnRenamed(agg(grp2, max_dep = max(df1$dep_delay)), "month", "month2")

# INNER JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all = FALSE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "inner"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     3|    -25|     3|    911|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# LEFT JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all.x = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "left"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     1|    -30|  null|   null|
#|     3|    -25|     3|    911|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# RIGHT JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all.y = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "right"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     3|    -25|     3|    911|
#|  null|   null|     4|    960|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

# FULL JOIN
showDF(merge(sum1, sum2, by.x = "month1", by.y = "month2", all = TRUE))

showDF(join(sum1, sum2, sum1$month1 == sum2$month2, "full"))

#+------+-------+------+-------+
#|month1|min_dep|month2|max_dep|
#+------+-------+------+-------+
#|     1|    -30|  null|   null|
#|     3|    -25|     3|    911|
#|  null|   null|     4|    960|
#|     2|    -33|     2|    853|
#+------+-------+------+-------+

R Interface to Spark

SparkR

library(SparkR, lib.loc = paste(Sys.getenv("SPARK_HOME"), "/R/lib", sep = ""))
sc <- sparkR.session(master = "local")
df1 <- read.df("nycflights13.csv", source = "csv", header = "true", inferSchema = "true")

### SUMMARY TABLE WITH SQL
createOrReplaceTempView(df1, "tbl1")
summ <- sql("select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month")
head(summ)
#   month  avg_dep  avg_arr
# 1     1 1347.210 1523.155
# 2     3 1359.500 1509.743
# 3     5 1351.168 1502.685

### SUMMARY TABLE WITH AGG()
grp <- groupBy(filter(df1, "month in (1, 3, 5)"), "month")
summ <- agg(grp, avg_dep = avg(df1$dep_time), avg_arr = avg(df1$arr_time))
head(summ)
#   month  avg_dep  avg_arr
# 1     1 1347.210 1523.155
# 2     3 1359.500 1509.743
# 3     5 1351.168 1502.685

sparklyr

library(sparklyr)
sc <- spark_connect(master = "local")
df1 <- spark_read_csv(sc, name = "tbl1", path = "nycflights13.csv", header = TRUE, infer_schema = TRUE)

### SUMMARY TABLE WITH SQL
library(DBI)
summ <- dbGetQuery(sc, "select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month")
head(summ)
#   month  avg_dep  avg_arr
# 1     5 1351.168 1502.685
# 2     1 1347.210 1523.155
# 3     3 1359.500 1509.743

### SUMMARY TABLE WITH DPLYR
library(dplyr)
summ <- df1 %>% 
        filter(month %in% c(1, 3, 5)) %>% 
        group_by(month) %>%
        summarize(avg_dep = mean(dep_time), avg_arr = mean(arr_time)) 
head(summ)        
#   month  avg_dep  avg_arr
#   <int>    <dbl>    <dbl>
# 1     5 1351.168 1502.685
# 2     1 1347.210 1523.155
# 3     3 1359.500 1509.743        

Random Search for Optimal Parameters

Manual search, grid search, or a combination of both have been successfully employed in machine learning to optimize hyper-parameters. However, in the arena of deep learning, both approaches might become impractical. For instance, the computing cost of grid search for hyper-parameters in a multi-layer deep neural network (DNN) could be prohibitively high.

In light of the aforementioned hurdles, Bergstra and Bengio proposed the novel idea of random search in the paper http://www.jmlr.org/papers/volume13/bergstra12a/bergstra12a.pdf. In their study, it was found that random search is more efficient than grid search for hyper-parameter optimization in terms of computing costs.

In the example below, it is shown that grid search and random search reach similar results in the SVM parameter optimization based on cross-validation.

import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV 
from sklearn.svm import SVC as svc 
from sklearn.metrics import make_scorer, roc_auc_score
from scipy import stats

# DATA PREPARATION
df = pd.read_csv("credit_count.txt")
y = df[df.CARDHLDR == 1].DEFAULT.values 
x = preprocessing.scale(df[df.CARDHLDR == 1].ix[:, 2:12], axis = 0) 

# DEFINE MODEL AND PERFORMANCE MEASURE
mdl = svc(probability = True, random_state = 1)
auc = make_scorer(roc_auc_score)

# GRID SEARCH FOR 20 COMBINATIONS OF PARAMETERS
grid_list = {"C": np.arange(2, 10, 2),
             "gamma": np.arange(0.1, 1, 0.2)}

grid_search = GridSearchCV(mdl, param_grid = grid_list, n_jobs = 4, cv = 3, scoring = auc) 
grid_search.fit(x, y) 
grid_search.cv_results_

# RANDOM SEARCH FOR 20 COMBINATIONS OF PARAMETERS
rand_list = {"C": stats.uniform(2, 10),
             "gamma": stats.uniform(0.1, 1)}
             
rand_search = RandomizedSearchCV(mdl, param_distributions = rand_list, n_iter = 20, n_jobs = 4, cv = 3, random_state = 2017, scoring = auc) 
rand_search.fit(x, y) 
rand_search.cv_results_

A Simple Convolutional Neural Network for The Binary Outcome

Since CNNs (Convolutional Neural Networks) have achieved tremendous success in various challenging applications, e.g. image or digit recognition, one might wonder how to employ CNNs in classification problems with binary outcomes.

Below is an example showing how to use a simple 1D convolutional neural network to predict credit card defaults.

### LOAD PACKAGES 
from numpy.random import seed
from pandas import read_csv, DataFrame
from sklearn.preprocessing import minmax_scale
from keras.layers.convolutional import Conv1D, MaxPooling1D
from keras.optimizers import SGD
from keras.models import Sequential
from keras.layers import Dense, Flatten

### PREPARE THE DATA 
df = read_csv("credit_count.txt")
Y = df[df.CARDHLDR == 1].DEFAULT
X = minmax_scale(df[df.CARDHLDR == 1].ix[:, 2:12], axis = 0)
y_train = Y.values
x_train = X.reshape(X.shape[0], X.shape[1], 1)

### FIT A 1D CONVOLUTIONAL NEURAL NETWORK
seed(2017)
conv = Sequential()
conv.add(Conv1D(20, 4, input_shape = x_train.shape[1:3], activation = 'relu'))
conv.add(MaxPooling1D(2))
conv.add(Flatten())
conv.add(Dense(1, activation = 'sigmoid'))
sgd = SGD(lr = 0.1, momentum = 0.9, decay = 0, nesterov = False)
conv.compile(loss = 'binary_crossentropy', optimizer = sgd, metrics = ['accuracy'])
conv.fit(x_train, y_train, batch_size = 500, epochs = 100, verbose = 0)

Considering that 1D is a special case of 2D, we can also solve the same problem with a 2D convolutional neural network by changing the input shape, as shown below.

from numpy.random import seed
from pandas import read_csv, DataFrame
from sklearn.preprocessing import minmax_scale
from keras_diagram import ascii
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras.optimizers import SGD
from keras.models import Sequential
from keras.layers import Dense, Flatten

df = read_csv("credit_count.txt")
Y = df[df.CARDHLDR == 1].DEFAULT
X = minmax_scale(df[df.CARDHLDR == 1].ix[:, 2:12], axis = 0)
y_train = Y.values
x_train = X.reshape(X.shape[0], 1, X.shape[1], 1)

seed(2017)
conv = Sequential()
conv.add(Conv2D(20, (1, 4), input_shape = x_train.shape[1:4], activation = 'relu'))
conv.add(MaxPooling2D((1, 2)))
conv.add(Flatten())
conv.add(Dense(1, activation = 'sigmoid'))
sgd = SGD(lr = 0.1, momentum = 0.9, decay = 0, nesterov = False)
conv.compile(loss = 'binary_crossentropy', optimizer = sgd, metrics = ['accuracy'])
conv.fit(x_train, y_train, batch_size = 500, epochs = 100, verbose = 0)

Autoencoder for Dimensionality Reduction

We often use ICA or PCA to extract features from high-dimensional data. The autoencoder is another interesting algorithm to achieve the same purpose in the context of Deep Learning.

With the purpose of learning a function to approximate the input data itself such that F(X) = X, an autoencoder consists of two parts, namely the encoder and the decoder. While the encoder aims to compress the original input data into a low-dimensional representation, the decoder tries to reconstruct the original input data based on the low-dimensional representation generated by the encoder. As a result, the autoencoder has been widely used to remove data noise as well as to reduce the data dimension.
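In other words, with an encoder f(.) and a decoder g(.), the autoencoder is trained by minimizing the reconstruction error over all n training cases,

\min_{f, g} \; \frac{1}{n} \sum_{i = 1}^{n} L\left( X_i, \; g(f(X_i)) \right)

where L(., .) is a reconstruction loss, e.g. the squared error or, for inputs scaled into the [0, 1] range as in the examples below, the binary cross-entropy.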

First of all, we will show the basic structure of an autoencoder with a 1-layer encoder and a 1-layer decoder, as below. In the example, we will compress the input data with 10 columns into a compressed representation with 3 columns.

from pandas import read_csv, DataFrame
from numpy.random import seed
from sklearn.preprocessing import minmax_scale
from sklearn.model_selection import train_test_split
from keras.layers import Input, Dense
from keras.models import Model

df = read_csv("credit_count.txt")
Y = df[df.CARDHLDR == 1].DEFAULT
X = df[df.CARDHLDR == 1].ix[:, 2:12]
# SCALE EACH FEATURE INTO [0, 1] RANGE
sX = minmax_scale(X, axis = 0)
ncol = sX.shape[1]
X_train, X_test, Y_train, Y_test = train_test_split(sX, Y, train_size = 0.5, random_state = seed(2017))

### AN EXAMPLE OF SIMPLE AUTOENCODER ###
# InputLayer (None, 10)
#      Dense (None, 3)
#      Dense (None, 10)

input_dim = Input(shape = (ncol, ))
# DEFINE THE DIMENSION OF ENCODER ASSUMED 3
encoding_dim = 3
# DEFINE THE ENCODER LAYER
encoded = Dense(encoding_dim, activation = 'relu')(input_dim)
# DEFINE THE DECODER LAYER
decoded = Dense(ncol, activation = 'sigmoid')(encoded)
# COMBINE ENCODER AND DECODER INTO AN AUTOENCODER MODEL
autoencoder = Model(input = input_dim, output = decoded)
# CONFIGURE AND TRAIN THE AUTOENCODER
autoencoder.compile(optimizer = 'adadelta', loss = 'binary_crossentropy')
autoencoder.fit(X_train, X_train, nb_epoch = 50, batch_size = 100, shuffle = True, validation_data = (X_test, X_test))
# THE ENCODER TO EXTRACT THE REDUCED DIMENSION FROM THE ABOVE AUTOENCODER
encoder = Model(input = input_dim, output = encoded)
encoded_input = Input(shape = (encoding_dim, ))
encoded_out = encoder.predict(X_test)
encoded_out[0:2]
#array([[ 0.        ,  1.26510417,  1.62803197],
#       [ 2.32508397,  0.99735016,  2.06461048]], dtype=float32)

In the next example, we will relax the constraint on the number of layers and employ a stack of layers to achieve the same purpose as above.

### AN EXAMPLE OF DEEP AUTOENCODER WITH MULTIPLE LAYERS
# InputLayer (None, 10)
#      Dense (None, 20)
#      Dense (None, 10)
#      Dense (None, 5)
#      Dense (None, 3)
#      Dense (None, 5)
#      Dense (None, 10)
#      Dense (None, 20)
#      Dense (None, 10)

input_dim = Input(shape = (ncol, ))
# DEFINE THE DIMENSION OF ENCODER ASSUMED 3
encoding_dim = 3
# DEFINE THE ENCODER LAYERS
encoded1 = Dense(20, activation = 'relu')(input_dim)
encoded2 = Dense(10, activation = 'relu')(encoded1)
encoded3 = Dense(5, activation = 'relu')(encoded2)
encoded4 = Dense(encoding_dim, activation = 'relu')(encoded3)
# DEFINE THE DECODER LAYERS
decoded1 = Dense(5, activation = 'relu')(encoded4)
decoded2 = Dense(10, activation = 'relu')(decoded1)
decoded3 = Dense(20, activation = 'relu')(decoded2)
decoded4 = Dense(ncol, activation = 'sigmoid')(decoded3)
# COMBINE ENCODER AND DECODER INTO AN AUTOENCODER MODEL
autoencoder = Model(input = input_dim, output = decoded4)
# CONFIGURE AND TRAIN THE AUTOENCODER
autoencoder.compile(optimizer = 'adadelta', loss = 'binary_crossentropy')
autoencoder.fit(X_train, X_train, nb_epoch = 100, batch_size = 100, shuffle = True, validation_data = (X_test, X_test))
# THE ENCODER TO EXTRACT THE REDUCED DIMENSION FROM THE ABOVE AUTOENCODER
encoder = Model(input = input_dim, output = encoded4)
encoded_input = Input(shape = (encoding_dim, ))
encoded_out = encoder.predict(X_test)
encoded_out[0:2]
#array([[ 3.74947715,  0.        ,  3.22947764],
#       [ 3.93903661,  0.17448257,  1.86618853]], dtype=float32)

Dropout Regularization in Deep Neural Networks

The deep neural network (DNN) is a very powerful neural network with multiple hidden layers that is able to capture the highly complex relationship between the response and predictors. However, due to the large number of parameters, it is prone to over-fitting, which makes regularization crucial for DNNs. In the paper (https://www.cs.toronto.edu/~hinton/absps/JMLRdropout.pdf), an interesting regularization approach, e.g. dropout, was proposed with a simple and elegant idea. Basically, it suppresses the complexity of DNNs by randomly dropping units in both the input and hidden layers.

Below is an example showing how to tune the hyper-parameter of dropout rates with the Keras library in Python. Because of the long computing time required by the tuning, parallelism is used to speed up the process.

from pandas import read_csv, DataFrame
from numpy.random import seed
from sklearn.preprocessing import scale
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score 
from keras.models import Sequential
from keras.constraints import maxnorm
from keras.optimizers import SGD
from keras.layers import Dense, Dropout
from multiprocessing import Pool, cpu_count
from itertools import product
from parmap import starmap

df = read_csv("credit_count.txt")
Y = df[df.CARDHLDR == 1].DEFAULT
X = df[df.CARDHLDR == 1][['AGE', 'ADEPCNT', 'MAJORDRG', 'MINORDRG', 'INCOME', 'OWNRENT', 'SELFEMPL']]
sX = scale(X)
ncol = sX.shape[1]
x_train, x_test, y_train, y_test = train_test_split(sX, Y, train_size = 0.5, random_state = seed(2017))

def tune_dropout(rate1, rate2):
  net = Sequential()
  ## DROPOUT AT THE INPUT LAYER
  net.add(Dropout(rate1, input_shape = (ncol,)))
  ## DROPOUT AT THE 1ST HIDDEN LAYER
  net.add(Dense(ncol, init = 'normal', activation = 'relu', W_constraint = maxnorm(4)))
  net.add(Dropout(rate2))
  ## DROPOUT AT THE 2ND HIDDEN LAYER
  net.add(Dense(ncol, init = 'normal', activation = 'relu', W_constraint = maxnorm(4)))
  net.add(Dropout(rate2))
  net.add(Dense(1, init = 'normal', activation = 'sigmoid'))
  sgd = SGD(lr = 0.1, momentum = 0.9, decay = 0, nesterov = False)
  net.compile(loss='binary_crossentropy', optimizer = sgd, metrics = ['accuracy'])
  net.fit(x_train, y_train, batch_size = 200, nb_epoch = 50, verbose = 0)
  print rate1, rate2, "{:6.4f}".format(roc_auc_score(y_test, net.predict(x_test)))

input_dp = [0.1, 0.2, 0.3]
hidden_dp = [0.2, 0.3, 0.4, 0.5]
parms = [i for i in product(input_dp, hidden_dp)]

seed(2017)
starmap(tune_dropout, parms, pool = Pool(processes = cpu_count()))

As shown in the output below, the optimal dropout rate appears, coincidentally, to be 0.2 for both the input and hidden layers.

0.1 0.2 0.6354
0.1 0.4 0.6336
0.1 0.3 0.6389
0.1 0.5 0.6378
0.2 0.2 0.6419
0.2 0.4 0.6385
0.2 0.3 0.6366
0.2 0.5 0.6359
0.3 0.4 0.6313
0.3 0.2 0.6350
0.3 0.3 0.6346
0.3 0.5 0.6343

Fastest Way to Add New Variables to A Large Data.Frame

pkgs <- list("hflights", "doParallel", "foreach", "dplyr", "rbenchmark", "data.table")
lapply(pkgs, require, character.only = T)

data(hflights)

benchmark(replications = 10, order = "user.self", relative = "user.self",
  transform = {
    ### THE GENERIC FUNCTION MODIFYING THE DATA.FRAME, SIMILAR TO DATA.FRAME() ###
    transform(hflights, wday = ifelse(DayOfWeek %in% c(6, 7), 'weekend', 'weekday'), delay = ArrDelay + DepDelay)
  },
  within    = {
    ### EVALUATE THE EXPRESSION WITHIN THE LOCAL ENVIRONMENT ###
    within(hflights, {wday = ifelse(DayOfWeek %in% c(6, 7), 'weekend', 'weekday'); delay = ArrDelay + DepDelay})
  },
  mutate   = {
    ### THE SPECIFIC FUNCTION IN DPLYR PACKAGE TO ADD VARIABLES ###
    mutate(hflights, wday = ifelse(DayOfWeek %in% c(6, 7), 'weekend', 'weekday'), delay = ArrDelay + DepDelay)
  },
  foreach = {
    ### SPLIT AND THEN COMBINE IN PARALLEL ###
    registerDoParallel(cores = 2)
    v <- c(names(hflights), 'wday', 'delay')
    f <- expression(ifelse(hflights$DayOfWeek %in% c(6, 7), 'weekend', 'weekday'),
                    hflights$ArrDelay + hflights$DepDelay)
    df <- foreach(fn = iter(f), .combine = mutate, .init = hflights) %dopar% {
      eval(fn)
    }
    names(df) <- v
  },
  data.table = {
    ### DATA.TABLE ###
    data.table(hflights)[, c("wday", "delay") := list(ifelse(hflights$DayOfWeek %in% c(6, 7), 'weekend', 'weekday'), hflights$ArrDelay + hflights$DepDelay)]
  }
)

#         test replications elapsed relative user.self sys.self user.child
# 4    foreach           10   1.442    1.000     0.240    0.144      0.848
# 2     within           10   0.667    2.783     0.668    0.000      0.000
# 3     mutate           10   0.679    2.833     0.680    0.000      0.000
# 5 data.table           10   0.955    3.983     0.956    0.000      0.000
# 1  transform           10   1.732    7.200     1.728    0.000      0.000

Flavors of SQL on Pandas DataFrame

In R, sqldf() provides a convenient interface for running SQL statements on data frames. Similarly, Python also offers multiple ways to interact between SQL and Pandas DataFrames by leveraging the lightweight SQLite engine. While pandasql (https://github.com/yhat/pandasql) works similarly to sqldf() in R, pysqldf (https://github.com/airtoxin/pysqldf) is even more powerful. In my experiments shown below, the advantages of pysqldf over pandasql are two-fold. First of all, pysqldf is 2 – 3 times faster than pandasql. Secondly, pysqldf supports new function definitions, which are not available in pandasql. However, it is worth mentioning that the generic Python interface to an in-memory SQLite database can be more efficient and flexible than both pysqldf and pandasql, as demonstrated below, as long as we are able to get the DataFrame into SQLite and let it stay in memory.

from sqlite3 import connect
from pandas import read_sql_query
import pandasql
import pysqldf
import numpy

# CREATE AN IN-MEMORY SQLITE DB
con = connect(":memory:")
cur = con.cursor()
cur.execute("attach 'my.db' as filedb")
cur.execute("create table df as select * from filedb.hflights")
cur.execute("detach filedb")

# IMPORT SQLITE TABLE INTO PANDAS DF
df = read_sql_query("select * from df", con)

# WRITE QUERIES
sql01 = "select * from df where DayofWeek = 1 and Dest = 'CVG';"
sql02 = "select DayofWeek, AVG(ArrTime) from df group by DayofWeek;"
sql03 = "select DayofWeek, median(ArrTime) from df group by DayofWeek;"

# SELECTION:
# 1. PANDASQL
%time t11 = pandasql.sqldf(sql01, globals())
# 2. PYSQLDF
%time t12 = pysqldf.SQLDF(globals()).execute(sql01)
# 3. GENERIC SQLITE CONNECTION
%time t13 = read_sql_query(sql01, con)

# AGGREGATION:
# 1. PANDASQL
%time t21 = pandasql.sqldf(sql02, globals())
# 2. PYSQLDF
%time t22 = pysqldf.SQLDF(globals()).execute(sql02)
# 3. GENERIC SQLITE CONNECTION
%time t23 = read_sql_query(sql02, con)

# DEFINING A NEW FUNCTION:
# DEFINE A FUNCTION NOT SUPPORTED IN SQLITE
class median(object):
  def __init__(self):
    self.a = []
  def step(self, x):
    self.a.append(x)
  def finalize(self):
    return numpy.median(self.a)

# 1. PYSQLDF
udafs = {"median": median}
%time t31 = pysqldf.SQLDF(globals(), udafs = udafs).execute(sql03)
# 2 GENERIC SQLITE CONNECTION
con.create_aggregate("median", 1, median)
%time t32 = read_sql_query(sql03, con)

SAS Macro Calculating Mutual Information

In statistics, various correlation functions, either Spearman or Pearson, have been used to measure the dependence between two data vectors under the linear or monotonic assumption. Mutual Information (MI) is an alternative widely used in Information Theory and is considered a more general measurement of the dependence between two vectors. More specifically, MI quantifies how much information two vectors, regardless of their actual values, might share based on their joint and marginal probability distribution functions.
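More formally, for two discrete vectors X and Y with the joint distribution p(x, y) and the marginal distributions p(x) and p(y), MI and its normalized variant are defined as

MI(X, Y) = \sum_{x} \sum_{y} p(x, y) \, \log \frac{p(x, y)}{p(x) \, p(y)}, \qquad NMI(X, Y) = \frac{MI(X, Y)}{\sqrt{H(X) \, H(Y)}}

where H(X) and H(Y) are the marginal entropies. These are exactly the quantities assembled by the _p_xy, _p_x, _p_y, _h_x, and _h_y columns in the macro below.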

Below is a SAS macro implementing MI and Normalized MI by mimicking functions in Python, e.g. mutual_info_score() and normalized_mutual_info_score(). Although MI is used to evaluate the cluster analysis performance in the sklearn package, it can also be used as a useful tool for Feature Selection in the context of Machine Learning and Statistical Modeling.

%macro mutual(data = , x = , y = );
***********************************************************;
* SAS MACRO CALCULATING MUTUAL INFORMATION AND ITS        *;
* NORMALIZED VARIANT BETWEEN TWO VECTORS BY MIMICKING     *;
* SKLEARN.METRICS.NORMALIZED_MUTUAL_INFO_SCORE()          *;
* SKLEARN.METRICS.MUTUAL_INFO_SCORE() IN PYTHON           *;
* ======================================================= *;
* INPUT PARAMETERS:                                       *;
*  DATA : INPUT SAS DATA TABLE                            *;
*  X    : FIRST INPUT VECTOR                              *;
*  Y    : SECOND INPUT VECTOR                             *;
* ======================================================= *;
* AUTHOR: WENSUI.LIU@53.COM                               *;
***********************************************************;

data _1;
  set &data;
  where &x ~= . and &y ~= .;
  _id = _n_;
run;

proc sql;
create table
  _2 as
select
  _id,
  &x,
  &y,
  1 / (select count(*) from _1) as _p_xy
from
  _1;

create table
  _3 as
select
  _id,
  &x         as _x,
  sum(_p_xy) as _p_x,
  sum(_p_xy) * log(sum(_p_xy)) / count(*) as _h_x
from 
  _2
group by
  &x;

create table
  _4 as
select
  _id,
  &y         as _y,
  sum(_p_xy) as _p_y,
  sum(_p_xy) * log(sum(_p_xy)) / count(*) as _h_y
from 
  _2
group by
  &y;

create table
  _5 as
select
  a.*,
  b._p_x,
  b._h_x,
  c._p_y,
  c._h_y,
  a._p_xy * log(a._p_xy / (b._p_x * c._p_y)) as mutual
from
  _2 as a, _3 as b, _4 as c
where
  a._id = b._id and a._id = c._id;

select
  sum(mutual) as MI format = 12.8,
  case 
    when sum(mutual) = 0 then 0
    else sum(mutual) / (sum(_h_x) * sum(_h_y)) ** 0.5 
  end as NMI format = 12.8
from
  _5;
quit;

%mend mutual;
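For a quick sanity check of the macro output, below is a minimal Python sketch with the two sklearn functions mentioned above, assuming x and y are two discretized vectors without missing values. The small arrays are for illustration only.

# A MINIMAL PYTHON SKETCH TO CROSS-CHECK THE MACRO OUTPUT
# THE TWO SMALL VECTORS BELOW ARE FOR ILLUSTRATION ONLY
import numpy
from sklearn.metrics import mutual_info_score, normalized_mutual_info_score

x = numpy.array([1, 1, 2, 2, 3, 3, 3, 1])
y = numpy.array([1, 1, 2, 3, 3, 3, 2, 1])

# MUTUAL INFORMATION BASED ON THE JOINT AND MARGINAL DISTRIBUTIONS
print(mutual_info_score(x, y))
# NORMALIZED MUTUAL INFORMATION; OLDER SKLEARN VERSIONS NORMALIZE BY
# SQRT(H(X) * H(Y)), WHICH IS THE VARIANT MIMICKED BY THE MACRO
print(normalized_mutual_info_score(x, y))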

Risk Models with Generalized PLS

While developing risk models with hundreds of potential variables, we often run into the situation that risk characteristics or macro-economic indicators are highly correlated, namely multicollinearity. In such cases, we might have to drop variables with high VIFs or employ “variable shrinkage” methods, e.g. lasso or ridge, to suppress variables with collinearity.

Feature extraction approaches based on PCA and PLS have been widely discussed but are rarely used in real-world applications due to concerns around model interpretability and implementation. In the example below, it is shown that there shouldn’t be any hurdle in the model implementation, e.g. scoring, given that coefficients can be extracted from a GPLS model in a similar way as from a GLM model. In addition, compared with a GLM with 8 variables, a GPLS with only 5 components is able to provide comparable performance on the hold-out testing data.

R Code

library(gpls)
library(pROC)

df1 <- read.csv("credit_count.txt")
df2 <- df1[df1$CARDHLDR == 1, -c(1, 10, 11, 12, 13)]
set.seed(2016)
n <- nrow(df2)
sample <- sample(seq(n), size = n / 2, replace = FALSE)
train <- df2[sample, ]
test <- df2[-sample, ]

m1 <- glm(DEFAULT ~ ., data = train, family = "binomial")
cat("\n### ROC OF GLM PREDICTION WITH TRAINING DATA ###\n")
print(roc(train$DEFAULT, predict(m1, newdata = train, type = "response")))
cat("\n### ROC OF GLM PREDICTION WITH TESTING DATA ###\n")
print(roc(test$DEFAULT, predict(m1, newdata = test, type = "response")))

m2 <- gpls(DEFAULT ~ ., data = train, family = "binomial", K.prov = 5)
cat("\n### ROC OF GPLS PREDICTION WITH TRAINING DATA ###\n")
print(roc(train$DEFAULT, predict(m2, newdata = train)$predicted[, 1]))
cat("\n### ROC OF GPLS PREDICTION WITH TESTING DATA ###\n")
print(roc(test$DEFAULT, predict(m2, newdata = test)$predicted[, 1]))

cat("\n### COEFFICIENTS COMPARISON BETWEEN GLM AND GPLS ###\n")
print(data.frame(glm = m1$coefficients, gpls = m2$coefficients))

Output

### ROC OF GLM PREDICTION WITH TRAINING DATA ###

Call:
roc.default(response = train$DEFAULT, predictor = predict(m1,     newdata = train, type = "response"))

Data: predict(m1, newdata = train, type = "response") in 4753 controls (train$DEFAULT 0) < 496 cases (train$DEFAULT 1).
Area under the curve: 0.6641

### ROC OF GLM PREDICTION WITH TESTING DATA ###

Call:
roc.default(response = test$DEFAULT, predictor = predict(m1,     newdata = test, type = "response"))

Data: predict(m1, newdata = test, type = "response") in 4750 controls (test$DEFAULT 0) < 500 cases (test$DEFAULT 1).
Area under the curve: 0.6537

### ROC OF GPLS PREDICTION WITH TRAINING DATA ###

Call:
roc.default(response = train$DEFAULT, predictor = predict(m2,     newdata = train)$predicted[, 1])

Data: predict(m2, newdata = train)$predicted[, 1] in 4753 controls (train$DEFAULT 0) < 496 cases (train$DEFAULT 1).
Area under the curve: 0.6627

### ROC OF GPLS PREDICTION WITH TESTING DATA ###

Call:
roc.default(response = test$DEFAULT, predictor = predict(m2,     newdata = test)$predicted[, 1])

Data: predict(m2, newdata = test)$predicted[, 1] in 4750 controls (test$DEFAULT 0) < 500 cases (test$DEFAULT 1).
Area under the curve: 0.6542

### COEFFICIENTS COMPARISON BETWEEN GLM AND GPLS ###
                      glm          gpls
(Intercept) -0.1940785071 -0.1954618828
AGE         -0.0122709412 -0.0147883358
ACADMOS      0.0005302022  0.0003671781
ADEPCNT      0.1090667092  0.1352491711
MAJORDRG     0.0757313171  0.0813835741
MINORDRG     0.2621574192  0.2547176301
OWNRENT     -0.2803919685 -0.1032119571
INCOME      -0.0004222914 -0.0004531543
LOGSPEND    -0.1688395555 -0.1525963363

Python Prototype of Grid Search for SVM Parameters

from itertools import product
from pandas import read_table, DataFrame
from sklearn.cross_validation import KFold as kfold
from sklearn.svm import SVC as svc
from sklearn.metrics import roc_auc_score as auc

df = read_table('credit_count.txt', sep = ',')
Y = df[df.CARDHLDR == 1].DEFAULT
X = df[df.CARDHLDR == 1][['AGE', 'ADEPCNT', 'MAJORDRG', 'MINORDRG', 'INCOME', 'OWNRENT', 'SELFEMPL']]

c = [1, 10]
g = [0.01, 0.001]
parms = [i for i in product(c, g)]
kf = [i for i in kfold(Y.count(), n_folds = 3, shuffle = True, random_state = 0)]
final = DataFrame()

for i in parms:
  result = DataFrame()	
  mdl = svc(C = i[0], gamma = i[1], probability = True, random_state = 0)
  for j in kf:
    X1 = X.iloc[j[0]]
    Y1 = Y.iloc[j[0]]
    X2 = X.iloc[j[1]]
    Y2 = Y.iloc[j[1]]
    mdl.fit(X1, Y1)
    pred = mdl.predict_proba(X2)[:, 1]
    out = DataFrame({'pred': pred, 'y': Y2})
    result = result.append(out)
  perf = DataFrame({'Cost': i[0], 'Gamma': i[1], 'AUC': [auc(result.y, result.pred)]}) 
  final = final.append(perf)

Where Bagging Might Work Better Than Boosting

In the previous post (https://statcompute.wordpress.com/2016/01/01/the-power-of-decision-stumps), it was shown that the boosting algorithm performs extremely well even with a simple 1-level stump as the base learner and provides a better performance lift than the bagging algorithm does. However, this observation shouldn’t be generalized, as demonstrated in the following example.

First of all, we developed a rule-based PART model as below. Albeit pruned, this model still tends to over-fit the data, as shown in the gap between the training and testing AUCs below.

# R = TRUE AND N = 10 FOR 10-FOLD CV PRUNING
# M = 5 SPECIFYING MINIMUM NUMBER OF CASES PER LEAF
part_control <- Weka_control(R = TRUE, N = 10, M = 5, Q = 2016)
part <- PART(fml, data = train, control = part_control)
roc(as.factor(train$DEFAULT), predict(part, newdata = train, type = "probability")[, 2])
# Area under the curve: 0.6839
roc(as.factor(test$DEFAULT), predict(part, newdata = test, type = "probability")[, 2])
# Area under the curve: 0.6082

Next, we applied boosting to the PART model. As shown in the result below, the AUC of the boosted model on the testing data is even lower than the AUC of the base model.

wlist <- list(PART, R = TRUE, N = 10, M = 5, Q = 2016)
# I = 100 SPECIFYING NUMBER OF ITERATIONS
# Q = TRUE SPECIFYING RESAMPLING USED IN THE BOOSTING
boost_control <- Weka_control(I = 100, S = 2016, Q = TRUE, P = 100, W = wlist)
boosting <- AdaBoostM1(fml, data = train, control = boost_control)
roc(as.factor(test$DEFAULT), predict(boosting, newdata = test, type = "probability")[, 2])
# Area under the curve: 0.592

However, by employing bagging, we are able to achieve a performance lift of more than 11% in terms of AUC.

# NUM-SLOTS = 0 AND I = 100 FOR PARALLELISM 
# P = 50 SPECIFYING THE SIZE OF EACH BAG
bag_control <- Weka_control("num-slots" = 0, I = 100, S = 2016, P = 50, W = wlist)
bagging <- Bagging(fml, data = train, control = bag_control)
roc(as.factor(test$DEFAULT), predict(bagging, newdata = test, type = "probability")[, 2])
# Area under the curve: 0.6778

From the examples demonstrated today and yesterday, an important lesson to learn is that ensemble methods are powerful machine learning tools only when they are used appropriately. Empirically speaking, while boosting works well to improve the performance of an under-fitted base model, such as the decision stump, bagging might perform better in the case of an over-fitted base model with high variance and low bias.
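The same intuition can also be sketched outside of the Weka toolkit. Below is a hypothetical sklearn sketch, not a replication of the exact setting above, that contrasts bagging and boosting with a deep (high-variance) tree and a shallow (high-bias) stump as base learners; the column names follow the credit_count data used earlier.

# A HYPOTHETICAL SKLEARN SKETCH OF THE BIAS-VARIANCE INTUITION ABOVE
# NOT A REPLICATION OF THE WEKA SETTING; COLUMNS FOLLOW THE CREDIT_COUNT DATA
import pandas
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import BaggingClassifier, AdaBoostClassifier
from sklearn.metrics import roc_auc_score

df = pandas.read_csv("credit_count.txt")
df = df[df.CARDHLDR == 1]
y = df.DEFAULT
x = df[['AGE', 'ADEPCNT', 'MAJORDRG', 'MINORDRG', 'INCOME', 'OWNRENT', 'SELFEMPL']]
x1, x2, y1, y2 = train_test_split(x, y, train_size = 0.5, random_state = 2016)

# AN OVER-FITTED BASE LEARNER: A FULLY GROWN TREE WITH HIGH VARIANCE
deep = DecisionTreeClassifier(max_depth = None, random_state = 2016)
# AN UNDER-FITTED BASE LEARNER: A ONE-LEVEL STUMP WITH HIGH BIAS
stump = DecisionTreeClassifier(max_depth = 1, random_state = 2016)

for label, base in [("deep tree", deep), ("stump", stump)]:
  bag = BaggingClassifier(base, n_estimators = 100, random_state = 2016).fit(x1, y1)
  bst = AdaBoostClassifier(base, n_estimators = 100, random_state = 2016).fit(x1, y1)
  print("%s bagging auc: %.4f" % (label, roc_auc_score(y2, bag.predict_proba(x2)[:, 1])))
  print("%s boosting auc: %.4f" % (label, roc_auc_score(y2, bst.predict_proba(x2)[:, 1])))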

The Power of Decision Stumps

A decision stump is a weak classification model with a simple tree structure consisting of one split, which can also be considered a one-level decision tree. Due to its simplicity, the stump often demonstrates low predictive performance. As shown in the example below, the AUC of a stump is even lower than that of a single attribute in a separate testing dataset.

pkgs <- c('pROC', 'RWeka')
lapply(pkgs, require, character.only = T)
df1 <- read.csv("credit_count.txt")
df2 <- df1[df1$CARDHLDR == 1, ]
set.seed(2016)
n <- nrow(df2)
sample <- sample(seq(n), size = n / 2, replace = FALSE)
train <- df2[sample, ]
test <- df2[-sample, ]
x <- paste("AGE + ACADMOS + ADEPCNT + MAJORDRG + MINORDRG + OWNRENT + INCOME + SELFEMPL + INCPER + EXP_INC")
fml <- as.formula(paste("as.factor(DEFAULT) ~ ", x))

### IDENTIFY THE MOST PREDICTIVE ATTRIBUTE ###
imp <- InfoGainAttributeEval(fml, data = train)
imp_x <- test[, names(imp[imp == max(imp)])]
roc(as.factor(test$DEFAULT), imp_x)
# Area under the curve: 0.6243

### CONSTRUCT A WEAK CLASSIFIER OF DECISION STUMP ###
stump <- DecisionStump(fml, data = train)
print(stump)
roc(as.factor(test$DEFAULT), predict(stump, newdata = test, type = "probability")[, 2])
# Area under the curve: 0.5953

Albeit weak by itself, the decision stump can be used as a base model in many machine learning ensemble methods, such as bagging and boosting. For instance, the bagging classifier with 1,000 stumps combined outperforms the single stump by ~7% in terms of AUC (0.6346 vs. 0.5953). Moreover, AdaBoost with stumps can further improve the predictive performance over the single stump by ~11% (0.6585 vs. 0.5953) and also over the logistic regression benchmark by ~2% (0.6585 vs. 0.6473).

### BUILD A BAGGING CLASSIFIER WITH 1,000 STUMPS IN PARALLEL ###
bagging <- Bagging(fml, data = train, control = Weka_control("num-slots" = 0, I = 1000, W = "DecisionStump", S = 2016, P = 50))
roc(as.factor(test$DEFAULT), predict(bagging, newdata = test, type = "probability")[, 2])
# Area under the curve: 0.6346

### BUILD A BOOSTING CLASSIFIER WITH STUMPS ###
boosting <- AdaBoostM1(fml, data = train, control = Weka_control(I = 100, W = "DecisionStump", S = 2016))
roc(as.factor(test$DEFAULT), predict(boosting, newdata = test, type = "probability")[, 2])
# Area under the curve: 0.6585
 
### DEVELOP A LOGIT MODEL FOR THE BENCHMARK ###
logit <- Logistic(fml, data = train)
roc(as.factor(test$DEFAULT), predict(logit, newdata = test, type = "probability")[, 2])
# Area under the curve: 0.6473

Parallelize Map()

map() is a convenient routine in Python to apply a function to all items of one or more lists, as shown below. This nature also makes map() a perfect candidate for parallelism.

In [1]: a = (1, 2, 3)

In [2]: b = (10, 20, 30)

In [3]: def func(a, b):
...:      print "a -->", a, "b -->", b
...:

In [4]: ### SERIAL CALL ###

In [5]: map(func, a, b)
a --> 1 b --> 10
a --> 2 b --> 20
a --> 3 b --> 30

The Pool.map() function in the multiprocessing package is the parallel implementation of map(). However, a drawback is that Pool.map() doesn’t support more than one argument in the function call. Therefore, in the case of a function call with multiple arguments, a wrapper function is necessary to make it work, which, however, should be defined before importing the multiprocessing package.

In [6]: ### PARALLEL CALL ###

In [7]: ### SINCE POOL.MAP() DOESN'T TAKE MULTIPLE ARGUMENTS, A WRAPPER IS NEEDED

In [8]: def f2(ab):
...:      a, b = ab
...:      return func(a, b)
...:

In [9]: from multiprocessing import Pool, cpu_count

In [10]: pool = Pool(processes = cpu_count())

In [11]: ### PARALLEL MAP() ON ALL CPUS

In [12]: pool.map(f2, zip(a, b))
a --> 1 b --> 10
a --> 2 b --> 20
a --> 3 b --> 30

In addition, the Pool.apply() function, with some tweaks, can also be employed to mimic the parallel version of map(). The advantage of this approach is that, unlike Pool.map(), Pool.apply() is able to handle multiple arguments by using a list comprehension.

In [13]: ### POOL.APPLY() CAN ALSO MIMIC MAP()

In [14]: [pool.apply(func, args = (i, j)) for i, j in zip(a, b)]
a --> 1 b --> 10
a --> 2 b --> 20
a --> 3 b --> 30

Alternatively, the starmap() function in the parmap package (https://github.com/zeehio/parmap), which is specifically designed to overcome limitations of Pool.map(), provides a friendlier and more elegant interface to implement the parallelized map() with multiple arguments at the cost of a slight computing overhead.

In [15]: ### ALTERNATIVELY, PARMAP PACKAGE IS USED

In [16]: from parmap import starmap

In [17]: starmap(func, zip(a, b), pool = Pool(processes = cpu_count()))
a --> 1 b --> 10
a --> 2 b --> 20
a --> 3 b --> 30

Parallelism with Joblib Package in Python

In the previous post (https://statcompute.wordpress.com/2015/12/27/import-csv-by-chunk-simultaneously-with-ipython-parallel), we’ve shown how to implement the parallelism with the IPython parallel package. However, in that specific case, we were not able to observe an efficiency gain from the parallelism.

In the example below, we tested the joblib package to implement the parallelism with the multiprocessing package as the back-end and searched for the optimal free parameter of the GRNN demonstrated in (https://statcompute.wordpress.com/2015/12/09/fitting-generalized-regression-neural-network-with-python). As shown in the comparison below, the CPU time of the parallel implementation with the joblib package is significantly shorter than the CPU time of the serial implementation with the map() function.

In [1]: import pandas as pd

In [2]: import numpy as np

In [3]: from sklearn import preprocessing, cross_validation

In [4]: from neupy.algorithms import GRNN as grnn

In [5]: from neupy.functions import mse

In [6]: from joblib import Parallel, delayed

In [7]: df = pd.read_table("csdata.txt")

In [8]: y = df.ix[:, 0]

In [9]: x = df.ix[:, 1:df.shape[1]]

In [10]: st_x = preprocessing.scale(x)

In [11]: x_train, x_test, y_train, y_test = cross_validation.train_test_split(st_x, y, train_size = 0.6, random_state = 2016)

In [12]: def try_std(x):
   ....:       nn = grnn(std = x, verbose = False)
   ....:       nn.train(x_train, y_train)
   ....:       y_pred = nn.predict(x_test)
   ....:       print x, "-->", "{:10.8f}".format(mse(y_pred, y_test))
   ....:     

In [13]: ### SERIAL IMPLEMENTATION ###

In [14]: %time map(try_std, np.linspace(0.5, 2.0, 16))
0.5 --> 0.03598864
0.6 --> 0.03387313
0.7 --> 0.03260287
0.8 --> 0.03188978
0.9 --> 0.03151914
1.0 --> 0.03134342
1.1 --> 0.03128110
1.2 --> 0.03129023
1.3 --> 0.03134819
1.4 --> 0.03143958
1.5 --> 0.03155242
1.6 --> 0.03167701
1.7 --> 0.03180485
1.8 --> 0.03192895
1.9 --> 0.03204561
2.0 --> 0.03215511
CPU times: user 7.15 s, sys: 11.8 s, total: 18.9 s
Wall time: 5.94 s

In [15]: ### PARALLEL IMPLEMENTATION ###

In [16]: %time Parallel(n_jobs = 8)(delayed(try_std)(i) for i in np.linspace(0.5, 2.0, 16))
0.5 --> 0.03598864
0.9 --> 0.03151914
0.6 --> 0.03387313
0.7 --> 0.03260287
1.2 --> 0.03129023
1.1 --> 0.03128110
0.8 --> 0.03188978
1.0 --> 0.03134342
1.3 --> 0.03134819
1.6 --> 0.03167701
1.8 --> 0.03192895
1.5 --> 0.03155242
1.9 --> 0.03204561
1.4 --> 0.03143958
1.7 --> 0.03180485
2.0 --> 0.03215511
CPU times: user 60.9 ms, sys: 270 ms, total: 331 ms
Wall time: 2.87 s

Import CSV by Chunk Simultaneously with IPython Parallel

IPython Parallel is a friendly interface to implement parallelism in Python. Below is an example showing how to import a csv file into a Pandas DataFrame by chunks simultaneously through the interface of IPython Parallel.

In [1]: ### NEED TO RUN "IPCLUSTER START -N 2 &" FIRST ###

In [2]: import pandas as pd

In [3]: import ipyparallel as ip

In [4]: str = "AutoCollision.csv"

In [5]: rc = ip.Client()

In [6]: ### showing 2 engines working

In [7]: rc.ids
Out[7]: [0, 1]

In [8]: def para_read(file):
   ...:       dview = rc[:]
   ...:       # PARTITION THE IMPORT BY SCATTER() #
   ...:       dview.scatter("df", pd.read_csv(file))
   ...:       return pd.concat([i for i in dview["df"]])
   ...:

In [9]: ### PARALLEL IMPORT ###

In [10]: df1 = para_read(str)

In [11]: ### SERIAL IMPORT ###

In [12]: df2 = pd.read_csv(str)

In [13]: df1.equals(df2)
Out[13]: True

Calculate Leave-One-Out Prediction for GLM

In the model development, the “leave-one-out” prediction is a form of cross-validation, calculated as below:
1. First of all, after a model is developed, each observation used in the model development is removed in turn and the model is then refitted with the remaining observations
2. The out-of-sample prediction for the refitted model is calculated with the removed observation one by one to assemble the LOO, e.g. leave-one-out predicted values for the whole model development sample.
The loo_predict() function below is a general routine to calculate the LOO prediction for any GLM object, which can be further employed to investigate the model stability and predictability.

> pkgs <- c('doParallel', 'foreach')
> lapply(pkgs, require, character.only = T)
[[1]]
[1] TRUE

[[2]]
[1] TRUE

> registerDoParallel(cores = 8)
>
> data(AutoCollision, package = "insuranceData")
> # A GAMMA GLM #
> model1 <- glm(Severity ~ Age + Vehicle_Use, data = AutoCollision, family = Gamma(link = "log"))
> # A POISSON GLM #
> model2 <- glm(Claim_Count ~ Age + Vehicle_Use, data = AutoCollision, family = poisson(link = "log"))
>
> loo_predict <- function(obj) {
+   yhat <- foreach(i = 1:nrow(obj$data), .combine = rbind) %dopar% {
+     predict(update(obj, data = obj$data[-i, ]), obj$data[i,], type = "response")
+   }
+   return(data.frame(result = yhat[, 1], row.names = NULL))
+ }
> # TEST CASE 1
> test1 <- loo_predict(model1)
> test1$result
 [1] 303.7393 328.7292 422.6610 375.5023 240.9785 227.6365 288.4404 446.5589
 [9] 213.9368 244.7808 278.7786 443.2256 213.9262 243.2495 266.9166 409.2565
[17] 175.0334 172.0683 197.2911 326.5685 187.2529 215.9931 249.9765 349.3873
[25] 190.1174 218.6321 243.7073 359.9631 192.3655 215.5986 233.1570 348.2781
> # TEST CASE 2
> test2 <- loo_predict(model2)
> test2$result
 [1]  11.15897  37.67273  28.76127  11.54825  50.26364 152.35489 122.23782
 [8]  44.57048 129.58158 465.84173 260.48114 107.23832 167.40672 510.41127
[15] 316.50765 121.75804 172.56928 546.25390 341.03826 134.04303 359.30141
[22] 977.29107 641.69934 251.32547 248.79229 684.86851 574.13994 238.42209
[29] 148.77733 504.12221 422.75047 167.61203
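For reference, a similar leave-one-out prediction can be sketched in Python with scikit-learn by combining cross_val_predict() with the LeaveOneOut() splitter. The snippet below is only a minimal sketch on a small simulated dataset; PoissonRegressor is assumed to be available (recent scikit-learn versions), and the data here is hypothetical rather than the AutoCollision data used above.

# A MINIMAL PYTHON SKETCH OF THE LEAVE-ONE-OUT PREDICTION FOR A GLM
# THE SIMULATED DATA BELOW IS HYPOTHETICAL AND FOR ILLUSTRATION ONLY
import numpy
from sklearn.linear_model import PoissonRegressor
from sklearn.model_selection import LeaveOneOut, cross_val_predict

numpy.random.seed(2016)
x = numpy.random.uniform(size = (50, 3))
y = numpy.random.poisson(lam = numpy.exp(x.sum(axis = 1)))

# EACH OBSERVATION IS HELD OUT IN TURN, THE MODEL IS REFITTED ON THE REST,
# AND THE PREDICTION FOR THE HELD-OUT OBSERVATION IS COLLECTED
loo_pred = cross_val_predict(PoissonRegressor(), x, y, cv = LeaveOneOut())
print(loo_pred[:5])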

Multivariate Adaptive Regression Splines with Python

In [1]: import statsmodels.datasets as datasets

In [2]: import sklearn.metrics as metrics

In [3]: from numpy import log

In [4]: from pyearth import Earth as earth

In [5]: boston = datasets.get_rdataset("Boston", "MASS").data

In [6]: x = boston.ix[:, 0:boston.shape[1] - 1]

In [7]: xlabel = list(x.columns)

In [8]: y = boston.ix[:, boston.shape[1] - 1]

In [9]: model = earth(enable_pruning = True, penalty = 3, minspan_alpha = 0.05, endspan_alpha = 0.05)

In [10]: model.fit(x, log(y), xlabels = xlabel)
Out[10]:
Earth(allow_linear=None, check_every=None, enable_pruning=True, endspan=None,
   endspan_alpha=0.05, max_degree=None, max_terms=None,
   min_search_points=None, minspan=None, minspan_alpha=0.05, penalty=3,
   smooth=None, thresh=None)

In [11]: print model.summary()
Earth Model
--------------------------------------
Basis Function   Pruned  Coefficient
--------------------------------------
(Intercept)      No      2.93569
h(lstat-5.9)     No      -0.0249319
h(5.9-lstat)     No      0.067697
h(rm-6.402)      No      0.230063
h(6.402-rm)      Yes     None
crim             Yes     None
h(dis-1.5004)    No      -0.0369272
h(1.5004-dis)    No      1.70517
h(ptratio-18.6)  No      -0.0393493
h(18.6-ptratio)  No      0.0278253
nox              No      -0.767146
h(indus-25.65)   No      -0.147471
h(25.65-indus)   Yes     None
h(black-395.24)  Yes     None
h(395.24-black)  Yes     None
h(crim-24.8017)  Yes     None
h(24.8017-crim)  No      0.0292657
rad              No      0.00807783
h(rm-7.853)      Yes     None
h(7.853-rm)      Yes     None
chas             Yes     None
--------------------------------------
MSE: 0.0265, GCV: 0.0320, RSQ: 0.8409, GRSQ: 0.8090

In [12]: metrics.r2_score(log(y), model.predict(x))
Out[12]: 0.840861083407211

Fitting Generalized Regression Neural Network with Python

In [1]: # LOAD PACKAGES

In [2]: import pandas as pd

In [3]: import numpy as np

In [4]: from sklearn import preprocessing as pp

In [5]: from sklearn import cross_validation as cv

In [6]: from neupy.algorithms import GRNN as grnn

In [7]: from neupy.functions import mse

In [8]: # DATA PROCESSING

In [9]: df = pd.read_table("csdata.txt")

In [10]: y = df.ix[:, 0]

In [11]: y.describe()
Out[11]:
count    4421.000000
mean        0.090832
std         0.193872
min         0.000000
25%         0.000000
50%         0.000000
75%         0.011689
max         0.998372
Name: LEV_LT3, dtype: float64

In [12]: x = df.ix[:, 1:df.shape[1]]

In [13]: st_x = pp.scale(x)

In [14]: st_x.mean(axis = 0)
Out[14]:
array([  1.88343648e-17,   5.76080438e-17,  -1.76540780e-16,
        -7.71455583e-17,  -3.80705294e-17,   3.79409490e-15,
         4.99487355e-17,  -2.97100804e-15,   3.93261537e-15,
        -8.70310886e-16,  -1.30728071e-15])

In [15]: st_x.std(axis = 0)
Out[15]: array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.])

In [16]: x_train, x_test, y_train, y_test = cv.train_test_split(st_x, y, train_size = 0.7, random_state = 2015)

In [17]: # TRAIN THE NEURAL NETWORK

In [18]: def try_std(x):
   ....:       nn = grnn(std = x, verbose = False)
   ....:       nn.train(x_train, y_train)
   ....:       y_pred = nn.predict(x_test)
   ....:       print mse(y_pred, y_test)
   ....:

In [19]: # TEST A LIST OF VALUES FOR THE TUNING PARAMETER

In [20]: for x in np.linspace(0.5, 1.5, 11):
   ....:       print x
   ....:       try_std(x)
   ....:
0.5
0.034597892756
0.6
0.0331189699098
0.7
0.0323384657283
0.8
0.0319580849146
0.9
0.0318001764256
1.0
0.031751821704
1.1
0.031766356369
1.2
0.03183082142
1.3
0.0319348198865
1.4
0.0320623872248
1.5
0.03219800235

Ibis – A New Kid in Town

Developed by Wes McKinney, pandas is a very efficient and powerful data analysis tool in the Python language for data scientists. Like R, pandas reads the data into memory. As a result, we might often face the problem of running out of memory while analyzing large datasets with pandas.

Similar to Blaze, ibis is a new data analysis framework in Python built on top of back-end data engines, such as sqlite and impala. Even better, ibis provides higher compatibility with pandas and better performance than Blaze.

In a previous blog (https://statcompute.wordpress.com/2015/03/27/a-comparison-between-blaze-and-pandas), I’ve shown the efficiency of Blaze through a simple example. However, in the demonstration below, it is shown that, when applied to the same data with the sqlite engine, ibis is 50% more efficient than Blaze in terms of the “real time”.

import ibis as ibis

tbl = ibis.sqlite.connect('//home/liuwensui/Documents/data/flights.db').table('tbl2008')
exp = tbl[tbl.DayOfWeek > 1].group_by("DayOfWeek").aggregate(avg_AirTime = tbl.AirTime.mean())
pd = exp.execute()
print(pd)

#i   DayOfWeek  avg_AirTime
#0          2   103.214930
#1          3   103.058508
#2          4   103.467138
#3          5   103.557539
#4          6   107.400631
#5          7   104.864885
#
#real   0m10.346s
#user   0m9.585s
#sys    0m1.181s

Efficiency of Concatenating Two Large Tables in SAS

In SAS, there are 4 approaches to concatenate two datasets together:
1. SET statement,
2. APPEND procedure (or APPEND statement in DATASETS procedure),
3. UNION ALL in SQL procedure,
4. INSERT INTO in SQL procedure.
Below is an efficiency comparison among these 4 approaches, showing that the APPEND procedure is the most efficient in terms of CPU time, followed by the SET statement.

data one two;
  do i = 1 to 2000000;
    x = put(i, best12.);
    if i le 1000 then output one;
    else output two;
  end;
run;

data test1;
  set one two;
run;

data test2;
  set one;
run;

proc append base = test2 data = two;
run;

proc sql;
  create table
    test3 as
  select
    *
  from
    one

  union all

  select
    *
  from
    two;
quit;

data test4;
  set one;
run;

proc sql;
  insert into
    test4
  select
    *
  from
    two;
quit;

Query Pandas DataFrame with SQL

Similar to the sqldf package in R, which provides a seamless interface between SQL statements and data frames, pandasql allows Python users to query Pandas DataFrames with SQL.

Below are some examples showing how to use pandasql to do SELECT / AGGREGATE / JOIN operations. More information is available on GitHub (https://github.com/yhat/pandasql).

In [1]: import sas7bdat as sas

In [2]: import pandas as pd

In [3]: import pandasql as pdsql

In [4]: data = sas.SAS7BDAT("accepts.sas7bdat")

In [5]: df = data.toDataFrame()

In [6]: pysql = lambda q: pdsql.sqldf(q, globals())

In [7]: ### SELECT ###

In [8]: str1 = "select bureau_score, ltv from df where bureau_score < 600 and ltv > 100 limit 3;"

In [9]: df1 = pysql(str1)

In [10]: df1
Out[10]: 
   bureau_score  ltv
0           590  103
1           575  120
2           538  113

In [11]: ### AGGREGATE ###

In [12]: str2 = "select ltv, min(bureau_score) as min_score, max(bureau_score) as max_score from df group by ltv order by ltv DESC;"

In [13]: df2 = pysql(str2);

In [14]: df2.head(3)
Out[14]: 
   ltv  min_score  max_score
0  176        709        709
1  168        723        723
2  167        688        688

In [15]: ### JOIN ###

In [16]: str3 = "select b.*, a.bureau_score from df a inner join df2 b on a.ltv = b.ltv order by ltv DESC;"

In [17]: df3 = pysql(str3)

In [18]: df3.head(3)
Out[18]: 
   ltv  min_score  max_score  bureau_score
0  176        709        709           709
1  168        723        723           723
2  167        688        688           688

By-Group Aggregation in Parallel

Similar to the row search, by-group aggregation is another perfect use case to demonstrate the power of split-and-conquer with parallelism.

The example below shows that the homebrew by-group aggregation with the foreach package, albeit inefficiently coded, is still a lot faster than the summarize() function in the Hmisc package.

load('2008.Rdata')

pkgs <- c('rbenchmark', 'doParallel', 'foreach', 'Hmisc')
lapply(pkgs, require, character.only = T)
registerDoParallel(cores = 8)

benchmark(replications = 10,
  summarize = {
    summarize(data[c("Distance", "Month")], data["Month"], colMeans, stat.name = NULL)
  },
  foreach = {
    data2 <- split(data, data$Month)
    test2 <- foreach(i = data2, .combine = rbind) %dopar% (data.frame(Month = unique(i$Month), Distance= mean(i$Distance)))
  }
)

#        test replications elapsed relative user.self sys.self user.child
# 2   foreach           10  19.644     1.00    17.411    1.965      1.528
# 1 summarize           10  30.244     1.54    29.822    0.441      0.000

Vector Search vs. Binary Search

# REFERENCE:
# user2014.stat.ucla.edu/files/tutorial_Matt.pdf

pkgs <- c('data.table', 'rbenchmark')
lapply(pkgs, require, character.only = T)
 
load('2008.Rdata')
dt <- data.table(data)

benchmark(replications = 10, order = "elapsed",
  vector_search = {
    test1 <- dt[ArrTime == 1500 & Origin == 'ABE', ] 
  },
  binary_search = {
    setkey(dt, ArrTime, Origin)
    test2 <- dt[.(1500, 'ABE'), ]
  }
)

#            test replications elapsed relative user.self sys.self user.child
# 2 binary_search           10   0.335    1.000     0.311    0.023          0
# 1 vector_search           10   7.245   21.627     7.102    0.131          0

Row Search in Parallel

I’ve always been wondering whether the efficiency of row search can be improved if the whole data.frame is split into chunks and the row search is then conducted within each chunk in parallel.

In the R code below, a comparison is done between the standard row search and the parallel row search with the FOREACH package. The result is very encouraging. For 10 replications, the elapsed time of the parallel search is only a fraction of the elapsed time of the standard search.

load('2008.Rdata')
data2 <- split(data, 1:8)

library(rbenchmark)
library(doParallel)
registerDoParallel(cores = 8)
library(foreach)

benchmark(replications = 10, order = "elapsed",
  non_parallel = {
    test1 <- data[which(data$ArrTime == 1500 & data$Origin == 'ABE'), ]
  },
  parallel = {
    test2 <- foreach(i = data2, .combine = rbind) %dopar% i[which(i$ArrTime == 1500 & i$Origin == 'ABE'), ]
  }
)
#           test replications elapsed relative user.self sys.self user.child
# 2     parallel           10   2.680    1.000     0.319    0.762     12.078
# 1 non_parallel           10   7.474    2.789     7.339    0.139      0.000

Chain Operations: An Interesting Feature in dplyr Package


library(data.table)
library(dplyr)

data1 <- fread('/home/liuwensui/Downloads/2008.csv', header = T, sep = ',')
dim(data1)
# [1] 7009728      29

data2 <- data1 %.%
           filter(Year == 2008, Month %in% c(1, 2, 3, 4, 5, 6)) %.%
           select(Year, Month, AirTime) %.%
           group_by(Year, Month) %.%
           summarize(avg_time = mean(AirTime, na.rm = TRUE)) %.%
           arrange(desc(avg_time))

print(data2)
#   Year Month avg_time
# 1 2008     3 106.1939
# 2 2008     2 105.3185
# 3 2008     6 104.7604
# 4 2008     1 104.6181
# 5 2008     5 104.3720
# 6 2008     4 104.2694
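
For comparison, the same chained pipeline, i.e. filter, select, group_by, summarize, and arrange linked by the %.% operator, can be written with method chaining in pandas. The sketch below is illustrative only and assumes the same 2008.csv file is available locally.

import pandas as pd

# Read only the columns needed by the pipeline.
data1 = pd.read_csv('2008.csv', usecols=['Year', 'Month', 'AirTime'])

data2 = (data1[(data1.Year == 2008) & (data1.Month.isin(range(1, 7)))]
         .groupby(['Year', 'Month'], as_index=False)['AirTime']
         .mean()                                   # NAs are skipped by default
         .rename(columns={'AirTime': 'avg_time'})
         .sort_values('avg_time', ascending=False))

print(data2)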

Learning Pig Latin on 2014-03-01

grunt> sh head -4 2008.csv;
Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,754,735,1002,1000,WN,3231,N772SW,128,145,113,2,19,IAD,TPA,810,5,10,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,628,620,804,750,WN,448,N428WN,96,90,76,14,8,IND,BWI,515,3,17,0,,0,NA,NA,NA,NA,NA

grunt> sh sed '1d' 2008.csv > 2008.csv2;

grunt> sh head -4 2008.csv2;            
2008,1,3,4,2003,1955,2211,2225,WN,335,N712SW,128,150,116,-14,8,IAD,TPA,810,4,8,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,754,735,1002,1000,WN,3231,N772SW,128,145,113,2,19,IAD,TPA,810,5,10,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,628,620,804,750,WN,448,N428WN,96,90,76,14,8,IND,BWI,515,3,17,0,,0,NA,NA,NA,NA,NA
2008,1,3,4,926,930,1054,1100,WN,1746,N612SW,88,90,78,-6,-4,IND,BWI,515,3,7,0,,0,NA,NA,NA,NA,NA

grunt> A = LOAD '2008.csv2' USING PigStorage(',');                

grunt> B = GROUP A BY ($0, $1); 

grunt> C = FOREACH B GENERATE group, COUNT(A);

grunt> D = FILTER C BY $0.$1 IN (1, 2, 3);

grunt> SPLIT D INTO D1 IF $0.$1 == 1, D2 IF $0.$1 == 2, D3 IF $0.$1 == 3;

grunt> dump D3;
((2008,3),616090)

My First Learning Session in Pig Latin

-- load data
A = load '/home/liuwensui/Documents/data/test_pig.txt' using PigStorage(',') as (id: chararray, x: int, y: float);
dump A;

/*
(A,1,1.1)
(A,2,2.2)
(A,2,3.3)
(B,1,1.1)
(B,1,2.2)
*/

-- group data
B = group A by id;
dump B;

/*
(A,{(A,1,1.1),(A,2,2.2),(A,2,3.3)})
(B,{(B,1,1.1),(B,1,2.2)})
*/

-- summarize data by group
C1 = foreach B generate group as id1, MAX(A.x) as max_x;
dump C1;

/*
(A,2)
(B,1)
*/

C2 = foreach B generate group as id2, MIN(A.y) as min_y;
dump C2;

/*
(A,1.1)
(B,1.1)
*/

-- select data by criterion
C3 = filter C2 by id2 == 'A';
dump C3;

/*
(A,1.1)
*/

-- inner join
C4 = join C1 by id1, C3 by id2;
dump C4;

/*
(A,2,A,1.1)
*/

-- full join
C5 = join C1 by id1 full, C3 by id2;
dump C5;

/*
(A,2,A,1.1)
(B,1,,)
*/

-- union
C6 = union C4, C5;
dump C6;

/*
(A,2,A,1.1)
(B,1,,)
(A,2,A,1.1)
*/

Efficiency of Importing Large CSV Files in R

### size of csv file: 689.4MB (7,009,728 rows * 29 columns) ###

system.time(read.csv('../data/2008.csv', header = T))
#   user  system elapsed 
# 88.301   2.416  90.716

library(data.table)
system.time(fread('../data/2008.csv', header = T, sep = ',')) 
#   user  system elapsed 
#  4.740   0.048   4.785

library(bigmemory)
system.time(read.big.matrix('../data/2008.csv', header = T))
#   user  system elapsed 
# 59.544   0.764  60.308

library(ff)
system.time(read.csv.ffdf(file = '../data/2008.csv', header = T))
#   user  system elapsed 
# 60.028   1.280  61.335 

library(sqldf)
system.time(read.csv.sql('../data/2008.csv'))
#   user  system elapsed 
# 87.461   3.880  91.447

Julia and SQLite

Similar to R and pandas in Python, Julia provides a simple yet efficient interface with the SQLite database. In addition, the sqldf() function in the SQLite package, which is almost identical to the sqldf package in R, is extremely handy for data munging.

julia> # LOADING SQLITE PACKAGE

julia> using SQLite

julia> # CONNECT TO THE SQLITE DB FILE 

julia> db = SQLite.connect("/home/liuwensui/Documents/db/sqlitedb/csdata.db")

julia> # SHOW TABLES IN THE DB 

julia> query("select name from sqlite_master where type = 'table'")
1x1 DataFrame
|-------|-----------|
| Row # | name      |
| 1     | tblcsdata |

julia> # PULL DATA FROM THE TABLE

julia> # THE DATA WOULD BE AUTOMATICALLY SAVED AS A DATAFRAME

julia> df1 = query("select * from tblcsdata");

julia> head(df1, 2)
6x12 DataFrame
|-------|---------|----------|-----------|---------|-----------|----------|-----|-----------|-------|-------|-------|-------|
| Row # | LEV_LT3 | TAX_NDEB | COLLAT1   | SIZE1   | PROF2     | GROWTH2  | AGE | LIQ       | IND2A | IND3A | IND4A | IND5A |
| 1     | 0.0     | 0.530298 | 0.0791719 | 13.132  | 0.0820164 | 1.16649  | 53  | 0.385779  | 0     | 0     | 1     | 0     |
| 2     | 0.0     | 0.370025 | 0.0407454 | 12.1326 | 0.0826154 | 11.092   | 54  | 0.224123  | 1     | 0     | 0     | 0     |

julia> # SELECT DATA FROM THE TABLE WITH SQLDF() FUNCTION 

julia> df2 = sqldf("select * from df1 where AGE between 25 and 30");

julia> # SUMMARIZE DATA WITH SQLDF() FUNCTION 

julia> df3 = sqldf("select age, avg(LEV_LT3) as avg_lev from df2 group by age")
6x2 DataFrame
|-------|-----|-----------|
| Row # | AGE | avg_lev   |
| 1     | 25  | 0.0923202 |
| 2     | 26  | 0.0915009 |
| 3     | 27  | 0.0579876 |
| 4     | 28  | 0.104191  |
| 5     | 29  | 0.0764582 |
| 6     | 30  | 0.0806471 |

Test Drive of Julia

   _       _ _(_)_     |  A fresh approach to technical computing
  (_)     | (_) (_)    |  Documentation: http://docs.julialang.org
   _ _   _| |_  __ _   |  Type "help()" to list help topics
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 0.3.0-prerelease+490 (2013-12-15 07:16 UTC)
 _/ |\__'_|_|_|\__'_|  |  Commit f8f3190* (0 days old master)
|__/                   |  x86_64-linux-gnu

julia> # load the package

julia> using DataFrames

julia> # read a txt file into dataframe

julia> df1 = readtable("credit_count.txt");

julia> # subset the dataframe

julia> df2 = df1[:(CARDHLDR .== 1), ["DEFAULT", "MAJORDRG", "MINORDRG"]];

julia> # aggregate the data

julia> df3 = by(df2, "DEFAULT", :(MAJOR_DRG = mean(_DF["MAJORDRG"])))
2x2 DataFrame:
        DEFAULT MAJOR_DRG
[1,]          0  0.139851
[2,]          1  0.175703


julia> df4 = by(df2, "DEFAULT", :(MINOR_DRG = mean(_DF["MINORDRG"])))
2x2 DataFrame:
        DEFAULT MINOR_DRG
[1,]          0  0.213196
[2,]          1  0.292169


julia> # join two dataframes

julia> df5 = join(df3, df4, on = "DEFAULT", kind = :inner)
2x3 DataFrame:
        DEFAULT MAJOR_DRG MINOR_DRG
[1,]          0  0.139851  0.213196
[2,]          1  0.175703  0.292169

Faster Random Sampling with Replacement in SAS

Most SAS users like to use the SURVEYSELECT procedure to do random sampling. However, when it comes to big datasets, the efficiency of the SURVEYSELECT procedure seems pretty low. As a result, I normally prefer to use the data step to do the sampling.

While a simple random sample without replacement is trivial and can be easily accomplished by generating a uniformly distributed random number, a random sample with replacement doesn’t seem straightforward with the data step. In the demo below, I will show how to do sampling with replacement with both the SURVEYSELECT procedure and the data step and then compare their efficiencies.

First of all, I will artificially simulate a data set with 10 million rows.

data one;
  do i = 1 to 10000000;
  output;
  end;
run;

Secondly, I will wrap the SURVEYSELECT procedure into a macro to do the sampling with replacement. With this method, it took more than 20 seconds of CPU time to get the work done, even after subtracting the ~1 second of simulation time.

%macro urs1(indata = , outdata = );
options mprint;

proc sql noprint;
  select put(count(*), 10.) into :n from &indata;
quit;

proc surveyselect data = &indata out = &outdata n = &n method = urs seed = 2013;
run;

proc freq data = &outdata;
  tables numberhits;
run;

%mend urs1;

%urs1(indata = one, outdata = two);
/*
      real time           30.32 seconds
      cpu time            22.54 seconds

                                       Cumulative    Cumulative
NumberHits    Frequency     Percent     Frequency      Percent
---------------------------------------------------------------
         1     3686249       58.25       3686249        58.25
         2     1843585       29.13       5529834        87.38
         3      611396        9.66       6141230        97.04
         4      151910        2.40       6293140        99.44
         5       30159        0.48       6323299        99.91
         6        4763        0.08       6328062        99.99
         7         641        0.01       6328703       100.00
         8          98        0.00       6328801       100.00
         9          11        0.00       6328812       100.00
        10           1        0.00       6328813       100.00
*/

At last, let’s take a look at how to accomplish the same task with a simple data step. The real trick here is the statistical fact that, when n records are sampled with replacement out of n, the number of times each record is selected approximately follows a Poisson distribution with mean 1. As shown below, while delivering a very similar result, this approach consumes only roughly a quarter of the CPU time. The efficiency gain would be particularly attractive when we need to apply complex machine learning algorithms, e.g. bagging, to big data problems.

%macro urs2(indata = , outdata = );
options mprint;

data &outdata;
  set &indata;
  numberhits = ranpoi(2013, 1);
  if numberhits > 0 then output;
run;

proc freq data = &outdata;
  tables numberhits;
run;

%mend urs2;

%urs2(indata = one, outdata = two);
/*
      real time           13.42 seconds
      cpu time            6.60 seconds

                                       Cumulative    Cumulative
numberhits    Frequency     Percent     Frequency      Percent
---------------------------------------------------------------
         1     3677134       58.18       3677134        58.18
         2     1840742       29.13       5517876        87.31
         3      612487        9.69       6130363        97.00
         4      152895        2.42       6283258        99.42
         5       30643        0.48       6313901        99.90
         6        5180        0.08       6319081        99.99
         7         732        0.01       6319813       100.00
         8          92        0.00       6319905       100.00
         9          12        0.00       6319917       100.00
        10           2        0.00       6319919       100.00
*/
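
As a quick sanity check on the Poisson argument, the short Python sketch below (purely illustrative and not part of the SAS solution) draws a bootstrap sample of size n with replacement and shows that the number of times each record is selected is approximately Poisson distributed with mean 1. The SAS frequency tables above are simply this distribution conditional on numberhits > 0.

import math
import numpy as np

np.random.seed(2013)
n = 1000000

# Sample n record indices with replacement and count how many times
# each of the n records gets picked.
draws = np.random.randint(0, n, size=n)
hits = np.bincount(draws, minlength=n)

# Empirical distribution of the selection counts vs. the Poisson(1) pmf.
emp = np.bincount(hits) / float(n)
for k in range(5):
    print(k, round(emp[k], 4), round(math.exp(-1) / math.factorial(k), 4))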

Test Drive of PyPy

PyPy (www.pypy.org) is an alternative Python interpreter to CPython. Today, I did a test drive with PyPy. The Python code below generates 1,000,000 rows of data and then calculates the aggregated summation within each category.

import random  as random
import pprint  as pprint

random.seed(2013)
list = [[random.randint(1, 2), random.randint(1, 3), random.uniform(-1, 1)] for i in xrange(1, 1000001)]

summ = []
 
for i in set(x[0] for x in list):
  for j in set(x[1] for x in list):
    total = sum([x[2] for x in list if x[0] == i and x[1] == j]) 
    summ.append([i, j, round(total, 2)])

pprint.pprint(summ)

First of all, I ran it with CPython and got the following outcome.

liuwensui@ubuntu64:~/Documents/code$ time python test_pypy.py 
[[1, 1, 258.53],
 [1, 2, -209.49],
 [1, 3, -225.78],
 [2, 1, 202.2],
 [2, 2, 58.63],
 [2, 3, 188.99]]

real    0m4.491s
user    0m4.448s
sys     0m0.040s

However, when I ran it with PyPy, the run time was only about one third of that with the CPython interpreter.

liuwensui@ubuntu64:~/Documents/code$ time pypy test_pypy.py 
[[1, 1, 258.53],
 [1, 2, -209.49],
 [1, 3, -225.78],
 [2, 1, 202.2],
 [2, 2, 58.63],
 [2, 3, 188.99]]

real    0m1.351s
user    0m1.228s
sys     0m0.120s

Prototyping A General Regression Neural Network with SAS

The last time I read the paper “A General Regression Neural Network” by Donald Specht was exactly 10 years ago, when I was in graduate school. After reading it again this week, I decided to code it out with SAS macros and make this excellent idea available to the SAS community.

The prototype of GRNN consists of two SAS macros, %grnn_learn() for the training of a GRNN and %grnn_pred() for the prediction with a GRNN. The famous Boston Housing dataset is used to test these two macros, with the result compared against the outcome from the R implementation below. To simplify the case, it is assumed in this exercise that the smoothing parameter SIGMA is known and equal to 0.55.

pkgs <- c('MASS', 'doParallel', 'foreach', 'grnn')
lapply(pkgs, require, character.only = T)
registerDoParallel(cores = 8)

data(Boston)
X <- Boston[-14]
st.X <- scale(X)
Y <- Boston[14]
boston <- data.frame(st.X, Y)

pred_grnn <- function(x, nn){
  xlst <- split(x, 1:nrow(x))
  pred <- foreach(i = xlst, .combine = rbind) %dopar% {
    data.frame(pred = guess(nn, as.matrix(i)), i, row.names = NULL)
  }
}

grnn <- smooth(learn(boston, variable.column = ncol(boston)), sigma = 0.55)
pred_grnn <- pred_grnn(boston[, -ncol(boston)], grnn)
head(pred_grnn$pred, n = 10)
# [1] 24.61559 23.22232 32.29610 32.57700 33.29552 26.73482 21.46017 20.96827
# [9] 16.55537 20.25247

The first SAS macro, %grnn_learn(), trains a GRNN and is shown below. The purpose of this macro is to store the whole specification of a GRNN in a SAS dataset after the simple one-pass training with the development data. Please note that, motivated by the idea of MongoDB, I use a key-value paired scheme to store the information of a GRNN.

libname data '';

data data.boston;
  infile 'housing.data';
  input x1 - x13 y;
run;

%macro grnn_learn(data = , x = , y = , sigma = , nn_out = );
options mprint mlogic nocenter;
********************************************************;
* THIS MACRO IS TO TRAIN A GENERAL REGRESSION NEURAL   *;
* NETWORK (SPECHT, 1991) AND STORE THE SPECIFICATION   *;
*------------------------------------------------------*;
* INPUT PARAMETERS:                                    *;
*  DATA  : INPUT SAS DATASET                           *;
*  X     : A LIST OF PREDICTORS IN THE NUMERIC FORMAT  *;
*  Y     : A RESPONSE VARIABLE IN THE NUMERIC FORMAT   *;
*  SIGMA : THE SMOOTH PARAMETER FOR GRNN               *;
*  NN_OUT: OUTPUT SAS DATASET CONTAINING THE GRNN      *;
*          SPECIFICATION                               *;
*------------------------------------------------------*;
* AUTHOR:                                              *;
*  WENSUI.LIU@53.COM                                   *;
********************************************************;

data _tmp1;
  set &data (keep = &x &y);
  where &y ~= .;
  array _x_ &x;
  _miss_ = 0;
  do _i_ = 1 to dim(_x_);
    if _x_[_i_] = . then _miss_ = 1; 
  end;
  if _miss_ = 0 then output;
run;

proc summary data = _tmp1;
  output out = _avg_ (drop = _type_ _freq_)
  mean(&x) = ;
run;

proc summary data = _tmp1;
  output out = _std_ (drop = _type_ _freq_)
  std(&x) = ;
run;

proc standard data = _tmp1 mean = 0 std = 1 out = _data_;
  var &x;
run;

data &nn_out (keep = _neuron_ _key_ _value_);
  set _last_ end = eof;
  _neuron_ + 1;
  length _key_ $32;
  array _a_ &y &x;
  do _i_ = 1 to dim(_a_);
    if _i_ = 1 then _key_ = '_Y_';
    else _key_ = upcase(vname(_a_[_i_]));
    _value_ = _a_[_i_];
    output;
  end; 
  if eof then do;
    _neuron_ = 0;
    _key_  = "_SIGMA_";
    _value_  = &sigma;
    output;
    set _avg_;
    array _b_ &x;
    do _i_ = 1 to dim(_b_);
      _neuron_ = -1;
      _key_ = upcase(vname(_b_[_i_]));
      _value_ = _b_[_i_];
      output;
    end;
    set _std_;
    array _c_ &x;
    do _i_ = 1 to dim(_c_);
      _neuron_ = -2;
      _key_ = upcase(vname(_c_[_i_]));
      _value_ = _c_[_i_];
      output;
    end;
  end;
run;

proc datasets library = work;
  delete _: / memtype = data;
run;
quit;

********************************************************;
*              END OF THE MACRO                        *;
********************************************************;
%mend grnn_learn;

%grnn_learn(data = data.boston, x = x1 - x13, y = y, sigma = 0.55, nn_out = data.grnn);

proc print data = data.grnn (obs = 10) noobs;
run;
/* SAS PRINTOUT OF GRNN DATA:
_neuron_    _key_     _value_
    1        _Y_      24.0000
    1        X1       -0.4194
    1        X2        0.2845
    1        X3       -1.2866
    1        X4       -0.2723
    1        X5       -0.1441
    1        X6        0.4133
    1        X7       -0.1199
    1        X8        0.1401
    1        X9       -0.9819
*/

After the training of a GRNN, the macro %grnn_pred() can be used to generate predicted values from a test dataset with all predictors. As shown in the print-out, the first 10 predicted values are identical to those generated with R.

libname data '';

%macro grnn_pred(data = , x = , id = NA, nn_in = , out = grnn_pred);
options mprint mlogic nocenter;
********************************************************;
* THIS MACRO IS TO GENERATE PREDICTED VALUES BASED ON  *;
* THE SPECIFICATION OF GRNN CREATED BY THE %GRNN_LEARN *;
* MACRO                                                *;
*------------------------------------------------------*;
* INPUT PARAMETERS:                                    *;
*  DATA : INPUT SAS DATASET                            *;
*  X    : A LIST OF PREDICTORS IN THE NUMERIC FORMAT   *;
*  ID   : AN ID VARIABLE (OPTIONAL)                    *;
*  NN_IN: INPUT SAS DATASET CONTAINING THE GRNN        *;
*         SPECIFICATION GENERATED FROM %GRNN_LEARN     *;
*  OUT  : OUTPUT SAS DATASET WITH GRNN PREDICTIONS     *;
*------------------------------------------------------*;
* AUTHOR:                                              *;
*  WENSUI.LIU@53.COM                                   *;
********************************************************;

data _data_;
  set &data;
  array _x_ &x;
  _miss_ = 0;
  do _i_ = 1 to dim(_x_);
    if _x_[_i_] = . then _miss_ = 1;
  end;
  if _miss_ = 0 then output;
run;

data _data_;
  set _last_ (drop = _miss_);
  %if &id = NA %then %do;
  _id_ + 1;
  %end;
  %else %do;
  _id_ = &id;
  %end;
run;

proc sort data = _last_ sortsize = max nodupkey;
  by _id_;
run;

data _data_ (keep = _id_ _key_ _value_);
  set _last_;
  array _x_ &x;
  length _key_ $32;
  do _i_ = 1 to dim(_x_);
    _key_ = upcase(vname(_x_[_i_]));
    _value_ = _x_[_i_];
    output;
  end;
run;

proc sql noprint;
select _value_ ** 2 into :s2 from &nn_in where _neuron_ = 0;

create table
  _last_ as 
select
  a._id_,
  a._key_,
  (a._value_ - b._value_) / c._value_ as _value_
from
  _last_ as a,
  &nn_in as b,
  &nn_in as c
where
  compress(a._key_, ' ') = compress(b._key_, ' ') and
  compress(a._key_, ' ') = compress(c._key_, ' ') and
  b._neuron_ = -1                                 and
  c._neuron_ = -2;

create table
  _last_ as
select
  a._id_,
  b._neuron_,
  sum((a._value_ - b._value_) ** 2) as d2,
  mean(c._value_)                   as y,
  exp(-(calculated d2) / (2 * &s2)) as exp
from
  _last_  as a,
  &nn_in as b,
  &nn_in as c
where
  compress(a._key_, ' ') = compress(b._key_, ' ') and
  b._neuron_ = c._neuron_                         and
  b._neuron_ > 0                                  and
  c._key_ = '_Y_'
group by
  a._id_, b._neuron_;

create table
  _last_ as
select
  a._id_,
  sum(a.y * a.exp / b.sum_exp) as _pred_
from
  _last_ as a inner join (select _id_, sum(exp) as sum_exp from _last_ group by _id_) as b
on
  a._id_ = b._id_
group by
  a._id_;
quit;

proc sort data = _last_ out = &out sortsize = max;
  by _id_;
run;

********************************************************;
*              END OF THE MACRO                        *;
********************************************************;
%mend grnn_pred;

%grnn_pred(data = data.boston, x = x1 - x13, nn_in = data.grnn);

proc print data = grnn_pred (obs = 10) noobs;
run;
/* SAS PRINTOUT:
_id_     _pred_
  1     24.6156
  2     23.2223
  3     32.2961
  4     32.5770
  5     33.2955
  6     26.7348
  7     21.4602
  8     20.9683
  9     16.5554
 10     20.2525
*/

After the development of these two macros, I also compared the predictive performance of GRNN and OLS regression. It turns out that GRNN consistently outperforms OLS regression across a wide range of SIGMA values. With a reasonable choice of SIGMA, even a GRNN developed with 10% of the whole Boston Housing dataset is able to generalize well and yield an R^2 > 0.8 on the remaining 90% of the data.
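
For reference, the prediction step implemented in %grnn_pred() boils down to Specht’s kernel-weighted average of the training responses. Below is a minimal Python sketch of that formula; the function name is made up for illustration, and it assumes the inputs have already been standardized in the same way as in the macros.

import numpy as np

def grnn_predict(x_train, y_train, x_new, sigma):
    # Each training record acts as one neuron; compute its squared
    # Euclidean distance to the new record.
    d2 = np.sum((x_train - x_new) ** 2, axis=1)
    # Gaussian kernel weights controlled by the smoothing parameter sigma.
    w = np.exp(-d2 / (2.0 * sigma ** 2))
    # The prediction is the kernel-weighted average of the training responses.
    return np.sum(w * y_train) / np.sum(w)

With the standardized Boston Housing predictors and sigma = 0.55, this formula should reproduce the predicted values listed in the R and SAS print-outs above.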

Improve The Efficiency in Joining Data with Index

When managing big data with R, many people like to use the sqldf package for its friendly interface or choose the data.table package for its lightning speed. However, very few pay special attention to small details, such as adding an index to the data.frame or data.table, that might significantly boost the efficiency of these packages.

In my post on 01/29/2013 (https://statcompute.wordpress.com/2013/01/29/another-benchmark-for-joining-two-data-frames), I’ve shown how to effectively join two data.frames / data.tables. However, that example is not intuitive enough for people to fully understand the benefit of adding an index. In the demonstration below, I will compare two scenarios, one with the index and the other without, to show the extra efficiency gained by a simple index.

It is also important to note that creating the index on “ldf” has the effect of adding the data.frame “ldf” from the R workspace to the SQLite database. Therefore, in the second “select…” statement, we need to add “main.” in front of “ldf” in order to use the indexed table “ldf” in SQLite instead of the unindexed data.frame “ldf” in the R environment.

As shown in the benchmark tables, simply adding an index can significantly reduce the user time with the sqldf package and improves it somewhat with the data.table package.

libs <- c('sqldf', 'data.table', 'rbenchmark')
lapply(libs, require, character.only = T)

n <- 1000000
set.seed(1)
ldf <- data.frame(id1 = sample(n, n), id2 = sample(n / 1000, n, replace = TRUE), x1 = rnorm(n), x2 = runif(n))
rdf <- data.frame(id1 = sample(n, n), id2 = sample(n / 1000, n, replace = TRUE), y1 = rnorm(n), y2 = runif(n))

benchmark(replications = 5, order = "user.self", 
  noindex.sqldf = (sqldf('select * from ldf as l inner join rdf as r on l.id1 = r.id1 and l.id2 = r.id2')),
  indexed.sqldf = (sqldf(c('create index ldx on ldf(id1, id2)', 
                           'select * from main.ldf as l inner join rdf as r on l.id1 = r.id1 and l.id2 = r.id2')))
)

benchmark(replications = 5, order = "user.self", 
  noindex.table = {
    ldt <- data.table(ldf)
    rdt <- data.table(rdf)
    merge(ldt, rdt, by = c('id1', 'id2'))
  },
  indexed.table = {
    ldt <- data.table(ldf, key = 'id1,id2')
    rdt <- data.table(rdf, key = 'id1,id2')
    merge(ldt, rdt, by = c('id1', 'id2'))
  }
)

SQLDF OUTCOMES

           test replications elapsed relative user.self sys.self user.child
2 indexed.sqldf            5  34.774    1.000    34.511    0.244          0
1 noindex.sqldf            5  61.873    1.779    44.918   16.941          0

DATA.TABLE OUTCOMES

           test replications elapsed relative user.self sys.self user.child
2 indexed.table            5   6.719    1.000     6.609    0.104          0
1 noindex.table            5   6.777    1.009     6.696    0.076          0

R and MongoDB

MongoDB is a document-based NoSQL database. Different from relational databases, which store data in tables with rigid schemas, MongoDB stores data in documents with dynamic schemas. In the demonstration below, I am going to show how to extract data from MongoDB with R.
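
As a concrete illustration of the schema flexibility, the two made-up documents below could live in the same collection even though they do not share the same fields, which would not be possible for rows in a relational table.

# Two hypothetical documents in the same MongoDB collection; fields can
# differ from one document to another.
doc1 = {"AGE": 27, "LIQ": 0.31, "IND5A": 0}
doc2 = {"AGE": 30, "LIQ": 0.18, "note": "an extra field not present in doc1"}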

Before starting the R session, we need to install MongoDB on the local machine and then load the data into the database with the Python code below.

import pandas as pandas
import pymongo as pymongo

df = pandas.read_table('../data/csdata.txt')
lst = [dict([(colname, row[i]) for i, colname in enumerate(df.columns)]) for row in df.values]
for i in range(3):
  print lst[i]

con = pymongo.Connection('localhost', port = 27017)
test = con.db.test
test.drop()
for i in lst:
  test.save(i)

To the best of my knowledge, there are two R packages providing an interface with MongoDB, namely RMongo and rmongodb. While the RMongo package is very straightforward and user-friendly, it did take me a while to figure out how to specify a query with the rmongodb package.

RMongo Example

library(RMongo)
mg1 <- mongoDbConnect('db')
print(dbShowCollections(mg1))
query <- dbGetQuery(mg1, 'test', "{'AGE': {'$lt': 10}, 'LIQ': {'$gte': 0.1}, 'IND5A': {'$ne': 1}}")
data1 <- query[c('AGE', 'LIQ', 'IND5A')]
summary(data1)

RMongo Output

Loading required package: rJava
Loading required package: methods
Loading required package: RUnit
[1] "system.indexes" "test"          
      AGE             LIQ             IND5A  
 Min.   :6.000   Min.   :0.1000   Min.   :0  
 1st Qu.:7.000   1st Qu.:0.1831   1st Qu.:0  
 Median :8.000   Median :0.2970   Median :0  
 Mean   :7.963   Mean   :0.3745   Mean   :0  
 3rd Qu.:9.000   3rd Qu.:0.4900   3rd Qu.:0  
 Max.   :9.000   Max.   :1.0000   Max.   :0  

rmongodb Example

library(rmongodb)
mg2 <- mongo.create()
print(mongo.get.databases(mg2))
print(mongo.get.database.collections(mg2, 'db'))
buf <- mongo.bson.buffer.create()
mongo.bson.buffer.start.object(buf, 'AGE')
mongo.bson.buffer.append(buf, '$lt', 10)
mongo.bson.buffer.finish.object(buf)
mongo.bson.buffer.start.object(buf, 'LIQ')
mongo.bson.buffer.append(buf, '$gte', 0.1)
mongo.bson.buffer.finish.object(buf)
mongo.bson.buffer.start.object(buf, 'IND5A')
mongo.bson.buffer.append(buf, '$ne', 1)
mongo.bson.buffer.finish.object(buf)
query <- mongo.bson.from.buffer(buf)
cur <- mongo.find(mg2, 'db.test', query = query)
age <- liq <- ind5a <- NULL
while (mongo.cursor.next(cur)) {
  value <- mongo.cursor.value(cur)
  age   <- rbind(age, mongo.bson.value(value, 'AGE'))
  liq   <- rbind(liq, mongo.bson.value(value, 'LIQ'))
  ind5a <- rbind(ind5a, mongo.bson.value(value, 'IND5A'))
  }
mongo.destroy(mg2)
data2 <- data.frame(AGE = age, LIQ = liq, IND5A = ind5a)
summary(data2)

rmongodb Output

rmongodb package (mongo-r-driver) loaded
Use 'help("mongo")' to get started.

[1] "db"
[1] "db.test"
[1] TRUE
[1] TRUE
[1] TRUE
[1] TRUE
[1] TRUE
[1] TRUE
[1] TRUE
[1] TRUE
[1] TRUE
NULL
      AGE             LIQ             IND5A  
 Min.   :6.000   Min.   :0.1000   Min.   :0  
 1st Qu.:7.000   1st Qu.:0.1831   1st Qu.:0  
 Median :8.000   Median :0.2970   Median :0  
 Mean   :7.963   Mean   :0.3745   Mean   :0  
 3rd Qu.:9.000   3rd Qu.:0.4900   3rd Qu.:0  
 Max.   :9.000   Max.   :1.0000   Max.   :0  

Grid Search for Free Parameters with Parallel Computing

In my previous post (https://statcompute.wordpress.com/2013/05/25/test-drive-of-parallel-computing-with-r) on 05/25/2013, I’ve demonstrated the power of parallel computing with various R packages. However, in the real world, it is not straightforward to utilize these powerful tools in our day-to-day computing tasks without carefully formulating the problem. In the example below, I am going to show how to use the FOREACH package to do a grid search for an optimal set of free parameters in a projection pursuit regression (PPR).

PPR is a powerful non-parametric regression model that is able to approximate any arbitrary function given a sufficiently complex setting, which in the case shown below consists of the smoothness tuning parameter and the number of smoothers. In practice, a grid search strategy with cross-sample validation is often employed to identify the optimal combination of these two free parameters. However, the challenge is the high computing cost incurred by the grid search. In the example below, if we’d like to try 6 values of the smoothness parameter and 10 choices for the number of smoothers, then 60 combinations in total need to be tested in PPR. If serial training with FOR() loops were applied to these 60 PPRs, the computing time would be tediously lengthy. Since the training of each of the 60 PPRs is independent of the others, parallel training with FOREACH() might best suit this scenario.

library(MASS)
data(Boston)
X <- I(as.matrix(Boston[-14]))
st.X <- scale(X)
Y <- I(as.matrix(Boston[14]))
boston <- data.frame(X = st.X, Y)

# DIVIDE THE WHOLE DATA INTO TWO SEPARATE SETS
set.seed(2013)
rows <- sample(1:nrow(boston), nrow(boston) - 200)
set1 <- boston[rows, ]
set2 <- boston[-rows, ]

# LOAD FOREACH PACKAGE
library(doParallel)
registerDoParallel(cores = 8)
library(foreach)

# GRID SEARCH BASED ON THE MINIMUM SSE WITH PARALLEL COMPUTING
cv.sse <- foreach(b = seq(0, 10, 2), .combine = rbind) %dopar% {
             foreach(n = 1:10, .combine = rbind) %dopar% {
               # TRAIN A PROJECTION PURSUIT REGRESSION WITH VARIOUS SETTINGS AND TRAINING DATA
               ppreg <- ppr(Y ~ X, data = set1, nterms = n, sm.method = "supsmu", bass = b)
               # CALCULATE SSE WITH VALIDATION DATA
               test.sse <- sum((set2$Y - predict(ppreg, set2))^2)
               data.frame(bass = b, nterms = n, sse = test.sse)
  }
}
# PRINT OUT THE BEST SETTING BASED ON THE GRID SEARCH
print(best.setting <- cv.sse[cv.sse$sse == min(cv.sse$sse), ]) 

# OUTPUT WITH THE LOWEST SSE BY GRID SEARCH #
#    bass nterms     sse
# 17    2      7 2126.07

# GENERATE A HEAT MAP TO VISUALIZE THE GRID SEARCH OUTCOME 
library(ggplot2)
bass <- factor(cv.sse$bass)
nterms <- factor(cv.sse$nterms)
sse <- factor(floor(cv.sse$sse / 100) * 100)
jpeg('cv.jpeg', width = 800, height = 500, quality = 100)
qplot(x = bass, y = nterms, fill = sse, geom = 'tile')
dev.off()

From the output, it appears that the SSE (Sum of Squared Errors) reaches its lowest value, i.e. 2,126, with the smoothness parameter equal to 2 and the number of smoothers equal to 7. In addition, a heat map can also be used to visualize the outcomes of the grid search in a more intuitive way.

cv