XFrames: Another Convenient Python Interface to Spark

Currently, pyspark is probably the most popular Python interface to Apache Spark. However, the xframes package (https://github.com/cchayden/xframes) is definitely an alternative worth trying.

As shown in the code snippet below, the XFrame, which is the dataframe object in the xframes package, interacts well with other Python data structures and numpy functions. To me, the XFrame is easier to work with than the pyspark DataFrame and has a more “authentic” Python flavor. For comparison, a rough pyspark sketch of the same pipeline is given at the end of this section.

from xframes import XFrame, aggregate

df = XFrame.read_csv("Downloads/nycflights.csv", header = True, nrows = 11)

### SUBSETTING ###
sel_cols = ["origin", "dest", "distance", "dep_delay", "carrier"]

df2 = df[sel_cols]
# OR:
# df.sql("select " + ", ".join(sel_cols) + " from df")

### FILTERING ###
print df2[(df2["origin"] == 'EWR') & (df2["carrier"] == "UA")]
# OR:
# print df2.filterby("EWR", "origin").filterby("UA", "carrier")

### AGGREGATING ###
from numpy import median

grp1 = df2.groupby("origin", {"dist": aggregate.CONCAT("distance")})
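# CONCAT above collects each origin's distances into a list column so that
# numpy's median can be applied to each list in the next step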

agg1 = XFrame({"origin": grp1["origin"], "med_dist": map(median, grp1["dist"])})
# OR:
# grp1["med_dist"] = grp1.apply(lambda row: median(row["dist"]))
# agg1 = grp1[["origin", "med_dist"]]
# USING SQL:
# df2.sql("select origin, percentile_approx(distance, 0.5) as med_dist from df2 group by origin")

for row in agg1:
  print row
# {'origin': u'LGA', 'med_dist': 747.5}
# {'origin': u'JFK', 'med_dist': 1089.0}
# {'origin': u'EWR', 'med_dist': 1065.0}

agg2 = df2.groupby("origin", {"avg_delay": aggregate.MEAN("dep_delay")})
# USING SQL:
# df2.sql("select origin, mean(dep_delay) as avg_delay from df2 group by origin")

for row in agg2:
  print row
# {'origin': u'LGA', 'avg_delay': -1.75}
# {'origin': u'JFK', 'avg_delay': -0.6666666666666666}
# {'origin': u'EWR', 'avg_delay': -2.3333333333333335}

### JOINING ###
for row in agg1.join(agg2, on = {"origin": "origin"}, how = "inner"):
  print row
# {'origin': u'LGA', 'med_dist': 747.5, 'avg_delay': -1.75}
# {'origin': u'JFK', 'med_dist': 1089.0, 'avg_delay': -0.6666666666666666}
# {'origin': u'EWR', 'med_dist': 1065.0, 'avg_delay': -2.3333333333333335}
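
For comparison, below is a rough pyspark sketch of the same subset-and-aggregate pipeline. It is only a sketch and assumes a Spark 2.x SparkSession; also note that percentile_approx() returns an approximate median rather than the exact one computed above.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("nycflights").getOrCreate()

### READ THE CSV AND KEEP THE SAME COLUMNS ###
sdf = spark.read.csv("Downloads/nycflights.csv", header = True, inferSchema = True) \
           .select("origin", "dest", "distance", "dep_delay", "carrier")

### MEDIAN DISTANCE AND MEAN DELAY BY ORIGIN ###
sdf.groupBy("origin").agg(
  F.expr("percentile_approx(distance, 0.5)").alias("med_dist"),
  F.mean("dep_delay").alias("avg_delay")).show()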

Data Wrangling with Astropy Package

from astropy.io import ascii

from astropy.table import Table, join, vstack

from numpy import nanmean, nanmedian, array, sort

tbl1 = ascii.read("Downloads/nycflights.csv", format = "csv")

### SUBSETTING ###
sel_cols = ["origin", "dest", "distance", "dep_delay", "carrier"]

tbl2 = tbl1[sel_cols][range(10)]

tbl2.info
#   name   dtype
#--------- -----
#   origin  str3
#     dest  str3
# distance int64
#dep_delay int64
#  carrier  str2

### FILTERING ###
tbl2[list(i["carrier"] == "UA" and i["origin"] == 'EWR' for i in tbl2)]
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

tbl2[(tbl2["carrier"] == "UA") & (tbl2["origin"] == "EWR")]
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

### FILTER BY GROUPS ###
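# split the 10 rows into two artificial halves with a sorted 0/1 key array,
# filter within each group, and stack the filtered groups back together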
vstack(map(lambda x: x[(x["carrier"] == "UA") & (x["origin"] == "EWR")],
           tbl2.group_by(sort(array(range(len(tbl2))) % 2)).groups))
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

### GROUPING ###
grp = tbl2.group_by("origin")

### AGGREGATING ###
agg1 = Table(grp['origin', 'distance'].groups.aggregate(nanmedian), names = ["origin", "med_dist"])
#origin med_dist
#------ --------
#   EWR   1065.0
#   JFK   1089.0
#   LGA    747.5

agg2 = Table(grp['origin', 'dep_delay'].groups.aggregate(nanmean), names = ["origin", "avg_delay"])
#origin      avg_delay
#------ -------------------
#   EWR -2.3333333333333335
#   JFK -0.6666666666666666
#   LGA               -1.75

### JOINING ###
join(agg1, agg2, join_type = "inner", keys = "origin")
#origin med_dist      avg_delay
#------ -------- -------------------
#   EWR   1065.0 -2.3333333333333335
#   JFK   1089.0 -0.6666666666666666
#   LGA    747.5               -1.75

Manipulating Dictionary List with SQLite Back-End


from astropy.io.ascii import read

selected = ["origin", "dep_delay", "distance"]

csv = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]
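# data_end = 11 limits the import to the first 10 data rows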

lst = map(lambda x: dict(zip(x.colnames, x)), csv)

from dataset import connect

### CREATE IN-MEMORY SQLITE DB ###
db = connect('sqlite:///:memory:', row_type = dict)

tbl = db.create_table("tbl", primary_id = False)

tbl.insert_many(lst)

list(db.query("select * from tbl limit 3"))
# [{u'dep_delay': 2, u'distance': 1400, u'origin': u'EWR'},
#  {u'dep_delay': 4, u'distance': 1416, u'origin': u'LGA'},
#  {u'dep_delay': 2, u'distance': 1089, u'origin': u'JFK'}]

sum1 = db.create_table("sum1", primary_id = False)

from numpy import nanmedian
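# SQLite has no built-in median aggregate, so pull each origin's distances
# into Python and take the median with numpy instead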

sum1.insert_many(
  map(lambda x: dict(origin = x,
                     med_dist = nanmedian([i["distance"] for i in
                       db.query("select distance from tbl where origin = :origin", {"origin": x})])),
      [i["origin"] for i in db.query("select distinct origin from tbl")]))

sum2 = db.create_table("sum2", primary_id = False)

sum2.insert_many(list(db.query("select origin, ROUND(AVG(dep_delay), 2) as avg_delay from tbl group by origin")))

list(db.query("select a.*, b.avg_delay from sum1 as a, sum2 as b where a.origin = b.origin"))
#[{u'avg_delay': -2.33, u'med_dist': 1065.0, u'origin': u'EWR'},
# {u'avg_delay': -1.75, u'med_dist': 747.5, u'origin': u'LGA'},
# {u'avg_delay': -0.67, u'med_dist': 1089.0, u'origin': u'JFK'}]

Joining Dictionary Lists by Key

### IMPORT AND PREPARE THE DATA ###
from astropy.io.ascii import read

selected = ["origin", "dep_delay", "distance"]

tbl = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

lst = map(lambda x: dict(zip(x.colnames, x)), tbl)

key = set(map(lambda x: x["origin"], lst))

grp = dict(map(lambda x: (x, [i for i in lst if i["origin"] == x]), key))

#   ALTERNATIVELY, USE CLJ.SEQS.GROUP_BY()
# from clj.seqs import group_by
# group_by(lambda x: x["origin"], lst)
#   OR TOOLZ.ITERTOOLZ.GROUPBY()
# from toolz import itertoolz
# itertoolz.groupby("origin", lst)
#   OR ITERTOOLS.GROUPBY()
# from itertools import groupby
# key_fn = lambda x: x["origin"]
# dict((k, list(g)) for k, g in groupby(sorted(lst, key = key_fn), key_fn))

### CREATE 2 DICTIONARY LISTS TO BE MERGED ###
from numpy import nanmean, nanmedian

l1 = list({"origin"   : k,
           "med_dist" : nanmedian([v["distance"] for v in grp[k]])
          } for k in grp.keys())
#[{'med_dist': 1089.0, 'origin': 'JFK'},
# {'med_dist': 1065.0, 'origin': 'EWR'},
# {'med_dist': 747.5, 'origin': 'LGA'}]

l2 = list({"origin"   : k,
           "avg_delay": round(nanmean(map(lambda v: v["dep_delay"], grp[k])), 2)
          } for k in grp.keys())
#[{'avg_delay': -0.67, 'origin': 'JFK'},
# {'avg_delay': -2.33, 'origin': 'EWR'},
# {'avg_delay': -1.75, 'origin': 'LGA'}]

### METHOD 1: LIST COMPREHENSION ###
join1 = list(dict(v1, **v2) for v2 in l2 for v1 in l1 if v1["origin"] == v2["origin"])

### METHOD 2: CARTESIAN PRODUCT WITH ITERTOOLS.PRODUCT() ###
from itertools import product

join2 = list(dict(d1, **d2) for d1, d2 in product(l1, l2) if d1["origin"] == d2["origin"])
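
# NOTE: methods 1 and 2 scan the full cartesian product of the two lists,
# which is fine for three origins but scales poorly; the methods below
# make a single pass over the concatenated lists instead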

### METHOD 3: AD-HOC FUNCTION WITH DICT COPY() AND UPDATE() METHODS ###
from clj.seqs import group_by

def join_fn(x, y, key):
  result = list()
  for i, j in group_by(lambda d: d[key], x + y).values():
    xy = i.copy()
    xy.update(j)
    result.append(xy)
  return result

join3 = join_fn(l1, l2, "origin")

### METHOD 4: DEFAULTDICT() ###
from collections import defaultdict

dd = defaultdict(dict)

for x in l1 + l2:
  dd[x["origin"]].update(x)

join4 = dd.values()

### METHOD 5: ORDEREDDICT() ###
from collections import OrderedDict

od = OrderedDict()

for x in l1 + l2:
  if x["origin"] in od:
    od[x["origin"]].update(x)
  else:
    od[x["origin"]] = x
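# NOTE: od stores references to the dicts in l1, so the update() above
# also mutates those l1 dicts in place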

join5 = od.values()

### METHOD 6: TOOLZ JOIN() AND MERGE() ###
from toolz import itertoolz, dicttoolz

join6 = list(dicttoolz.merge(i, j) for i, j in itertoolz.join("origin", l1, "origin", l2))

# EXPECTED OUTPUT:
# [{'avg_delay': -0.67, 'med_dist': 1089.0, 'origin': 'JFK'},
#  {'avg_delay': -2.33, 'med_dist': 1065.0, 'origin': 'EWR'},
#  {'avg_delay': -1.75, 'med_dist': 747.5, 'origin': 'LGA'}]

Aggregation of Dictionary List by Key

from astropy.io.ascii import read

from numpy import nanmean, nanmedian, vectorize

selected = ["origin", "dep_delay", "distance"]

tbl = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

lst = map(lambda x: dict(zip(x.colnames, x)), tbl)

key = set([x["origin"] for x in lst])

grp = map(lambda x: {x: list(i for i in lst if i["origin"] == x)}, key)

### USING LIST COMPREHENSION ###
list({"origin"    : x.keys()[0],
      "flight_nbr": len(x.values()[0]),
      "med_dist"  : nanmedian([i["distance"] for i in x.values()[0]]),
      "avg_delay" : round(nanmean([i["dep_delay"] for i in x.values()[0]]), 2)
      } for x in grp)

### USING MAP() ALTERNATIVELY ###
map(lambda x: {"origin"    : x.keys()[0],
               "flight_nbr": len(x.values()[0]),
               "med_dist"  : nanmedian([i["distance"] for i in x.values()[0]]),
               "avg_delay" : round(nanmean([i["dep_delay"] for i in x.values()[0]]), 2)}, grp)

### USING VECTORIZE() ###
def agg(x):
  return({"origin"    : x.keys()[0],
          "flight_nbr": len(x.values()[0]),
          "med_dist"  : nanmedian([i["distance"] for i in x.values()[0]]),
          "avg_delay" : round(nanmean([i["dep_delay"] for i in x.values()[0]]), 2)})

list(vectorize(agg)(grp))
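# NOTE: vectorize() does not speed anything up here; per numpy's own
# documentation it is essentially a for loop under the hood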

# RESULT:
# [{'avg_delay': -0.67, 'flight_nbr': 3, 'med_dist': 1089.0, 'origin': 'JFK'},
#  {'avg_delay': -2.33, 'flight_nbr': 3, 'med_dist': 1065.0, 'origin': 'EWR'},
#  {'avg_delay': -1.75, 'flight_nbr': 4, 'med_dist': 747.5, 'origin': 'LGA'}]

Convert SAS Dataset to Dictionary List

from sas7bdat import SAS7BDAT

with SAS7BDAT("Downloads/accepts.sas7bdat") as f:
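  # the first row from the reader is the column-name header, hence the [1:] slice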
  lst = map(lambda x: dict(zip(f.column_names, x)), [i for i in f][1:])

col = ["app_id", "bureau_score", "ltv", "tot_derog", "tot_income", "bad"]

for i in range(5):
  print {k: lst[i].get(k) for k in col}

#{'tot_income': 4800.0, 'ltv': 109.0, 'app_id': 1001.0, 'bureau_score': 747.0, 'bad': 0.0, 'tot_derog': 6.0}
#{'tot_income': 5833.33, 'ltv': 97.0, 'app_id': 1002.0, 'bureau_score': 744.0, 'bad': 0.0, 'tot_derog': 0.0}
#{'tot_income': 2308.33, 'ltv': 105.0, 'app_id': 1003.0, 'bureau_score': 667.0, 'bad': 0.0, 'tot_derog': 0.0}
#{'tot_income': 4083.33, 'ltv': 78.0, 'app_id': 1005.0, 'bureau_score': 648.0, 'bad': 1.0, 'tot_derog': 2.0}
#{'tot_income': 5783.33, 'ltv': 100.0, 'app_id': 1006.0, 'bureau_score': 649.0, 'bad': 0.0, 'tot_derog': 2.0}

Grouping Dictionary List by Key

The code snippets below show how to group a dictionary list by a specific key.

First of all, let's import the data from a CSV file.

from astropy.io.ascii import read

selected = ["origin", "dest", "distance", "carrier"]

### IMPORT CSV FILE INTO ASTROPY TABLE ###
tbl = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

### CONVERT ASTROPY TABLE TO DICTIONARY LIST ###
lst = map(lambda x: dict(zip(x.colnames, x)), tbl)

### DISPLAY DATA CONTENTS ###
from tabulate import tabulate

print tabulate([lst[i] for i in range(3)], headers = "keys", tablefmt = "fancy_grid")

╒══════════╤════════╤═══════════╤════════════╕
│ origin   │ dest   │ carrier   │   distance │
╞══════════╪════════╪═══════════╪════════════╡
│ EWR      │ IAH    │ UA        │       1400 │
├──────────┼────────┼───────────┼────────────┤
│ EWR      │ ORD    │ UA        │        719 │
├──────────┼────────┼───────────┼────────────┤
│ EWR      │ FLL    │ B6        │       1065 │
╘══════════╧════════╧═══════════╧════════════╛

In the first approach, only standard Python modules and data structures are used.

### APPROACH 1: HOMEBREW GROUPING ###

from operator import itemgetter

### GET UNIQUE VALUES OF GROUP KEY ###
g_key = set([x["origin"] for x in lst])

### GROUPING LIST BY GROUP KEY ###
g_lst1 = sorted(map(lambda x: (x, [i for i in lst if i["origin"] == x]), g_key), key = itemgetter(0))

for i in g_lst1:
  print tabulate(i[1], headers = "keys", tablefmt = "fancy_grid")

In the second approach, we first sort the list by the key and then group the list with the itertools.groupby() function.

### APPROACH 2: ITERTOOLS.GROUPBY  ###

### SORTING DICTIONARY BEFORE GROUPING ###
s_lst = sorted(lst, key = itemgetter('origin'))

### GROUPING DICTIONARY BY "ORIGIN" ###
from itertools import groupby

g_lst2 = [(k, list(g)) for k, g in groupby(s_lst, itemgetter("origin"))]

for i in g_lst2:
  print tabulate(i[1], headers = "keys", tablefmt = "fancy_grid")

In the third approach, we use the defaultdict class from the collections module.

### APPROACH 3: DEFAULTDICT ###

from collections import defaultdict

### CREATE KEY-VALUE PAIRS FROM LIST ###
ddata = [(x["origin"], x) for x in lst]

### CREATE DEFAULTDICT ###
ddict = defaultdict(list)

for key, value in ddata:
  ddict[key].append(value)

g_lst3 = sorted(ddict.items(), key = itemgetter(0))

for i in g_lst3:
  print tabulate(i[1], headers = "keys", tablefmt = "fancy_grid")

In the fourth approach, we use the OrderedDict class, also from the collections module.

### APPROACH 4: ORDEREDDICT ###
from collections import OrderedDict

odict = OrderedDict()

for key, value in ddata:
  if key in odict: odict[key].append(value)
  else: odict[key] = [value]

g_lst4 = sorted(odict.items(), key = itemgetter(0))

for i in g_lst4:
  print tabulate(i[1], headers = "keys", tablefmt = "fancy_grid")

The output below shows that the four grouped lists are identical.

g_lst1 == g_lst2 == g_lst3 == g_lst4
# True

╒══════════╤════════╤═══════════╤════════════╕
│ origin   │ dest   │ carrier   │   distance │
╞══════════╪════════╪═══════════╪════════════╡
│ EWR      │ IAH    │ UA        │       1400 │
├──────────┼────────┼───────────┼────────────┤
│ EWR      │ ORD    │ UA        │        719 │
├──────────┼────────┼───────────┼────────────┤
│ EWR      │ FLL    │ B6        │       1065 │
╘══════════╧════════╧═══════════╧════════════╛
╒══════════╤════════╤═══════════╤════════════╕
│ origin   │ dest   │ carrier   │   distance │
╞══════════╪════════╪═══════════╪════════════╡
│ JFK      │ MIA    │ AA        │       1089 │
├──────────┼────────┼───────────┼────────────┤
│ JFK      │ BQN    │ B6        │       1576 │
├──────────┼────────┼───────────┼────────────┤
│ JFK      │ MCO    │ B6        │        944 │
╘══════════╧════════╧═══════════╧════════════╛
╒══════════╤════════╤═══════════╤════════════╕
│ origin   │ dest   │ carrier   │   distance │
╞══════════╪════════╪═══════════╪════════════╡
│ LGA      │ IAH    │ UA        │       1416 │
├──────────┼────────┼───────────┼────────────┤
│ LGA      │ ATL    │ DL        │        762 │
├──────────┼────────┼───────────┼────────────┤
│ LGA      │ IAD    │ EV        │        229 │
├──────────┼────────┼───────────┼────────────┤
│ LGA      │ ORD    │ AA        │        733 │
╘══════════╧════════╧═══════════╧════════════╛