Faster Way to Slice Dataframe by Row

When we’d like to slice a dataframe by row, we can employ the split() function or the iter() function in the iterators package.

By leveraging the power of parallelism, I wrote a utility function slice() to slice the dataframe faster. In the example shown below, slice() is about 3 times more efficient than split() or iter() when selecting 2 records out of 5,960 rows.


df <- read.csv("hmeq.csv")

nrow(df)
# [1] 5960

slice <- function(df) {
  return(parallel::mcMap(function(i) df[i, ], seq(nrow(df)), mc.cores = parallel::detectCores()))
}

Reduce(rbind, Filter(function(x) x$DEROG == 10, slice(df)))
#     BAD  LOAN MORTDUE VALUE  REASON   JOB YOJ DEROG DELINQ     CLAGE NINQ CLNO  DEBTINC
#3094   1 16800   16204 27781 HomeImp Other   1    10      0 190.57710    0    9 27.14689
#3280   1 17500   76100 98500 DebtCon Other   5    10      1  59.83333    5   16       NA

rbenchmark::benchmark(replications = 10, order = "elapsed", relative = "elapsed",
                        columns = c("test", "replications", "elapsed", "relative"),
  "SPLIT" = Reduce(rbind, Filter(Negate(function(x) x$DEROG != 10), split(df, seq(nrow(df))))),
  "ITER " = Reduce(rbind, Filter(Negate(function(x) x$DEROG != 10), as.list(iterators::iter(df, by = "row")))),
  "SLICE" = Reduce(rbind, Filter(Negate(function(x) x$DEROG != 10), slice(df)))
)
#  test replications elapsed relative
# SLICE           10   2.224    1.000
# SPLIT           10   7.185    3.231
# ITER            10   7.375    3.316


Subset Dictionary by Keys

### READ A DATA SAMPLE WITH THREE RECORDS FROM THE CSV FILE
from csv import DictReader

with open("Downloads/nycflights.csv") as f:
  d = DictReader(f)
  l = [next(d) for i in xrange(3)]

### GET A DICT FROM THE LIST
d = l[0]

### SHOW KEYS OF THE DICT
print d.keys()
#['origin', 'dep_time', 'flight', 'hour', 'dep_delay', 'distance', 'dest', 'month', 'air_time', 'carrier', 'year', 'arr_delay', 'tailnum', 'arr_time', 'day', 'minute']

### SELECTED KEYS
selected = ["origin", "dest", "distance", "carrier"]

# EXPECTED OUTPUT 
# {'carrier': 'UA', 'dest': 'IAH', 'distance': '1400', 'origin': 'EWR'}

### METHOD 1: DEFINE BY THE DICT COMPREHENSION

{k: d[k] for k in selected}

{k: d.get(k) for k in selected}

{k: v for k, v in d.items() if k in selected}

### METHOD 2: DEFINE BY THE DICT CONSTRUCTOR

dict((k, d[k]) for k in selected)

dict(map(lambda k: (k, d[k]), selected))

dict(filter(lambda i: i[0] in selected, d.items()))

### METHOD 3: DEFINE WITH THE ZIP() FUNCTION

dict(zip(selected, [d[k] for k in selected]))

# ITEMGETTER() FUNCTION WITH THE UNPACK OPERATOR (*)
from operator import itemgetter
dict(zip(selected, itemgetter(*selected)(d)))

# AT() FUNCTION WITH THE UNPACK OPERATOR (*)
from pydash import at
dict(zip(selected, at(d, *selected)))

### APPLY ABOVE LOGIC TO THE WHOLE LIST OF DICTIONARIES
### WITH THE MAP FUNCTION
map(lambda d: {k: d[k] for k in selected}, l)

### ALTERNATIVELY, WITH THE LIST COMPREHENSION
[(lambda x: {k: x[k] for k in selected})(d) for d in l]

### OR THE PARALLEL POOL.MAP() FUNCTION
# ALWAYS DEFINE THE FUNCTION FIRST
def sel(d):
  return({k: d[k] for k in selected})

# THE MULTIPROCESSING MODULE NEXT
from multiprocessing import Pool, cpu_count
from contextlib import closing

with closing(Pool(processes = cpu_count())) as pool:
  pool.map(sel, l)
  pool.terminate()

# OUTPUT:
# [{'carrier': 'UA', 'dest': 'IAH', 'distance': '1400', 'origin': 'EWR'},
#  {'carrier': 'UA', 'dest': 'IAH', 'distance': '1416', 'origin': 'LGA'},
#  {'carrier': 'AA', 'dest': 'MIA', 'distance': '1089', 'origin': 'JFK'}]

Playing Map() and Reduce() in R – Subsetting

In the previous post (https://statcompute.wordpress.com/2018/09/03/playing-map-and-reduce-in-r-by-group-calculation), I've shown how to employ MapReduce when calculating by-group statistics. Actually, the same Divide-and-Conquer strategy is applicable to other use cases, one of which is the subsetting operation.

In the example below, let's still use the same iris data for demonstration purposes. In R, the most convenient way to perform the subsetting might be the subset() function, which searches the entire data.frame for rows meeting the condition described in the "expr" expression below.

data(iris)
expr = expression(Sepal.Length > 7 & Sepal.Width > 3)
subset(iris, eval(expr))

With the whole data.frame partitioned into multiple pieces, the row searching operation fits perfectly into the MapReduce paradigm, as described in the logic flow below.
1. First of all, the iris data is divided into chunks with an equal number of rows, e.g. two chunks in the example.
2. Next, a Map() function is used to perform the row searching operation within each chunk.
3. Upon the return of rows meeting the criteria from each chunk, a Reduce() function is employed to combine all outcomes together.

n <- 2
lst <- split(iris, sort((1:nrow(iris)) %% n))
Reduce(rbind, Map(function(x) x[with(x, which(eval(expr))), ], lst))
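
As a quick sanity check, below is a minimal sketch (not part of the original workflow; chk1 and chk2 are throwaway names used only for illustration) that compares the combined outcome against the plain subset() call.

chk1 <- Reduce(rbind, Map(function(x) x[with(x, which(eval(expr))), ], lst))
chk2 <- subset(iris, eval(expr))
all.equal(chk1, chk2, check.attributes = FALSE)
### SHOULD RETURN TRUE, I.E. BOTH APPROACHES SELECT THE SAME ROWS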

It is worth noting that the above map operation is still performed sequentially without leveraging the computing power of multiple CPUs. In the CPU usage shown below, we can see that only one CPU is busy while the rest sit idle.

(Figure: CPU usage during the sequential run, with only a single core active)

Similar to the by-group summary, the by-chunk row searching doesn't have to run sequentially and can be distributed simultaneously across multiple CPUs by using the mcMap() function, as outlined below.
1. Again, it starts with the data partition. However, there are two caveats in this example. First, the data is split based upon the number of CPUs captured by the detectCores() function. Second, the partitioned data is NOT stored physically in the memory but is reflected logically by a list of future abstractions, e.g. "flst" in the code snippet.
2. In the second step, the mcMap() function is used to evaluate each future abstraction, return the partitioned data, and then perform the row searching within each chunk.
3. At last, the Reduce() function collects and combines all outcomes.

pkgs <- c("parallel", "future")
mapply(function(x) require(x, character.only = T), pkgs)
n <- detectCores()
flst <- Map(function(x) future({x}), split(iris, sort((1:nrow(iris)) %% n)))
Reduce(rbind, mcMap(function(x) value(x)[with(value(x), which(eval(expr))), ], flst, mc.cores = n))

If we take a look at the CPU usage again, all CPUs are now utilized, as shown below.

(Figure: CPU usage during the parallel run, with all cores active)

Playing Map() and Reduce() in R – By-Group Calculation

Clojure is such an interesting programming language that it can not only enhance our skill set but also change the way we write programs. After learning Clojure, I can't help thinking about how to employ the functional programming and MapReduce paradigms to improve our experience with other programming languages, e.g. R in my case.

When calculating statistical summaries in R, we would go straight to the aggregate() or sqldf() functions without a second thought. Such by-group calculations seem so simple that we often don't bother to think about the problem itself conceptually. Let's take a look at how to approach this problem in Clojure, using the code below that I copied from https://statcompute.wordpress.com/2018/03/18/do-we-really-need-dataframe-in-clojure.

(def country_sum
  (map (fn [[billingcountry total]]
    {:billingcountry billingcountry :total (reduce + (map :total total))})
    (group-by :billingcountry inv)))

Although the code looks a little awkward with lots of parentheses, the idea is very clear and makes sense. We first partition the data into multiple pieces based on the groups that we'd like to summarize, and then define an anonymous function that pulls the invoice amounts out of each group with a map() function and sums them up with a reduce() function, the original data being a list of maps in this case. The whole calculation logic is a faithful reflection of MapReduce.

Now let's come back to R and think about how to re-frame the solution for the by-group calculation. Using data(iris) as an example, we should first partition the data.frame by "Species" with split() so as to convert the data.frame into a list of data.frames by group. If we apply the class() function to each item in the list "lst1", we can see three data.frames.

data(iris)
lst1 <- split(iris, iris$Species)
Map(class, lst1)
#$setosa
#[1] "data.frame"
#$versicolor
#[1] "data.frame"
#$virginica
#[1] "data.frame"

After the data partition, we can proceed to calculate the by-group summary with each data.frame in the list. Luckily enough, because a data.frame is constructed as a collection of columns rather than rows, we don't need a map operation to extract values from the corresponding rows. Instead, we can directly calculate the column summaries on each partitioned data.frame. As shown below, the code is straightforward yet flexible given the use of an anonymous function, which can be customized to accommodate any arbitrary calculation.

Map(function(x) data.frame(grp = unique(x$Species), sl_avg = mean(x$Sepal.Length), sw_avg = mean(x$Sepal.Width)), lst1)
#$setosa
#     grp sl_avg sw_avg
#1 setosa  5.006  3.428
#$versicolor
#         grp sl_avg sw_avg
#1 versicolor  5.936   2.77
#$virginica
#        grp sl_avg sw_avg
#1 virginica  6.588  2.974

Up to now, the problem has been successfully solved. However, if we have a closer look at the solution, it doesn't take long to notice that the calculation in one group is completely orthogonal to the calculation in another group, and therefore the by-group calculation doesn't have to run sequentially. In addition, the partitioned data consumes significantly more memory than the original, which is not an issue for small data sets but could be a potential concern for big data sets. After all, there is no need to keep the data in memory at all times, as long as it is available when needed.
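
The memory point can be illustrated roughly with object.size(); this is only a sketch, and the actual figures will vary with the data and the number of chunks.

### COMPARE THE MEMORY FOOTPRINT OF THE ORIGINAL AND THE PARTITIONED DATA
object.size(iris)
object.size(split(iris, iris$Species))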

To address the first observation, we can bring in parallel computation by using the parallel::mcMap() function to kick off multiple CPUs simultaneously. For the second concern, we can introduce the concept of a Future, which is an abstraction for a data.frame rather than the data.frame physically stored in memory. The future, once created with the future::future() function, remains unresolved until we want it resolved in the computation by calling the future::value() function, at the computing cost of evaluating the future.

With everything put together, below is the final code with the parallel map and the future abstraction.

pkg <- list("parallel", "future")
mapply(function(x) require(x, character.only = T), pkg)
ft <- future({split(iris, iris$Species)})
mcMap(function(i) with(value(ft)[[i]], data.frame(grp = unique(Species), sl_avg = mean(Sepal.Length), sw_avg = mean(Sepal.Width))), 1:length(unique(iris$Species)), mc.cores = detectCores())
#[[1]]
#     grp sl_avg sw_avg
#1 setosa  5.006  3.428
#[[2]]
#         grp sl_avg sw_avg
#1 versicolor  5.936   2.77
#[[3]]
#        grp sl_avg sw_avg
#1 virginica  6.588  2.974

If we would like a prettier output, we can wrap the list into a nice-looking data.frame with a reduce operation, either Reduce(rbind, …) or do.call(rbind, …), where … is the final list from Map() or mcMap() shown above.
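
For instance, a minimal sketch that assigns the mcMap() output above to a list named out (a name introduced here purely for illustration) and stacks it with do.call():

out <- mcMap(function(i) with(value(ft)[[i]], data.frame(grp = unique(Species), sl_avg = mean(Sepal.Length), sw_avg = mean(Sepal.Width))), 1:length(unique(iris$Species)), mc.cores = detectCores())
do.call(rbind, out)
#         grp sl_avg sw_avg
#1     setosa  5.006  3.428
#2 versicolor  5.936  2.770
#3  virginica  6.588  2.974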

Random Search for Optimal Parameters

Practices of manual search, grid search, or a combination of both have been successfully employed in machine learning to optimize hyper-parameters. However, in the arena of deep learning, both approaches might become impractical. For instance, the computing cost of a grid search for hyper-parameters in a multi-layer deep neural network (DNN) could be prohibitively high.

In light of the aforementioned hurdles, Bergstra and Bengio proposed the novel idea of random search in the paper http://www.jmlr.org/papers/volume13/bergstra12a/bergstra12a.pdf. In their study, it was found that random search is more efficient than grid search for hyper-parameter optimization in terms of computing cost.

In the example below, it is shown that grid search and random search reach similar results in the SVM parameter optimization based on cross-validation.

import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV 
from sklearn.svm import SVC as svc 
from sklearn.metrics import make_scorer, roc_auc_score
from scipy import stats

# DATA PREPARATION
df = pd.read_csv("credit_count.txt")
y = df[df.CARDHLDR == 1].DEFAULT.values 
x = preprocessing.scale(df[df.CARDHLDR == 1].iloc[:, 2:12], axis = 0)

# DEFINE MODEL AND PERFORMANCE MEASURE
mdl = svc(probability = True, random_state = 1)
auc = make_scorer(roc_auc_score)

# GRID SEARCH FOR 20 COMBINATIONS OF PARAMETERS
grid_list = {"C": np.arange(2, 10, 2),
             "gamma": np.arange(0.1, 1, 0.2)}

grid_search = GridSearchCV(mdl, param_grid = grid_list, n_jobs = 4, cv = 3, scoring = auc) 
grid_search.fit(x, y) 
grid_search.cv_results_

# RANDOM SEARCH FOR 20 COMBINATIONS OF PARAMETERS
rand_list = {"C": stats.uniform(2, 10),
             "gamma": stats.uniform(0.1, 1)}
             
rand_search = RandomizedSearchCV(mdl, param_distributions = rand_list, n_iter = 20, n_jobs = 4, cv = 3, random_state = 2017, scoring = auc) 
rand_search.fit(x, y) 
rand_search.cv_results_

Dropout Regularization in Deep Neural Networks

The deep neural network (DNN) is a very powerful neural network with multiple hidden layers and is able to capture the highly complex relationships between the response and the predictors. However, it is prone to over-fitting due to its large number of parameters, which makes regularization crucial for DNNs. In the paper (https://www.cs.toronto.edu/~hinton/absps/JMLRdropout.pdf), an interesting regularization approach, namely dropout, was proposed with a simple and elegant idea. Basically, it suppresses the complexity of DNNs by randomly dropping units in both the input and hidden layers.

Below is an example showing how to tune the hyper-parameters of dropout rates with the Keras library in Python. Because of the long computing time required to tune the dropout rates, parallelism is used to speed up the process.

from pandas import read_csv, DataFrame
from numpy.random import seed
from sklearn.preprocessing import scale
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score 
from keras.models import Sequential
from keras.constraints import maxnorm
from keras.optimizers import SGD
from keras.layers import Dense, Dropout
from multiprocessing import Pool, cpu_count
from itertools import product
from parmap import starmap

df = read_csv("credit_count.txt")
Y = df[df.CARDHLDR == 1].DEFAULT
X = df[df.CARDHLDR == 1][['AGE', 'ADEPCNT', 'MAJORDRG', 'MINORDRG', 'INCOME', 'OWNRENT', 'SELFEMPL']]
sX = scale(X)
ncol = sX.shape[1]
x_train, x_test, y_train, y_test = train_test_split(sX, Y, train_size = 0.5, random_state = seed(2017))

def tune_dropout(rate1, rate2):
  net = Sequential()
  ## DROPOUT AT THE INPUT LAYER
  net.add(Dropout(rate1, input_shape = (ncol,)))
  ## DROPOUT AT THE 1ST HIDDEN LAYER
  net.add(Dense(ncol, init = 'normal', activation = 'relu', W_constraint = maxnorm(4)))
  net.add(Dropout(rate2))
  ## DROPOUT AT THE 2ND HIDDEN LAYER
  net.add(Dense(ncol, init = 'normal', activation = 'relu', W_constraint = maxnorm(4)))
  net.add(Dropout(rate2))
  net.add(Dense(1, init = 'normal', activation = 'sigmoid'))
  sgd = SGD(lr = 0.1, momentum = 0.9, decay = 0, nesterov = False)
  net.compile(loss='binary_crossentropy', optimizer = sgd, metrics = ['accuracy'])
  net.fit(x_train, y_train, batch_size = 200, nb_epoch = 50, verbose = 0)
  print rate1, rate2, "{:6.4f}".format(roc_auc_score(y_test, net.predict(x_test)))

input_dp = [0.1, 0.2, 0.3]
hidden_dp = [0.2, 0.3, 0.4, 0.5]
parms = [i for i in product(input_dp, hidden_dp)]

seed(2017)
starmap(tune_dropout, parms, pool = Pool(processes = cpu_count()))

As shown in the output below, the optimal dropout rate appears, coincidentally, to be 0.2 for both the input and hidden layers.

0.1 0.2 0.6354
0.1 0.4 0.6336
0.1 0.3 0.6389
0.1 0.5 0.6378
0.2 0.2 0.6419
0.2 0.4 0.6385
0.2 0.3 0.6366
0.2 0.5 0.6359
0.3 0.4 0.6313
0.3 0.2 0.6350
0.3 0.3 0.6346
0.3 0.5 0.6343

Improve SVM Tuning through Parallelism

As pointed out in chapter 10 of “The Elements of Statistical Learning”, ANN and SVM (support vector machines) share similar pros and cons, e.g. a lack of interpretability and good predictive power. However, in contrast to ANNs, which often suffer from local minima, SVMs are always able to converge to a global solution. In addition, SVMs are less prone to over-fitting given a good choice of free parameters, which can usually be identified through cross-validation.

In the R package “e1071”, the tune() function can be used to search for SVM parameters but is extremely inefficient due to its sequential rather than parallel execution. In the code snippet below, a parallelism-based algorithm performs the grid search for SVM parameters through K-fold cross-validation.

pkgs <- c('foreach', 'doParallel')
lapply(pkgs, require, character.only = T)
registerDoParallel(cores = 4)
### PREPARE FOR THE DATA ###
df1 <- read.csv("credit_count.txt")
df2 <- df1[df1$CARDHLDR == 1, ]
x <- paste("AGE + ACADMOS + ADEPCNT + MAJORDRG + MINORDRG + OWNRENT + INCOME + SELFEMPL + INCPER + EXP_INC")
fml <- as.formula(paste("as.factor(DEFAULT) ~ ", x))
### SPLIT DATA INTO K FOLDS ###
set.seed(2016)
df2$fold <- caret::createFolds(1:nrow(df2), k = 4, list = FALSE)
### PARAMETER LIST ###
cost <- c(10, 100)
gamma <- c(1, 2)
parms <- expand.grid(cost = cost, gamma = gamma)
### LOOP THROUGH PARAMETER VALUES ###
result <- foreach(i = 1:nrow(parms), .combine = rbind) %do% {
  c <- parms[i, ]$cost
  g <- parms[i, ]$gamma
  ### K-FOLD VALIDATION ###
  out <- foreach(j = 1:max(df2$fold), .combine = rbind, .inorder = FALSE) %dopar% {
    deve <- df2[df2$fold != j, ]
    test <- df2[df2$fold == j, ]
    mdl <- e1071::svm(fml, data = deve, type = "C-classification", kernel = "radial", cost = c, gamma = g, probability = TRUE)
    pred <- predict(mdl, test, decision.values = TRUE, probability = TRUE)
    data.frame(y = test$DEFAULT, prob = attributes(pred)$probabilities[, 2])
  }
  ### CALCULATE SVM PERFORMANCE ###
  roc <- pROC::roc(as.factor(out$y), out$prob) 
  data.frame(parms[i, ], roc = roc$auc[1])
}
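
Once the loop completes, the best parameter combination can be read off the result data.frame, e.g. with a minimal sketch like below.

### RANK PARAMETER COMBINATIONS BY THE CROSS-VALIDATED AUC
result[order(-result$roc), ]
### OR PICK THE SINGLE BEST COMBINATION
result[which.max(result$roc), ]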