XFrames: Another Convenient Python Interface to Spark

Currently, pyspark is probably the most popular Python interface to Apache Spark. However, the xframes package (https://github.com/cchayden/xframes) is an alternative worth trying.

As shown in the code snippet below, the XFrame, the dataframe object in the xframes package, interacts well with other Python data structures and numpy functions. To me, the XFrame is easier to work with than the pyspark DataFrame and has a more “authentic” Python flavor; a rough pyspark sketch follows the snippet for comparison.

from xframes import XFrame, aggregate

df = XFrame.read_csv("Downloads/nycflights.csv", header = True, nrows = 11)

### SUBSETTING
sel_cols = ["origin", "dest", "distance", "dep_delay", "carrier"]

df2 = df[sel_cols]
# OR:
# df.sql("select " + ", ".join(sel_cols) + " from df")

### FILTERING ###
print df2[(df2["origin"] == 'EWR') & (df2["carrier"] == "UA")]
# OR:
# print df2.filterby("EWR", "origin").filterby("UA", "carrier")

### AGGREGATING ###
from numpy import median

grp1 = df2.groupby("origin", {"dist": aggregate.CONCAT("distance")})

agg1 = XFrame({"origin": grp1["origin"], "med_dist": map(median, grp1["dist"])})
# OR:
# grp1["med_dist"] = grp1.apply(lambda row: median(row["dist"]))
# agg1 = grp1[["origin", "med_dist"]]
# USING SQL:
# df2.sql("select origin, percentile_approx(distance, 0.5) as med_dist from df2 group by origin")

for row in agg1:
  print row
# {'origin': u'LGA', 'med_dist': 747.5}
# {'origin': u'JFK', 'med_dist': 1089.0}
# {'origin': u'EWR', 'med_dist': 1065.0}

agg2 = df2.groupby("origin", {"avg_delay": aggregate.MEAN("dep_delay")})
# USING SQL:
# df2.sql("select origin, mean(dep_delay) as avg_delay from df2 group by origin")

for row in agg2:
  print row
# {'origin': u'LGA', 'avg_delay': -1.75}
# {'origin': u'JFK', 'avg_delay': -0.6666666666666666}
# {'origin': u'EWR', 'avg_delay': -2.3333333333333335}

### JOINING ###
for row in agg1.join(agg2, on = {"origin": "origin"}, how = "inner"):
    print row
# {'origin': u'LGA', 'med_dist': 747.5, 'avg_delay': -1.75}
# {'origin': u'JFK', 'med_dist': 1089.0, 'avg_delay': -0.6666666666666666}
# {'origin': u'EWR', 'med_dist': 1065.0, 'avg_delay': -2.3333333333333335}
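
For comparison, below is a rough pyspark sketch of the subsetting, filtering, and aggregation above. It is only an illustration, not part of the xframes package: it assumes an active SparkSession named spark and limits the input to the first ten rows to mirror the XFrame snippet.

### ROUGH PYSPARK EQUIVALENT (A SKETCH, ASSUMING AN ACTIVE SPARKSESSION NAMED spark) ###
from pyspark.sql import functions as F

sdf = spark.read.csv("Downloads/nycflights.csv", header = True, inferSchema = True) \
           .select("origin", "dest", "distance", "dep_delay", "carrier").limit(10)

### FILTERING ###
sdf.filter((sdf["origin"] == "EWR") & (sdf["carrier"] == "UA")).show()

### AGGREGATING BOTH MEASURES IN ONE PASS ###
sdf.groupBy("origin").agg(
    F.expr("percentile_approx(distance, 0.5)").alias("med_dist"),
    F.mean("dep_delay").alias("avg_delay")).show()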

Growing List vs Growing Queue

### GROWING LIST ###
base_lst1 <- function(df) {
  l <- list()
  for (i in seq(nrow(df))) l[[i]] <- as.list(df[i, ])
  return(l)
}

### PRE-ALLOCATING LIST ###
base_lst2 <- function(df) {
  l <- vector(mode = "list", length = nrow(df))
  for (i in seq(nrow(df))) l[[i]] <- as.list(df[i, ])
  return(l)
}

### DEQUER PACKAGE ###
dequer_queue <- function(df) {
  q <- dequer::queue()
  for (i in seq(nrow(df))) dequer::pushback(q, as.list(df[i, ]))
  return(as.list(q))
}

### LIQUEUER PACKAGE ###
liqueuer_queue <- function(df) {
  q <- liqueueR::Queue$new()
  for (i in seq(nrow(df))) q$push(as.list(df[i, ]))
  return(q$data)
}

### COLLECTIONS PACKAGE ###
collections_queue <- function(df) {
  q <- collections::Queue$new()
  for (i in seq(nrow(df))) q$push(as.list(df[i, ]))
  return(q$as_list())
}

### RSTACKDEQUE PACKAGE ###
rstackdeque_queue <- function(df) {
  q <- rstackdeque::rpqueue()
  for (i in seq(nrow(df))) q <- rstackdeque::insert_back(q, as.list(df[i, ]))
  return(as.list(q))
}

nyc <- read.csv("Downloads/nycflights.csv")

compare <- function(ds) {
  tests <- c("dequer_queue(ds)",
             "base_lst2(ds)",
             "liqueuer_queue(ds)",
             "collections_queue(ds)",
             "rstackdeque_queue(ds)")
  for (t in tests) print(identical(base_lst1(ds), eval(parse(text = t))))
}

compare(nyc[1:10, ])
#[1] TRUE
#[1] TRUE
#[1] TRUE
#[1] TRUE
#[1] TRUE

### BENCHMARKS ###
bm <- function(ds) {
  rbenchmark::benchmark(replications = 5, order = "elapsed", relative = "elapsed",
                        columns = c("test", "replications", "elapsed", "relative"),
  "GROWING LIST"         = base_lst1(ds),
  "PRE-ALLOCATING LIST"  = base_lst2(ds),
  "DEQUER::QUEUE"        = dequer_queue(ds),
  "LIQUEUER::QUEUE"      = liqueuer_queue(ds),
  "COLLECTIONS::QUEUE"   = collections_queue(ds),
  "RSTACKDEQUE::RPQUEUE" = rstackdeque_queue(ds)
  )
}

bm(nyc[1:1000, ])
#                  test replications elapsed relative
#1         GROWING LIST            5   0.808    1.000
#2  PRE-ALLOCATING LIST            5   0.839    1.038
#5   COLLECTIONS::QUEUE            5   0.842    1.042
#4      LIQUEUER::QUEUE            5   1.091    1.350
#3        DEQUER::QUEUE            5   1.375    1.702
#6 RSTACKDEQUE::RPQUEUE            5   1.901    2.353

bm(nyc[1:10000, ])
#                  test replications elapsed relative
#5   COLLECTIONS::QUEUE            5   8.175    1.000
#1         GROWING LIST            5   8.505    1.040
#2  PRE-ALLOCATING LIST            5  12.554    1.536
#4      LIQUEUER::QUEUE            5  17.325    2.119
#6 RSTACKDEQUE::RPQUEUE            5  21.785    2.665
#3        DEQUER::QUEUE            5  22.030    2.695

bm(nyc[1:20000, ])
#                  test replications elapsed relative
#5   COLLECTIONS::QUEUE            5  16.730    1.000
#2  PRE-ALLOCATING LIST            5  17.134    1.024
#1         GROWING LIST            5  17.342    1.037
#4      LIQUEUER::QUEUE            5  48.359    2.891
#6 RSTACKDEQUE::RPQUEUE            5  52.420    3.133
#3        DEQUER::QUEUE            5  79.938    4.778

bm(nyc[1:30000, ])
#                  test replications elapsed relative
#2  PRE-ALLOCATING LIST            5  24.600    1.000
#5   COLLECTIONS::QUEUE            5  24.797    1.008
#1         GROWING LIST            5  25.600    1.041
#6 RSTACKDEQUE::RPQUEUE            5  60.908    2.476
#4      LIQUEUER::QUEUE            5 102.482    4.166
#3        DEQUER::QUEUE            5 182.039    7.400

Convert Data Frame to Dictionary List in R

In R, there are a couple of ways to convert a column-oriented data frame into a row-oriented dictionary list or the like, e.g. a list of lists.

The code snippet below shows each approach and how to extract keys and values from the dictionary. As shown in the benchmark at the end, the generic R data structure still appears to be the most efficient.

### LIST() FUNCTION IN BASE PACKAGE ###
x1 <- as.list(iris[1, ])
names(x1)
# [1] "Sepal.Length" "Sepal.Width"  "Petal.Length" "Petal.Width"  "Species"
x1[["Sepal.Length"]]
# [1] 5.1

### ENVIRONMENT-BASED SOLUTION ###
envn_dict <- function(x) {
  e <- new.env(hash = TRUE)
  for (name in names(x)) assign(name, x[, name], e)
  return(e)
}

x2 <- envn_dict(iris[1, ])
ls(x2)
# [1] "Petal.Length" "Petal.Width"  "Sepal.Length" "Sepal.Width"  "Species"
x2[["Sepal.Length"]]
# [1] 5.1

### COLLECTIONS PACKAGE ###
coll_dict <-  function(x) {
  d <- collections::Dict$new()
  for (name in names(x)) d$set(name, x[, name])
  return(d)
}

x3 <- coll_dict(iris[1, ])
x3$keys()
# [1] "Petal.Length" "Petal.Width"  "Sepal.Length" "Sepal.Width"  "Species"
x3$get("Sepal.Length")
# [1] 5.1

### HASH PACKAGE ###
hash_dict <- function(x) {
  d <- hash::hash()
  for (name in names(x)) d[[name]] <- x[, name]
  return(d)
}

x4 <- hash_dict(iris[1, ])
hash::keys(x4)
# [1] "Petal.Length" "Petal.Width"  "Sepal.Length" "Sepal.Width"  "Species"
hash::values(x4, "Sepal.Length")
# Sepal.Length
#          5.1

### DATASTRUCTURES PACKAGE ###
data_dict <- function(x) {
  d <- datastructures::hashmap()
  for (name in names(x)) d[name] <- x[, name]
  return(d)
}

x5 <- data_dict(iris[1, ])
datastructures::keys(x5)
# [1] "Species"      "Sepal.Width"  "Petal.Length" "Sepal.Length" "Petal.Width"
datastructures::get(x5, "Sepal.Length")
# [1] 5.1

### FROM PYTHON ###
py2r_dict <- function(x) {
  return(reticulate::py_dict(names(x), x, TRUE))
}

x6 <- py2r_dict(iris[1, ])
x6$keys()
# [1] "Petal.Length" "Sepal.Length" "Petal.Width"  "Sepal.Width"  "Species"
x6["Sepal.Length"]
# [1] 5.1

### CONVERT DATAFRAME TO DICTIONARY LIST ###
to_list <- function(df, fn) {
  l <- list()
  for (i in seq(nrow(df))) l[[i]] <- fn(df[i, ])
  return(l)
}

rbenchmark::benchmark(replications = 100, order = "elapsed", relative = "elapsed",
                      columns = c("test", "replications", "elapsed", "relative", "user.self", "sys.self"),
  "BASE::LIST"              = to_list(iris, as.list),
  "BASE::ENVIRONMENT"       = to_list(iris, envn_dict),
  "COLLECTIONS::DICT"       = to_list(iris, coll_dict),
  "HASH::HASH"              = to_list(iris, hash_dict),
  "DATASTRUCTURES::HASHMAP" = to_list(iris, data_dict),
  "RETICULATE::PY_DICT"     = to_list(iris, py2r_dict)
)
#                     test replications elapsed relative user.self sys.self
#1              BASE::LIST          100   0.857    1.000     0.857    0.000
#2       BASE::ENVIRONMENT          100   1.607    1.875     1.607    0.000
#4              HASH::HASH          100   2.600    3.034     2.600    0.000
#3       COLLECTIONS::DICT          100   2.956    3.449     2.956    0.000
#5 DATASTRUCTURES::HASHMAP          100  16.070   18.751    16.071    0.000
#6     RETICULATE::PY_DICT          100  18.030   21.039    18.023    0.008

Fetching Data From SAS Dataset to Lua Table

data one;
  array c{2} $ _temporary_ ("A", "B");
  do i = 1 to dim(c);
    x = c[i];
    do j = 1 to 2;
      y = round(rannor(1), 0.0001);
      output;
    end;
  end;
run;

proc lua;
submit;
  -- OPEN SAS DATASET FOR READING --
  local dsid = sas.open("work.one", i)

  -- CREATING AN EMPTY LUA TABLE --
  local list = {}

  -- LOOP THROUGH OBSERVATIONS IN SAS DATASET --
  for obs in sas.rows(dsid) do
    local dict = {}

    -- LOOP THROUGH VARIABLES IN EACH OBSERVATION --
    for var in sas.vars(dsid) do
      dict[var.name] = obs[var.name]
    end

    -- INSERT EACH RECORD INTO LUA TABLE --
    table.insert(list, dict)

    -- CLOSE SAS DATASET AFTER THE LAST RECORD --
    if #list == sas.nobs(dsid) then
      sas.close(dsid)
    end
  end

  -- PRINT OUT LUA TABLE --
  for i = 1, #list do
    print(string.rep("*", 5).." RECORD: "..i.." "..string.rep("*", 5))
    for key, value in pairs(list[i]) do
      print(key.." --> "..type(value).." --> "..value)
    end
    print("\n")
  end
  -- WRITE LUA TABLE INTO NEW SAS DATASET --
  new_ds = "work.two"
  sas.write_ds(list, new_ds)

  -- SUBMITTING SAS CODE --
  sas.submit([[proc print data = @ds@ noobs; run;]], {ds = new_ds})

endsubmit;
run;

*** OUTPUT SHOWN IN THE LOG ***
***** RECORD: 1 *****
y --> number --> 1.8048
j --> number --> 1
i --> number --> 1
x --> string --> A


***** RECORD: 2 *****
y --> number --> -0.0799
j --> number --> 2
i --> number --> 1
x --> string --> A


***** RECORD: 3 *****
y --> number --> 0.3966
j --> number --> 1
i --> number --> 2
x --> string --> B


***** RECORD: 4 *****
y --> number --> -1.0833
j --> number --> 2
i --> number --> 2
x --> string --> B

Data Wrangling with Astropy Package

from astropy.io import ascii

from astropy.table import Table, join, vstack

from numpy import nanmean, nanmedian, array, sort

tbl1 = ascii.read("Downloads/nycflights.csv", format = "csv")

### SUBSETTING
sel_cols = ["origin", "dest", "distance", "dep_delay", "carrier"]

tbl2 = tbl1[sel_cols][range(10)]

tbl2.info
#   name   dtype
#--------- -----
#   origin  str3
#     dest  str3
# distance int64
#dep_delay int64
#  carrier  str2

### FILTERING ###
tbl2[list(i["carrier"] == "UA" and i["origin"] == 'EWR' for i in tbl2)]
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

tbl2[(tbl2["carrier"] == "UA") & (tbl2["origin"] == "EWR")]
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

### FILTER BY GROUPS ###
vstack(map(lambda x: x[(x["carrier"] == "UA") & (x["origin"] == "EWR")],
           tbl2.group_by(sort(array(range(len(tbl2))) % 2)).groups))
#origin dest distance dep_delay carrier
#------ ---- -------- --------- -------
#   EWR  IAH     1400         2      UA
#   EWR  ORD      719        -4      UA

### GROUPING ###
grp = tbl2.group_by("origin")

### AGGREGATING ###
agg1 = Table(grp['origin', 'distance'].groups.aggregate(nanmedian), names = ["origin", "med_dist"])
#origin med_dist
#------ --------
#   EWR   1065.0
#   JFK   1089.0
#   LGA    747.5

agg2 = Table(grp['origin', 'dep_delay'].groups.aggregate(nanmean), names = ["origin", "avg_delay"])
#origin      avg_delay
#------ -------------------
#   EWR -2.3333333333333335
#   JFK -0.6666666666666666
#   LGA               -1.75

### JOINING ###
join(agg1, agg2, join_type = "inner", keys = "origin")
#origin med_dist      avg_delay
#------ -------- -------------------
#   EWR   1065.0 -2.3333333333333335
#   JFK   1089.0 -0.6666666666666666
#   LGA    747.5               -1.75

Manipulating Dictionary List with SQLite Back-End


from astropy.io.ascii import read

selected = ["origin", "dep_delay", "distance"]

csv = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

lst = map(lambda x: dict(zip(x.colnames, x)), csv)

from dataset import connect

### CREATE IN-MEMORY SQLITE DB ###
db = connect('sqlite:///:memory:', row_type = dict)

tbl = db.create_table("tbl", primary_id = False)

tbl.insert_many(lst)

list(db.query("select * from tbl limit 3"))
# [{u'dep_delay': 2, u'distance': 1400, u'origin': u'EWR'},
#  {u'dep_delay': 4, u'distance': 1416, u'origin': u'LGA'},
#  {u'dep_delay': 2, u'distance': 1089, u'origin': u'JFK'}]

sum1 = db.create_table("sum1", primary_id = False)

from numpy import nanmedian

sum1.insert_many(
  map(lambda x: dict(origin = x,
                     med_dist = nanmedian([i["distance"] for i in
                       db.query("select distance from tbl where origin = :origin", {"origin": x})])),
      [i["origin"] for i in db.query("select distinct origin from tbl")]))

sum2 = db.create_table("sum2", primary_id = False)

sum2.insert_many(list(db.query("select origin, ROUND(AVG(dep_delay), 2) as avg_delay from tbl group by origin")))

list(db.query("select a.*, b.avg_delay from sum1 as a, sum2 as b where a.origin = b.origin"))
#[{u'avg_delay': -2.33, u'med_dist': 1065.0, u'origin': u'EWR'},
# {u'avg_delay': -1.75, u'med_dist': 747.5, u'origin': u'LGA'},
# {u'avg_delay': -0.67, u'med_dist': 1089.0, u'origin': u'JFK'}]

Joining Dictionary Lists by Key

### IMPORT AND PREPARE THE DATA ###
from astropy.io.ascii import read

selected = ["origin", "dep_delay", "distance"]

tbl = read("Downloads/nycflights.csv", format = 'csv', data_end = 11)[selected]

lst = map(lambda x: dict(zip(x.colnames, x)), tbl)

key = set(map(lambda x: x["origin"], lst))

grp = dict(map(lambda x: (x, [i for i in lst if i["origin"] == x]), key))

#   ALTERNATIVELY, USE CLJ.SEQS.GROUP_BY()
# from clj.seqs import group_by
# group_by(lambda x: x["origin"], lst)
#   OR TOOLZ.ITERTOOLZ.GROUPBY()
# from toolz import itertoolz
# itertoolz.groupby("origin", lst)
#   OR ITERTOOLS.GROUPBY()
# from itertools import groupby
# key_fn = lambda x: x["origin"]
# dict((k, list(g)) for k, g in groupby(sorted(lst, key = key_fn), key_fn))

### CREATE 2 DICTIONARY LISTS TO BE MERGED ###
from numpy import nanmean, nanmedian

l1 = list({"origin"   : k,
           "med_dist" : nanmedian([v["distance"] for v in grp[k]])
          } for k in grp.keys())
#[{'med_dist': 1089.0, 'origin': 'JFK'},
# {'med_dist': 1065.0, 'origin': 'EWR'},
# {'med_dist': 747.5, 'origin': 'LGA'}]

l2 = list({"origin"   : k,
           "avg_delay": round(nanmean(map(lambda v: v["dep_delay"], grp[k])), 2)
          } for k in grp.keys())
#[{'avg_delay': -0.67, 'origin': 'JFK'},
# {'avg_delay': -2.33, 'origin': 'EWR'},
# {'avg_delay': -1.75, 'origin': 'LGA'}]

### METHOD 1: LIST COMPREHENSION ###
join1 = list(dict(v1, **v2) for v2 in l2 for v1 in l1 if v1["origin"] == v2["origin"])

### METHOD 2: CARTESIAN PRODUCT WITH ITERTOOLS.PRODUCT() ###
from itertools import product

join2 = list(dict(d1, **d2) for d1, d2 in product(l1, l2) if d1["origin"] == d2["origin"])

### METHOD 3: AD-HOC FUNCTION WITH DICT COPY() AND UPDATE() METHODS ###
from clj.seqs import group_by

def join_fn(x, y, key):
  result = list()
  for i, j in group_by(lambda x: x[key], x + y).values():
    xy = i.copy()
    xy.update(j)
    result.append(xy)
  return result

join3 = join_fn(l1, l2, "origin")

### METHOD 4: DEFAULTDICT() ###
from collections import defaultdict

dd = defaultdict(dict)

for x in l1 + l2:
  dd[x["origin"]].update(x)

join4 = dd.values()

### METHOD 5: ORDEREDDICT() ###
from collections import OrderedDict

od = OrderedDict()

for x in l1 + l2:
  if x["origin"] in od:
    od[x["origin"]].update(x)
  else:
    od[x["origin"]] = x

join5 = od.values()

### METHOD 6: TOOLZ ITERTOOLZ.JOIN() WITH DICTTOOLZ.MERGE() ###
from toolz import itertoolz, dicttoolz

join6 = list(dicttoolz.merge(i, j) for i, j in itertoolz.join("origin", l1, "origin", l2))

# EXPECTED OUTPUT:
# [{'avg_delay': -0.67, 'med_dist': 1089.0, 'origin': 'JFK'},
#  {'avg_delay': -2.33, 'med_dist': 1065.0, 'origin': 'EWR'},
#  {'avg_delay': -1.75, 'med_dist': 747.5, 'origin': 'LGA'}]