XFrames: Another Convenient Python Interface to Spark

PySpark is probably the most popular Python interface to Apache Spark. However, the xframes package (https://github.com/cchayden/xframes) is an alternative worth trying.

As the code snippet below shows, the XFrame, which is the dataframe object in the xframes package, interacts well with other Python data structures and NumPy functions. To me, the XFrame is easier to work with than the PySpark DataFrame and has a more “authentic” Python flavor.

from xframes import XFrame, aggregate

df = XFrame.read_csv("Downloads/nycflights.csv", header = True, nrows = 11)

### SUBSETTING ###
sel_cols = ["origin", "dest", "distance", "dep_delay", "carrier"]

df2 = df[sel_cols]
# OR:
# df.sql("select " + ", ".join(sel_cols) + " from df")

### FILTERING ###
print df2[(df2["origin"] == 'EWR') & (df2["carrier"] == "UA")]
# OR:
# print df2.filterby("EWR", "origin").filterby("UA", "carrier")

### AGGREGATING ###
from numpy import median

grp1 = df2.groupby("origin", {"dist": aggregate.CONCAT("distance")})

agg1 = XFrame({"origin": grp1["origin"], "med_dist": map(median, grp1["dist"])})
# OR:
# grp1["med_dist"] = grp1.apply(lambda row: median(row["dist"]))
# agg1 = grp1[["origin", "med_dist"]]
# USING SQL:
# df2.sql("select origin, percentile_approx(distance, 0.5) as med_dist from df2 group by origin")

for row in agg1:
  print row
# {'origin': u'LGA', 'med_dist': 747.5}
# {'origin': u'JFK', 'med_dist': 1089.0}
# {'origin': u'EWR', 'med_dist': 1065.0}

agg2 = df2.groupby("origin", {"avg_delay": aggregate.MEAN("dep_delay")})
# USING SQL:
# df2.sql("select origin, mean(dep_delay) as avg_delay from df2 group by origin")

for row in agg2:
  print row
# {'origin': u'LGA', 'avg_delay': -1.75}
# {'origin': u'JFK', 'avg_delay': -0.6666666666666666}
# {'origin': u'EWR', 'avg_delay': -2.3333333333333335}

### JOINING ###
for row in agg1.join(agg2, on = {"origin": "origin"}, how = "inner"):
  print row
# {'origin': u'LGA', 'med_dist': 747.5, 'avg_delay': -1.75}
# {'origin': u'JFK', 'med_dist': 1089.0, 'avg_delay': -0.6666666666666666}
# {'origin': u'EWR', 'med_dist': 1065.0, 'avg_delay': -2.3333333333333335}
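
To make the logic of the group, aggregate, and join steps above explicit, here is a minimal sketch in plain Python 3 that uses only the standard library. It does not use xframes or Spark at all, and the rows in it are hypothetical toy values, not the contents of nycflights.csv.

```python
from collections import defaultdict
from statistics import mean, median

# Hypothetical toy rows; these are NOT the values from nycflights.csv.
flights = [
    {"origin": "EWR", "distance": 1400, "dep_delay": 2},
    {"origin": "EWR", "distance": 1000, "dep_delay": -4},
    {"origin": "JFK", "distance": 1089, "dep_delay": -1},
    {"origin": "LGA", "distance": 733, "dep_delay": -3},
    {"origin": "LGA", "distance": 762, "dep_delay": 0},
]

# Group: collect the raw values per origin, similar in spirit to
# concatenating "distance" within each group before summarizing it.
dist_by_origin = defaultdict(list)
delay_by_origin = defaultdict(list)
for row in flights:
    dist_by_origin[row["origin"]].append(row["distance"])
    delay_by_origin[row["origin"]].append(row["dep_delay"])

# Aggregate: one summary value per group.
med_dist = {origin: median(vals) for origin, vals in dist_by_origin.items()}
avg_delay = {origin: mean(vals) for origin, vals in delay_by_origin.items()}

# Inner join on "origin": keep only the keys present in both summaries.
joined = [
    {"origin": origin, "med_dist": med_dist[origin], "avg_delay": avg_delay[origin]}
    for origin in sorted(med_dist.keys() & avg_delay.keys())
]

for row in joined:
    print(row)
```

The sketch keeps everything in memory, so it is only meant for checking the logic on a handful of rows.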

Sparkling Water and Moving Data Around

Sparkling Water integrates H2O with Spark. The example below shows how to move data among a Pandas DataFrame, an H2OFrame, and a Spark DataFrame.

1. Define Context

In [1]: from pandas import read_csv, DataFrame

In [2]: from pyspark import sql

In [3]: from pysparkling import H2OContext

In [4]: from h2o import import_file, H2OFrame

In [5]: ss = sql.SparkSession.builder.getOrCreate()

In [6]: hc = H2OContext.getOrCreate(ss)

2. Convert Pandas DataFrame to H2OFrame and Spark DataFrame

In [7]: p_df = read_csv("Documents/credit_count.txt")

In [8]: type(p_df)
Out[8]: pandas.core.frame.DataFrame

In [9]: p2s_df = ss.createDataFrame(p_df)

In [10]: type(p2s_df)
Out[10]: pyspark.sql.dataframe.DataFrame

In [11]: p2h_df = H2OFrame(p_df)

In [12]: type(p2h_df)
Out[12]: h2o.frame.H2OFrame

3. Convert Spark DataFrame to H2OFrame and Pandas DataFrame

In [13]: s_df = ss.read.csv("Documents/credit_count.txt", header = True, inferSchema = True)

In [14]: type(s_df)
Out[14]: pyspark.sql.dataframe.DataFrame

In [15]: s2p_df = s_df.toPandas()

In [16]: type(s2p_df)
Out[16]: pandas.core.frame.DataFrame

In [17]: s2h_df = hc.as_h2o_frame(s_df)

In [18]: type(s2h_df)
Out[18]: h2o.frame.H2OFrame

4. Convert H2OFrame to Pandas DataFrame and Spark DataFrame

In [19]: h_df = import_file("Documents/credit_count.txt", header = 1, sep = ",")

In [20]: type(h_df)
Out[20]: h2o.frame.H2OFrame

In [21]: h2p_df = h_df.as_data_frame()

In [22]: type(h2p_df)
Out[22]: pandas.core.frame.DataFrame

In [23]: h2s_df = hc.as_spark_frame(h_df)

In [24]: type(h2s_df)
Out[24]: pyspark.sql.dataframe.DataFrame

Data Aggregation with PySpark

Import CSV File into Spark DataFrame

import pyspark as spark

# Note: sc is a SQLContext built on a new SparkContext, not the SparkContext itself
sc = spark.SQLContext(spark.SparkContext())

sdf1 = sc.read.csv("Documents/nycflights13.csv", header = True, inferSchema = True)

Data Aggregation with Spark DataFrame

import pyspark.sql.functions as fn

sdf1.cache() \
    .filter("month in (1, 3, 5)") \
    .groupby("month") \
    .agg(fn.mean("dep_time").alias("avg_dep"), fn.mean("arr_time").alias("avg_arr")) \
    .show()

+-----+------------------+------------------+
|month|           avg_dep|           avg_arr|
+-----+------------------+------------------+
|    1| 1347.209530642299|1523.1545262203415|
|    3|1359.4997676330747|1509.7429767741473|
|    5|1351.1682074168525|1502.6846604007803|
+-----+------------------+------------------+

Data Aggregation with Spark SQL

sc.registerDataFrameAsTable(sdf1, "tbl1")

sc.sql("select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month").show()

sc.dropTempTable(sc.tableNames()[0])

+-----+------------------+------------------+
|month|           avg_dep|           avg_arr|
+-----+------------------+------------------+
|    1| 1347.209530642299|1523.1545262203415|
|    3|1359.4997676330747|1509.7429767741473|
|    5|1351.1682074168525|1502.6846604007803|
+-----+------------------+------------------+
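
The SQL statement above is plain enough that its logic can be checked without a Spark installation. Below is a minimal sketch that runs the same query against an in-memory SQLite table through Python's standard sqlite3 module. SQLite is swapped in here purely for illustration, the rows are hypothetical toy values rather than the nycflights13 data, and the ORDER BY clause is my addition to make the row order deterministic.

```python
import sqlite3

# Hypothetical toy rows (month, dep_time, arr_time); not the real nycflights13 data.
rows = [
    (1, 1300, 1500),
    (1, 1400, 1600),
    (2, 900, 1100),
    (3, 1200, 1400),
    (5, 1000, 1200),
    (5, 1100, 1400),
]

conn = sqlite3.connect(":memory:")
conn.execute("create table tbl1 (month integer, dep_time integer, arr_time integer)")
conn.executemany("insert into tbl1 values (?, ?, ?)", rows)

# Same statement as in the Spark SQL example, plus an ORDER BY (an addition)
# so that the rows come back in a predictable order.
query = (
    "select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr "
    "from tbl1 where month in (1, 3, 5) group by month order by month"
)
result = conn.execute(query).fetchall()
conn.close()

for month, avg_dep, avg_arr in result:
    print(month, avg_dep, avg_arr)
```

Month 2 is filtered out by the WHERE clause, so only three groups remain, just as in the Spark output.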

Kick Off Spark

My first Spark session, in the Scala shell:


scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sdf = spark.read.option("header", true).csv("Documents/spark/credit_count.txt")
sdf: org.apache.spark.sql.DataFrame = [CARDHLDR: string, DEFAULT: string ... 12 more fields]

scala> sdf.printSchema()
root
 |-- CARDHLDR: string (nullable = true)
 |-- DEFAULT: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- ACADMOS: string (nullable = true)
 |-- ADEPCNT: string (nullable = true)
 |-- MAJORDRG: string (nullable = true)
 |-- MINORDRG: string (nullable = true)
 |-- OWNRENT: string (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- SELFEMPL: string (nullable = true)
 |-- INCPER: string (nullable = true)
 |-- EXP_INC: string (nullable = true)
 |-- SPENDING: string (nullable = true)
 |-- LOGSPEND : string (nullable = true)

scala> sdf.createOrReplaceTempView("tmp1")

scala> spark.sql("select count(*) as obs from tmp1").show()
+-----+
|  obs|
+-----+
|13444|
+-----+

A PySpark session doing the same thing:


In [1]: import pyspark as spark

In [2]: sc = spark.SQLContext(spark.SparkContext())

In [3]: sdf = sc.read.csv("Documents/spark/credit_count.txt", header = True)

In [4]: sdf.printSchema()
root
 |-- CARDHLDR: string (nullable = true)
 |-- DEFAULT: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- ACADMOS: string (nullable = true)
 |-- ADEPCNT: string (nullable = true)
 |-- MAJORDRG: string (nullable = true)
 |-- MINORDRG: string (nullable = true)
 |-- OWNRENT: string (nullable = true)
 |-- INCOME: string (nullable = true)
 |-- SELFEMPL: string (nullable = true)
 |-- INCPER: string (nullable = true)
 |-- EXP_INC: string (nullable = true)
 |-- SPENDING: string (nullable = true)
 |-- LOGSPEND : string (nullable = true)

In [5]: sdf.createOrReplaceTempView("tmp1")

In [6]: sc.sql("select count(*) as obs from tmp1").show()
+-----+
|  obs|
+-----+
|13444|
+-----+
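
One detail in both schemas is that the last column name appears to end with a trailing space ("LOGSPEND "), which is easy to miss when referring to the column later. The sketch below uses only Python's standard csv module to show one way to cross-check a header and a row count outside Spark. The three-row text in it is a hypothetical excerpt, not the real credit_count.txt, and the variable names are mine.

```python
import csv
import io

# Hypothetical three-row excerpt; the real credit_count.txt is not reproduced here.
# The header deliberately ends in "LOGSPEND " (trailing space) to mirror the schema.
raw = "CARDHLDR,DEFAULT,LOGSPEND \n0,0,4.5\n1,0,5.1\n1,1,3.9\n"

reader = csv.reader(io.StringIO(raw))
header = next(reader)                              # first line holds the column names
clean_header = [name.strip() for name in header]   # drop stray whitespace in names
n_obs = sum(1 for _ in reader)                     # remaining lines are observations

print(header)
print(clean_header)
print(n_obs)
```

With a real file, replacing io.StringIO(raw) with an open file handle should give a row count comparable to the count(*) query, assuming no quoted field in the file contains an embedded line break.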