## Archive for the ‘**PYTHON**’ Category

## Sparkling Water and Moving Data Around

Sparkling Water is an application to integrate H2O with Spark. Below is an example showing how to move the data around among Pandas DataFrame, H2OFrame, and Spark Dataframe.

**1. Define Context **

In [1]: from pandas import read_csv, DataFrame In [2]: from pyspark import sql In [3]: from pysparkling import H2OContext In [4]: from h2o import import_file, H2OFrame In [5]: ss = sql.SparkSession.builder.getOrCreate() In [6]: hc = H2OContext.getOrCreate(ss)

**2. Convert Pandas Dataframe to H2OFrame and Spark DataFrame**

In [7]: p_df = read_csv("Documents/credit_count.txt") In [8]: type(p_df) Out[8]: pandas.core.frame.DataFrame In [9]: p2s_df = ss.createDataFrame(p_df) In [10]: type(p2s_df) Out[10]: pyspark.sql.dataframe.DataFrame In [11]: p2h_df = H2OFrame(p_df) In [12]: type(p2h_df) Out[12]: h2o.frame.H2OFrame

**3. Convert Spark Dataframe to H2OFrame and Pandas DataFrame**

In [13]: s_df = ss.read.csv("Documents/credit_count.txt", header = True, inferSchema = True) In [14]: type(s_df) Out[14]: pyspark.sql.dataframe.DataFrame In [15]: s2p_df = s_df.toPandas() In [16]: type(s2p_df) Out[16]: pandas.core.frame.DataFrame In [17]: s2h_df = hc.as_h2o_frame(s_df) In [18]: type(s2h_df) Out[18]: h2o.frame.H2OFrame

**4. Convert H2OFrame to Pandas Dataframe and Spark DataFrame**

In [19]: h_df = import_file("Documents/credit_count.txt", header = 1, sep = ",") In [20]: type(h_df) Out[20]: h2o.frame.H2OFrame In [21]: h2p_df = h_df.as_data_frame() In [22]: type(h2p_df) Out[22]: pandas.core.frame.DataFrame In [23]: h2s_df = hc.as_spark_frame(h_df) In [24]: type(h2s_df) Out[24]: pyspark.sql.dataframe.DataFrame

## Data Aggregation with PySpark

**Import CSV File into Spark Dataframe**

import pyspark as spark sc = spark.SQLContext(spark.SparkContext()) sdf1 = sc.read.csv("Documents/nycflights13.csv", header = True, inferSchema = True)

**Data Aggregation with Spark Dataframe**

import pyspark.sql.functions as fn sdf1.cache() \ .filter("month in (1, 3, 5)") \ .groupby("month") \ .agg(fn.mean("dep_time").alias("avg_dep"), fn.mean("arr_time").alias("avg_arr")) \ .show() +-----+------------------+------------------+ |month| avg_dep| avg_arr| +-----+------------------+------------------+ | 1| 1347.209530642299|1523.1545262203415| | 3|1359.4997676330747|1509.7429767741473| | 5|1351.1682074168525|1502.6846604007803| +-----+------------------+------------------+

**Data Aggregation with Spark SQL**

sc.registerDataFrameAsTable(sdf1, "tbl1") sc.sql("select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month").show() sc.dropTempTable(sc.tableNames()[0]) +-----+------------------+------------------+ |month| avg_dep| avg_arr| +-----+------------------+------------------+ | 1| 1347.209530642299|1523.1545262203415| | 3|1359.4997676330747|1509.7429767741473| | 5|1351.1682074168525|1502.6846604007803| +-----+------------------+------------------+

## Kick Off Spark

**My first Spark section:**

scala> import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SQLContext scala> val sdf = spark.read.option("header", true).csv("Documents/spark/credit_count.txt") sdf: org.apache.spark.sql.DataFrame = [CARDHLDR: string, DEFAULT: string ... 12 more fields] scala> sdf.printSchema() root |-- CARDHLDR: string (nullable = true) |-- DEFAULT: string (nullable = true) |-- AGE: string (nullable = true) |-- ACADMOS: string (nullable = true) |-- ADEPCNT: string (nullable = true) |-- MAJORDRG: string (nullable = true) |-- MINORDRG: string (nullable = true) |-- OWNRENT: string (nullable = true) |-- INCOME: string (nullable = true) |-- SELFEMPL: string (nullable = true) |-- INCPER: string (nullable = true) |-- EXP_INC: string (nullable = true) |-- SPENDING: string (nullable = true) |-- LOGSPEND : string (nullable = true) scala> sdf.createOrReplaceTempView("tmp1") scala> spark.sql("select count(*) as obs from tmp1").show() +-----+ | obs| +-----+ |13444| +-----+

**Pyspark section doing the same thing:**

In [1]: import pyspark as spark In [2]: sc = spark.SQLContext(spark.SparkContext()) In [3]: sdf = sc.read.csv("Documents/spark/credit_count.txt", header = True) In [4]: sdf.printSchema() root |-- CARDHLDR: string (nullable = true) |-- DEFAULT: string (nullable = true) |-- AGE: string (nullable = true) |-- ACADMOS: string (nullable = true) |-- ADEPCNT: string (nullable = true) |-- MAJORDRG: string (nullable = true) |-- MINORDRG: string (nullable = true) |-- OWNRENT: string (nullable = true) |-- INCOME: string (nullable = true) |-- SELFEMPL: string (nullable = true) |-- INCPER: string (nullable = true) |-- EXP_INC: string (nullable = true) |-- SPENDING: string (nullable = true) |-- LOGSPEND : string (nullable = true) In [5]: sdf.createOrReplaceTempView("tmp1") In [6]: sc.sql("select count(*) as obs from tmp1").show() +-----+ | obs| +-----+ |13444| +-----+

## Random Search for Optimal Parameters

Practices of manual search, grid search, or the combination of both have been successfully employed in the machine learning to optimize hyper-parameters. However, in the arena of deep learning, both approaches might become impractical. For instance, the computing cost of grid search for hyper-parameters in a multi-layer deep neural network (DNN) could be prohibitively high.

In light of aforementioned hurdles, Bergstra and Bengio proposed a novel idea of random search in the paper http://www.jmlr.org/papers/volume13/bergstra12a/bergstra12a.pdf. In their study, it was found that random search is more efficient than grid search for the hyper-parameter optimization in terms of computing costs.

In the example below, it is shown that both grid search and random search have reached similar results in the SVM parameter optimization based on cross-validations.

import pandas as pd import numpy as np from sklearn import preprocessing from sklearn.model_selection import GridSearchCV, RandomizedSearchCV from sklearn.svm import SVC as svc from sklearn.metrics import make_scorer, roc_auc_score from scipy import stats # DATA PREPARATION df = pd.read_csv("credit_count.txt") y = df[df.CARDHLDR == 1].DEFAULT.values x = preprocessing.scale(df[df.CARDHLDR == 1].ix[:, 2:12], axis = 0) # DEFINE MODEL AND PERFORMANCE MEASURE mdl = svc(probability = True, random_state = 1) auc = make_scorer(roc_auc_score) # GRID SEARCH FOR 20 COMBINATIONS OF PARAMETERS grid_list = {"C": np.arange(2, 10, 2), "gamma": np.arange(0.1, 1, 0.2)} grid_search = GridSearchCV(mdl, param_grid = grid_list, n_jobs = 4, cv = 3, scoring = auc) grid_search.fit(x, y) grid_search.cv_results_ # RANDOM SEARCH FOR 20 COMBINATIONS OF PARAMETERS rand_list = {"C": stats.uniform(2, 10), "gamma": stats.uniform(0.1, 1)} rand_search = RandomizedSearchCV(mdl, param_distributions = rand_list, n_iter = 20, n_jobs = 4, cv = 3, random_state = 2017, scoring = auc) rand_search.fit(x, y) rand_search.cv_results_

## A Simple Convolutional Neural Network for The Binary Outcome

Since CNN(Convolutional Neural Networks) have achieved a tremendous success in various challenging applications, e.g. image or digit recognitions, one might wonder how to employ CNNs in classification problems with binary outcomes.

Below is an example showing how to use a simple 1D convolutional neural network to predict credit card defaults.

### LOAD PACKAGES from numpy.random import seed from pandas import read_csv, DataFrame from sklearn.preprocessing import minmax_scale from keras.layers.convolutional import Conv1D, MaxPooling1D from keras.optimizers import SGD from keras.models import Sequential from keras.layers import Dense, Flatten ### PREPARE THE DATA df = read_csv("credit_count.txt") Y = df[df.CARDHLDR == 1].DEFAULT X = minmax_scale(df[df.CARDHLDR == 1].ix[:, 2:12], axis = 0) y_train = Y.values x_train = X.reshape(X.shape[0], X.shape[1], 1) ### FIT A 1D CONVOLUTIONAL NEURAL NETWORK seed(2017) conv = Sequential() conv.add(Conv1D(20, 4, input_shape = x_train.shape[1:3], activation = 'relu')) conv.add(MaxPooling1D(2)) conv.add(Flatten()) conv.add(Dense(1, activation = 'sigmoid')) sgd = SGD(lr = 0.1, momentum = 0.9, decay = 0, nesterov = False) conv.compile(loss = 'binary_crossentropy', optimizer = sgd, metrics = ['accuracy']) conv.fit(x_train, y_train, batch_size = 500, epochs = 100, verbose = 0)

Considering that 1D is the special case of 2D, we can also solve the same problem with a 2D convolutional neural network by changing the input shape, as shown below.

from numpy.random import seed from pandas import read_csv, DataFrame from sklearn.preprocessing import minmax_scale from keras_diagram import ascii from keras.layers.convolutional import Conv2D, MaxPooling2D from keras.optimizers import SGD from keras.models import Sequential from keras.layers import Dense, Flatten df = read_csv("credit_count.txt") Y = df[df.CARDHLDR == 1].DEFAULT X = minmax_scale(df[df.CARDHLDR == 1].ix[:, 2:12], axis = 0) y_train = Y.values x_train = X.reshape(X.shape[0], 1, X.shape[1], 1) seed(2017) conv = Sequential() conv.add(Conv2D(20, (1, 4), input_shape = x_train.shape[1:4], activation = 'relu')) conv.add(MaxPooling2D((1, 2))) conv.add(Flatten()) conv.add(Dense(1, activation = 'sigmoid')) sgd = SGD(lr = 0.1, momentum = 0.9, decay = 0, nesterov = False) conv.compile(loss = 'binary_crossentropy', optimizer = sgd, metrics = ['accuracy']) conv.fit(x_train, y_train, batch_size = 500, epochs = 100, verbose = 0)

## Autoencoder for Dimensionality Reduction

We often use ICA or PCA to extract features from the high-dimensional data. The autoencoder is another interesting algorithm to achieve the same purpose in the context of Deep Learning.

with the purpose of learning a function to approximate the input data itself such that F(X) = X, an autoencoder consists of two parts, namely encoder and decoder. While the encoder aims to compress the original input data into a low-dimensional representation, the decoder tries to reconstruct the original input data based on the low-dimension representation generated by the encoder. As a result, the autoencoder has been widely used to remove the data noise as well to reduce the data dimension.

First of all, we will show the basic structure of an autoencoder with 1-layer encoder and 1-layer decoder, as below. In the example, we will compress the input data with 10 columns into a compressed on with 3 columns.

from pandas import read_csv, DataFrame from numpy.random import seed from sklearn.preprocessing import minmax_scale from sklearn.model_selection import train_test_split from keras.layers import Input, Dense from keras.models import Model df = read_csv("credit_count.txt") Y = df[df.CARDHLDR == 1].DEFAULTS X = df[df.CARDHLDR == 1].ix[:, 2:12] # SCALE EACH FEATURE INTO [0, 1] RANGE sX = minmax_scale(X, axis = 0) ncol = sX.shape[1] X_train, X_test, Y_train, Y_test = train_test_split(sX, Y, train_size = 0.5, random_state = seed(2017)) ### AN EXAMPLE OF SIMPLE AUTOENCODER ### # InputLayer (None, 10) # Dense (None, 5) # Dense (None, 10) input_dim = Input(shape = (ncol, )) # DEFINE THE DIMENSION OF ENCODER ASSUMED 3 encoding_dim = 3 # DEFINE THE ENCODER LAYER encoded = Dense(encoding_dim, activation = 'relu')(input_dim) # DEFINE THE DECODER LAYER decoded = Dense(ncol, activation = 'sigmoid')(encoded) # COMBINE ENCODER AND DECODER INTO AN AUTOENCODER MODEL autoencoder = Model(input = input_dim, output = decoded) # CONFIGURE AND TRAIN THE AUTOENCODER autoencoder.compile(optimizer = 'adadelta', loss = 'binary_crossentropy') autoencoder.fit(X_train, X_train, nb_epoch = 50, batch_size = 100, shuffle = True, validation_data = (X_test, X_test)) # THE ENCODER TO EXTRACT THE REDUCED DIMENSION FROM THE ABOVE AUTOENCODER encoder = Model(input = input_dim, output = encoded) encoded_input = Input(shape = (encoding_dim, )) encoded_out = encoder.predict(X_test) encoded_out[0:2] #array([[ 0. , 1.26510417, 1.62803197], # [ 2.32508397, 0.99735016, 2.06461048]], dtype=float32)

In the next example, we will relax the constraint of layers and employ a stack of layers to achievement the same purpose as above.

### AN EXAMPLE OF DEEP AUTOENCODER WITH MULTIPLE LAYERS # InputLayer (None, 10) # Dense (None, 20) # Dense (None, 10) # Dense (None, 5) # Dense (None, 3) # Dense (None, 5) # Dense (None, 10) # Dense (None, 20) # Dense (None, 10) input_dim = Input(shape = (ncol, )) # DEFINE THE DIMENSION OF ENCODER ASSUMED 3 encoding_dim = 3 # DEFINE THE ENCODER LAYERS encoded1 = Dense(20, activation = 'relu')(input_dim) encoded2 = Dense(10, activation = 'relu')(encoded1) encoded3 = Dense(5, activation = 'relu')(encoded2) encoded4 = Dense(encoding_dim, activation = 'relu')(encoded3) # DEFINE THE DECODER LAYERS decoded1 = Dense(5, activation = 'relu')(encoded4) decoded2 = Dense(10, activation = 'relu')(decoded1) decoded3 = Dense(20, activation = 'relu')(decoded2) decoded4 = Dense(ncol, activation = 'sigmoid')(decoded3) # COMBINE ENCODER AND DECODER INTO AN AUTOENCODER MODEL autoencoder = Model(input = input_dim, output = decoded4) # CONFIGURE AND TRAIN THE AUTOENCODER autoencoder.compile(optimizer = 'adadelta', loss = 'binary_crossentropy') autoencoder.fit(X_train, X_train, nb_epoch = 100, batch_size = 100, shuffle = True, validation_data = (X_test, X_test)) # THE ENCODER TO EXTRACT THE REDUCED DIMENSION FROM THE ABOVE AUTOENCODER encoder = Model(input = input_dim, output = encoded4) encoded_input = Input(shape = (encoding_dim, )) encoded_out = encoder.predict(X_test) encoded_out[0:2] #array([[ 3.74947715, 0. , 3.22947764], # [ 3.93903661, 0.17448257, 1.86618853]], dtype=float32)

## An Example of Merge Layer in Keras

The power of a DNN does not only come from its depth but also come from its flexibility of accommodating complex network structures. For instance, the DNN shown below consists of two branches, the left with 4 inputs and the right with 6 inputs. In addition, the right branch shows a more complicated structure than the left.

InputLayer (None, 6) Dense (None, 6) BatchNormalization (None, 6) Dense (None, 6) InputLayer (None, 4) BatchNormalization (None, 6) Dense (None, 4) Dense (None, 6) BatchNormalization (None, 4) BatchNormalization (None, 6) \____________________________________/ | Merge (None, 10) Dense (None, 1)

To create a DNN as the above, both left and right branches are defined separately with corresponding inputs and layers. In the line 29, both branches would be combined with a MERGE layer. There are multiple benefits of such merged DNNs. For instance, the DNN has the flexibility to handle various inputs differently. In addition, new features can be added conveniently without messing around with the existing network structure.

from pandas import read_csv, DataFrame from numpy.random import seed from sklearn.preprocessing import scale from keras.models import Sequential from keras.constraints import maxnorm from keras.optimizers import SGD from keras.layers import Dense, Merge from keras.layers.normalization import BatchNormalization from keras_diagram import ascii df = read_csv("credit_count.txt") Y = df[df.CARDHLDR == 1].DEFAULTS X1 = scale(df[df.CARDHLDR == 1][["MAJORDRG", "MINORDRG", "OWNRENT", "SELFEMPL"]]) X2 = scale(df[df.CARDHLDR == 1][["AGE", "ACADMOS", "ADEPCNT", "INCPER", "EXP_INC", "INCOME"]]) branch1 = Sequential() branch1.add(Dense(X1.shape[1], input_shape = (X1.shape[1],), init = 'normal', activation = 'relu')) branch1.add(BatchNormalization()) branch2 = Sequential() branch2.add(Dense(X2.shape[1], input_shape = (X2.shape[1],), init = 'normal', activation = 'relu')) branch2.add(BatchNormalization()) branch2.add(Dense(X2.shape[1], init = 'normal', activation = 'relu', W_constraint = maxnorm(5))) branch2.add(BatchNormalization()) branch2.add(Dense(X2.shape[1], init = 'normal', activation = 'relu', W_constraint = maxnorm(5))) branch2.add(BatchNormalization()) model = Sequential() model.add(Merge([branch1, branch2], mode = 'concat')) model.add(Dense(1, init = 'normal', activation = 'sigmoid')) sgd = SGD(lr = 0.1, momentum = 0.9, decay = 0, nesterov = False) model.compile(loss = 'binary_crossentropy', optimizer = sgd, metrics = ['accuracy']) seed(2017) model.fit([X1, X2], Y.values, batch_size = 2000, nb_epoch = 100, verbose = 1)