Yet Another Blog in Statistical Computing

I can calculate the motion of heavenly bodies but not the madness of people. -Isaac Newton

Archive for the ‘Pyspark’ Category

Sparkling Water and Moving Data Around

Sparkling Water is an application that integrates H2O with Spark. Below is an example showing how to move data among a Pandas DataFrame, an H2OFrame, and a Spark DataFrame.

1. Define Context

In [1]: from pandas import read_csv, DataFrame

In [2]: from pyspark import sql

In [3]: from pysparkling import H2OContext

In [4]: from h2o import import_file, H2OFrame

In [5]: ss = sql.SparkSession.builder.getOrCreate()

In [6]: hc = H2OContext.getOrCreate(ss)

2. Convert Pandas DataFrame to H2OFrame and Spark DataFrame

In [7]: p_df = read_csv("Documents/credit_count.txt")

In [8]: type(p_df)
Out[8]: pandas.core.frame.DataFrame

In [9]: p2s_df = ss.createDataFrame(p_df)

In [10]: type(p2s_df)
Out[10]: pyspark.sql.dataframe.DataFrame

In [11]: p2h_df = H2OFrame(p_df)

In [12]: type(p2h_df)
Out[12]: h2o.frame.H2OFrame

3. Convert Spark DataFrame to H2OFrame and Pandas DataFrame

In [13]: s_df = ss.read.csv("Documents/credit_count.txt", header = True, inferSchema = True)

In [14]: type(s_df)
Out[14]: pyspark.sql.dataframe.DataFrame

In [15]: s2p_df = s_df.toPandas()

In [16]: type(s2p_df)
Out[16]: pandas.core.frame.DataFrame

In [17]: s2h_df = hc.as_h2o_frame(s_df)

In [18]: type(s2h_df)
Out[18]: h2o.frame.H2OFrame

4. Convert H2OFrame to Pandas DataFrame and Spark DataFrame

In [19]: h_df = import_file("Documents/credit_count.txt", header = 1, sep = ",")

In [20]: type(h_df)
Out[20]: h2o.frame.H2OFrame

In [21]: h2p_df = h_df.as_data_frame()

In [22]: type(h2p_df)
Out[22]: pandas.core.frame.DataFrame

In [23]: h2s_df = hc.as_spark_frame(h_df)

In [24]: type(h2s_df)
Out[24]: pyspark.sql.dataframe.DataFrame
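For quick reference, the six conversions above can be collected into one lookup table. This is only a plain-Python cheat sheet of the calls used in this post: the variable names (ss, hc, p_df, s_df, h_df) are the ones from the session above, and the helper conversion_for is hypothetical. It does not import or call Spark, H2O, or pandas.

```python
# Cheat sheet of the conversion calls used in this post, keyed by
# (source, target). Strings only: nothing here touches Spark, H2O, or pandas.
CONVERSIONS = {
    ("pandas", "spark"): "ss.createDataFrame(p_df)",
    ("pandas", "h2o"): "H2OFrame(p_df)",
    ("spark", "pandas"): "s_df.toPandas()",
    ("spark", "h2o"): "hc.as_h2o_frame(s_df)",
    ("h2o", "pandas"): "h_df.as_data_frame()",
    ("h2o", "spark"): "hc.as_spark_frame(h_df)",
}


def conversion_for(source: str, target: str) -> str:
    """Return the call that converts a `source` frame into a `target` frame."""
    key = (source.lower(), target.lower())
    if key not in CONVERSIONS:
        raise KeyError(f"no conversion listed from {source!r} to {target!r}")
    return CONVERSIONS[key]


print(conversion_for("Spark", "H2O"))  # hc.as_h2o_frame(s_df)
```

Only the Spark/H2O pair goes through the H2OContext hc; the pandas conversions use the constructors and methods of Spark and H2O directly.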

Written by statcompute

July 3, 2017 at 12:36 am

Data Aggregation with PySpark

Import CSV File into Spark DataFrame

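import pyspark as spark

# note: "sc" holds a SQLContext built on a new SparkContext, not the SparkContext itself
sc = spark.SQLContext(spark.SparkContext())

# header = True: first line holds column names; inferSchema = True: guess column types
sdf1 = sc.read.csv("Documents/nycflights13.csv", header = True, inferSchema = True)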

Data Aggregation with Spark DataFrame

import pyspark.sql.functions as fn

sdf1.cache() \
    .filter("month in (1, 3, 5)") \
    .groupby("month") \
    .agg(fn.mean("dep_time").alias("avg_dep"), fn.mean("arr_time").alias("avg_arr")) \
    .show()

+-----+------------------+------------------+
|month|           avg_dep|           avg_arr|
+-----+------------------+------------------+
|    1| 1347.209530642299|1523.1545262203415|
|    3|1359.4997676330747|1509.7429767741473|
|    5|1351.1682074168525|1502.6846604007803|
+-----+------------------+------------------+
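To make explicit what the filter, groupby, and agg chain computes, here is a minimal plain-Python analogue working on a list of dicts. It is a sketch for illustration only, not how Spark executes the query; the function name avg_by_month and the sample rows are hypothetical (they are not the nycflights13 data), and None stands in for a null value, which Spark's mean skips.

```python
from collections import defaultdict
from statistics import mean


def avg_by_month(rows, months=(1, 3, 5)):
    """Plain-Python analogue of filter -> groupby("month") -> agg(mean, mean).

    `rows` is a list of dicts with keys month, dep_time, arr_time;
    None stands in for a null value.
    """
    groups = defaultdict(lambda: {"dep_time": [], "arr_time": []})
    for row in rows:
        if row["month"] not in months:  # filter("month in (1, 3, 5)")
            continue
        group = groups[row["month"]]  # groupby("month"): one bucket per month
        for col in ("dep_time", "arr_time"):
            if row[col] is not None:  # like Spark's mean, skip nulls
                group[col].append(row[col])
    # agg: average each column; a bucket with no non-null values gives None
    return {
        month: {
            "avg_dep": mean(vals["dep_time"]) if vals["dep_time"] else None,
            "avg_arr": mean(vals["arr_time"]) if vals["arr_time"] else None,
        }
        for month, vals in sorted(groups.items())
    }


# hypothetical sample rows; month 2 is dropped by the filter
rows = [
    {"month": 1, "dep_time": 500, "arr_time": 800},
    {"month": 1, "dep_time": 700, "arr_time": None},
    {"month": 2, "dep_time": 900, "arr_time": 1000},
    {"month": 3, "dep_time": 1200, "arr_time": 1400},
]
print(avg_by_month(rows))
```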

Data Aggregation with Spark SQL

sc.registerDataFrameAsTable(sdf1, "tbl1")

sc.sql("select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month").show()

+-----+------------------+------------------+
|month|           avg_dep|           avg_arr|
+-----+------------------+------------------+
|    1| 1347.209530642299|1523.1545262203415|
|    3|1359.4997676330747|1509.7429767741473|
|    5|1351.1682074168525|1502.6846604007803|
+-----+------------------+------------------+

sc.dropTempTable("tbl1")
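Because the query is plain SQL, it can also be tried outside Spark. The sketch below runs the same statement with Python's built-in sqlite3 module against a few hypothetical rows (not the nycflights13 data). The table name tbl1 mirrors the one registered above; an order by is added so the row order is deterministic, and the helper name run_query is illustrative.

```python
import sqlite3

# Same statement as the Spark SQL query, plus "order by month" for a stable order.
QUERY = (
    "select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr "
    "from tbl1 where month in (1, 3, 5) group by month order by month"
)


def run_query(rows):
    """Load (month, dep_time, arr_time) tuples into an in-memory table and run QUERY."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("create table tbl1 (month integer, dep_time real, arr_time real)")
        conn.executemany("insert into tbl1 values (?, ?, ?)", rows)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()


# hypothetical rows; None becomes NULL, which avg() ignores
rows = [(1, 500, 800), (1, 700, None), (2, 900, 1000), (3, 1200, 1400)]
print(run_query(rows))
```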

Written by statcompute

June 1, 2017 at 1:02 am

Posted in Pyspark, PYTHON, Spark

Kick Off Spark

My first Spark session, in the Scala shell:

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sdf = spark.read.option("header", true).csv("Documents/spark/credit_count.txt")
sdf: org.apache.spark.sql.DataFrame = [CARDHLDR: string, DEFAULT: string ... 12 more fields]

scala> sdf.printSchema()
root
 |-- CARDHLDR: string (nullable = true)
 |-- DEFAULT: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- ACADMOS: string (nullable = true)
 |-- ADEPCNT: string (nullable = true)
 |-- MAJORDRG: string (nullable = true)
 |-- MINORDRG: string (nullable = true)
 |-- OWNRENT: string (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- SELFEMPL: string (nullable = true)
 |-- INCPER: string (nullable = true)
 |-- EXP_INC: string (nullable = true)
 |-- SPENDING: string (nullable = true)
 |-- LOGSPEND : string (nullable = true)

scala> sdf.createOrReplaceTempView("tmp1")

scala> spark.sql("select count(*) as obs from tmp1").show()
+-----+
|  obs|
+-----+
|13444|
+-----+

A PySpark session doing the same thing:

In [1]: import pyspark as spark

In [2]: sc = spark.SQLContext(spark.SparkContext())

In [3]: sdf = sc.read.csv("Documents/spark/credit_count.txt", header = True)

In [4]: sdf.printSchema()
root
 |-- CARDHLDR: string (nullable = true)
 |-- DEFAULT: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- ACADMOS: string (nullable = true)
 |-- ADEPCNT: string (nullable = true)
 |-- MAJORDRG: string (nullable = true)
 |-- MINORDRG: string (nullable = true)
 |-- OWNRENT: string (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- SELFEMPL: string (nullable = true)
 |-- INCPER: string (nullable = true)
 |-- EXP_INC: string (nullable = true)
 |-- SPENDING: string (nullable = true)
 |-- LOGSPEND : string (nullable = true)

In [5]: sdf.createOrReplaceTempView("tmp1")

In [6]: sc.sql("select count(*) as obs from tmp1").show()
+-----+
|  obs|
+-----+
|13444|
+-----+
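Both sessions report every column as string because the file is read with only the header option, and schema inference is off by default. Passing inferSchema = True in PySpark, or .option("inferSchema", true) in Scala, asks Spark to make an extra pass over the data to guess column types. The sketch below is a much-simplified, plain-Python illustration of that idea (try integer, then double, else string); Spark's real inference covers more types, and the function name infer_column_type and the sample columns are hypothetical.

```python
def infer_column_type(values):
    """Guess a column type from its string values: int, then double, else string.

    Empty strings are treated as missing and skipped.
    """
    def parses(cast, text):
        try:
            cast(text)
            return True
        except ValueError:
            return False

    present = [v for v in values if v != ""]
    if not present:
        return "string"  # nothing to go on
    for type_name, cast in (("int", int), ("double", float)):
        if all(parses(cast, v) for v in present):
            return type_name
    return "string"


# hypothetical columns of raw CSV text
columns = {
    "x": ["0", "1", "1"],
    "y": ["27.25", "40.83", ""],
    "z": ["a", "1", "2"],
}
print({name: infer_column_type(vals) for name, vals in columns.items()})
```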

Written by statcompute

May 29, 2017 at 1:34 am

Posted in Pyspark, PYTHON, Spark