Yet Another Blog in Statistical Computing

I can calculate the motion of heavenly bodies but not the madness of people. – Isaac Newton

Archive for the ‘Big Data’ Category

Mimicking SQLDF with MonetDBLite

Like many useRs, I am also a big fan of the sqldf package developed by Grothendieck, which uses SQL statements to manipulate data frames, with the embedded SQLite database as the default back-end.
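For readers unfamiliar with the package, below is a minimal sketch of sqldf usage; the data frame and query are illustrative only and not part of the benchmarks that follow.

# A MINIMAL SQLDF EXAMPLE (ILLUSTRATIVE ONLY)
library(sqldf)
tmp <- data.frame(grp = c("a", "a", "b"), val = c(1, 2, 3))
# QUERY THE DATA FRAME AS IF IT WERE A SQL TABLE BACKED BY SQLITE
sqldf("select grp, sum(val) as total from tmp group by grp")
#   grp total
# 1   a     3
# 2   b     3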

In the examples below, I drafted a couple of R utility functions with the MonetDBLite back-end by mimicking the sqldf function. The benchmark comparison shows several interesting observations:
– The data import for csv files is more efficient with MonetDBLite than with the generic read.csv function or the read.csv.sql function in the sqldf package.
– The data manipulation for a single data frame, such as selection, aggregation, and subquery, is also significantly faster with MonetDBLite than with the sqldf function.
– However, the sqldf function is extremely efficient in joining two data frames, e.g. the inner join in the last example.
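All benchmarks below read the same nycflights.csv file with 336,776 records. The original post does not show how the file was created; assuming it was exported from an early version of the nycflights13 package, the data preparation might look like the sketch below.

# ASSUMED DATA PREPARATION (NOT PART OF THE ORIGINAL BENCHMARKS)
# install.packages("nycflights13")
write.csv(nycflights13::flights, "Downloads/nycflights.csv", row.names = FALSE)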


# IMPORT
monet.read.csv <- function(file) {
  # SPIN UP AN IN-MEMORY MONETDBLITE INSTANCE
  monet.con <- DBI::dbConnect(MonetDBLite::MonetDBLite(), ":memory:")
  # BULK-LOAD THE CSV FILE INTO A TABLE NAMED "file"
  suppressMessages(MonetDBLite::monetdb.read.csv(monet.con, file, "file", sep = ","))
  result <- DBI::dbReadTable(monet.con, "file")
  DBI::dbDisconnect(monet.con, shutdown = TRUE)
  return(result)
}

microbenchmark::microbenchmark(monet = {df <- monet.read.csv("Downloads/nycflights.csv")}, times = 10)
#Unit: milliseconds
#  expr      min       lq     mean   median       uq      max neval
# monet 528.5378 532.5463 539.2877 539.0902 542.4301 559.1191    10

microbenchmark::microbenchmark(read.csv = {df <- read.csv("Downloads/nycflights.csv")}, times = 10)
#Unit: seconds
#     expr      min       lq     mean   median       uq      max neval
# read.csv 2.310238 2.338134 2.360688 2.343313 2.373913 2.444814    10

# SELECTION AND AGGREGATION
monet.sql <- function(df, sql) {
  # RECOVER THE NAME OF THE DATA FRAME TO USE AS THE TABLE NAME
  df_str <- deparse(substitute(df))
  monet.con <- DBI::dbConnect(MonetDBLite::MonetDBLite(), ":memory:")
  # COPY THE DATA FRAME INTO MONETDBLITE AND RUN THE SQL STATEMENT
  suppressMessages(DBI::dbWriteTable(monet.con, df_str, df, overwrite = TRUE))
  result <- DBI::dbGetQuery(monet.con, sql)
  DBI::dbDisconnect(monet.con, shutdown = TRUE)
  return(result)
}

microbenchmark::microbenchmark(monet = {monet.sql(df, "select * from df sample 3")}, times = 10)
#Unit: milliseconds
#  expr     min      lq     mean   median       uq     max neval
# monet 422.761 429.428 439.0438 438.3503 447.3286 453.104    10
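Note that the two queries differ because SQLite does not support the SAMPLE clause, hence the equivalent order by RANDOM() limit 3 in the sqldf call below.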

microbenchmark::microbenchmark(sqldf = {sqldf::sqldf("select * from df order by RANDOM() limit 3")}, times = 10)
#Unit: milliseconds
#  expr      min      lq     mean   median       uq      max neval
# sqldf 903.9982 908.256 925.4255 920.2692 930.0934 963.6983    10

microbenchmark::microbenchmark(monet = {monet.sql(df, "select origin, median(distance) as med_dist from df group by origin")}, times = 10)
#Unit: milliseconds
#  expr      min       lq     mean   median       uq      max neval
# monet 450.7862 456.9589 458.6389 458.9634 460.4402 465.2253    10

microbenchmark::microbenchmark(sqldf = {sqldf::sqldf("select origin, median(distance) as med_dist from df group by origin")}, times = 10)
#Unit: milliseconds
#  expr      min       lq    mean   median       uq      max neval
# sqldf 833.1494 836.6816 841.952 843.5569 846.8117 851.0771    10

microbenchmark::microbenchmark(monet = {monet.sql(df, "with df1 as (select dest, avg(distance) as dist from df group by dest), df2 as (select dest, count(*) as cnts from df group by dest) select * from df1 inner join df2 on (df1.dest = df2.dest)")}, times = 10)
#Unit: milliseconds
#  expr      min       lq    mean   median       uq     max neval
# monet 426.0248 431.2086 437.634 438.4718 442.8799 451.275    10

microbenchmark::microbenchmark(sqldf = {sqldf::sqldf("select * from (select dest, avg(distance) as dist from df group by dest) df1 inner join (select dest, count(*) as cnts from df group by dest) df2 on (df1.dest = df2.dest)")}, times = 10)
#Unit: seconds
#  expr      min       lq     mean   median       uq      max neval
# sqldf 1.013116 1.017248 1.024117 1.021555 1.025668 1.048133    10

# MERGE
monet.sql2 <- function(df1, df2, sql) {
  # RECOVER THE NAMES OF BOTH DATA FRAMES TO USE AS TABLE NAMES
  df1_str <- deparse(substitute(df1))
  df2_str <- deparse(substitute(df2))
  monet.con <- DBI::dbConnect(MonetDBLite::MonetDBLite(), ":memory:")
  suppressMessages(DBI::dbWriteTable(monet.con, df1_str, df1, overwrite = TRUE))
  suppressMessages(DBI::dbWriteTable(monet.con, df2_str, df2, overwrite = TRUE))
  result <- DBI::dbGetQuery(monet.con, sql)
  DBI::dbDisconnect(monet.con, shutdown = TRUE)
  return(result)
}

tbl1 <- monet.sql(df, "select dest, avg(distance) as dist from df group by dest")
tbl2 <- monet.sql(df, "select dest, count(*) as cnts from df group by dest")

microbenchmark::microbenchmark(monet = {monet.sql2(tbl1, tbl2, "select * from tbl1 inner join tbl2 on (tbl1.dest = tbl2.dest)")}, times = 10)
#Unit: milliseconds
#  expr      min       lq     mean  median       uq      max neval
# monet 93.94973 174.2211 170.7771 178.487 182.4724 187.3155    10

microbenchmark::microbenchmark(sqldf = {sqldf::sqldf("select * from tbl1 inner join tbl2 on (tbl1.dest = tbl2.dest)")}, times = 10)
#Unit: milliseconds
#  expr      min       lq     mean median       uq      max neval
# sqldf 19.49334 19.60981 20.29535 20.001 20.93383 21.51837    10


Written by statcompute

May 9, 2018 at 10:32 pm

Posted in Big Data, S+/R, Statistics


Read Random Rows from a Huge CSV File

Before importing a large csv file into an R data frame, it is sometimes beneficial to sample and examine a few random records first. To the best of my knowledge, there is no off-the-shelf R function performing such data sampling at a relatively low computing cost. Therefore, I drafted two utility functions serving this particular purpose, one with the LaF library and the other with the reticulate library, leveraging the power of Python. While the first function is more efficient and samples 3 records out of 336,776 in about 100 milliseconds, the second one is more for fun and a showcase of the reticulate package.
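For contrast, a naive baseline of my own sketching, not one of the two utilities below, would read the entire file before sampling and therefore pay the full import cost that both functions are designed to avoid.

# NAIVE BASELINE (ILLUSTRATIVE ONLY): READ THE WHOLE FILE, THEN SAMPLE N ROWS
sample0 <- function(file, n) {
  df <- read.csv(file)
  return(df[sample(1:nrow(df), n), ])
}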


library(LaF)

sample1 <- function(file, n) {
  # DETECT THE DATA MODEL OF THE CSV FILE AND OPEN IT WITHOUT LOADING IT INTO MEMORY
  lf <- laf_open(detect_dm_csv(file, sep = ",", header = TRUE, factor_fraction = -1))
  # READ ONLY THE N RANDOMLY CHOSEN ROWS
  return(read_lines(lf, sample(1:nrow(lf), n)))
}

sample1("Downloads/nycflights.csv", 3)
#   year month day dep_time dep_delay arr_time arr_delay carrier tailnum flight
# 1 2013     9  15     1323        -6     1506       -23      MQ  N857MQ   3340
# 2 2013     3  18     1657        -4     2019         9      UA  N35271     80
# 3 2013     6   7     1325        -4     1515       -11      9E  N8477R   3867
#   origin dest air_time distance hour minute
# 1    LGA  DTW       82      502   13     23
# 2    EWR  MIA      157     1085   16     57
# 3    EWR  CVG       91      569   13     25

library(reticulate)

sample2 <- function(file, n) {
  # COUNT THE DATA ROWS WITH A PYTHON GENERATOR EXPRESSION, EXCLUDING THE HEADER
  rows <- py_eval(paste("sum(1 for line in open('", file, "'))", sep = '')) - 1
  # LET PANDAS SKIP EVERY ROW EXCEPT THE HEADER AND THE N SAMPLED ONES
  return(import("pandas")$read_csv(file, skiprows = setdiff(1:rows, sample(1:rows, n))))
}

sample2("Downloads/nycflights.csv", 3)
#   year month day dep_time dep_delay arr_time arr_delay carrier tailnum flight
# 1 2013    10   9      812        12     1010       -16      9E  N902XJ   3507
# 2 2013     4  30     1218       -10     1407       -30      EV  N18557   4091
# 3 2013     8  25     1111        -4     1238       -27      MQ  N721MQ   3281
#   origin dest air_time distance hour minute
# 1    JFK  MSY      156     1182    8     12
# 2    EWR  IND       92      645   12     18
# 3    LGA  CMH       66      479   11     11

Written by statcompute

April 28, 2018 at 9:33 pm

Posted in Big Data, S+/R, Statistics


Updating Column Values in Clojure Map


(require '[huri.core :as h]
         '[clojure.core.matrix.dataset :as d]
         '[incanter.core :as i])

(def ds [{:id 1.0 :name "name1"}
         {:id 2.0 :name "name2"}
         {:id 3.0 :name "name3"}])

;; UPDATE THE :NAME COLUMN IN THE DATASET
;; - IF THE VALUE IS NOT "name2", THEN CHANGE IT TO "not 2"
;;
;; EXPECTED OUTPUT:
;; | :id | :name |
;; |-----+-------|
;; | 1.0 | not 2 |
;; | 2.0 | name2 |
;; | 3.0 | not 2 |

;; WITH CLOJURE.CORE/UPDATE
(def d1 (map (fn [x] (update x :name #(if (= "name2" %) % "not 2"))) ds))

;; WITH CLOJURE.CORE/UPDATE-IN
(def d2 (map (fn [x] (update-in x [:name] #(if (= "name2" %) % "not 2"))) ds))

;; WITH HURI/UPDATE-COLS
(def d3 (h/update-cols {:name #(if (= "name2" %) % "not 2")} ds))

;; WITH MATRIX.DATASET/EMAP-COLUMN
(def d4 (-> ds
            (d/dataset)
            (d/emap-column :name #(if (= "name2" %) % "not 2"))
            ((comp #(map into %) d/row-maps))))
   
;; WITH INCANTER/TRANSFORM-COL
(def d5 (-> ds
            (i/to-dataset)
            (i/transform-col :name #(if (= "name2" %) % "not 2"))
            ((comp #(map into %) second vals))))

Written by statcompute

April 22, 2018 at 5:23 pm

Adding New Columns to Clojure Map


(require '[huri.core :as h]
         '[clojure.core.matrix.dataset :as d]
         '[incanter.core :as i])

(def ds [{:id 1.0 :name "name1"}
         {:id 2.0 :name "name2"}
         {:id 3.0 :name "name3"}])

;; ADD 2 COLUMNS TO THE DATASET
;; - ADD 2 TO :ID AND NAME THE NEW COLUMN :ADD2
;; - CHECK WHETHER :NAME = "name2" AND NAME THE NEW COLUMN :NAME2
;;
;; EXPECTED OUTPUT:
;;| :id | :name | :add2 | :name2 |
;;|-----+-------+-------+--------|
;;| 1.0 | name1 |   3.0 |      N |
;;| 2.0 | name2 |   4.0 |      Y |
;;| 3.0 | name3 |   5.0 |      N |

;; WITH PLAIN CLOJURE
;; #1 - MERGE
(def d1 (map #(merge % {:add2 (+ (:id %) 2) 
                        :name2 (if (= "name2" (:name %)) "Y" "N")}) ds))

;; #2 - MERGE-WITH
(def d2 (map #(merge-with into % {:add2 (+ (:id %) 2)
                                  :name2 (if (= "name2" (:name %)) "Y" "N")}) ds))

;; #3 - ASSOC
(def d3 (map #(assoc % :add2 (+ (:id %) 2) 
                       :name2 (if (= "name2" (:name %)) "Y" "N")) ds))

;; #4 - CONJ
(def d4 (map #(conj % {:add2 (+ (:id %) 2)
                       :name2 (if (= "name2" (:name %)) "Y" "N")}) ds))

;; #5 - CONCAT 
(def d5 (map #(into {} (concat % {:add2 (+ (:id %) 2)
                                  :name2 (if (= "name2" (:name %)) "Y" "N")})) ds))

;; WITH HURI 
(def d6 (h/derive-cols {:name2 [#(if (= "name2" %) "Y" "N") :name] 
                        :add2 [#(+ 2  %) :id]} ds))

;; WITH CORE.MATRIX API
(def d7 (-> ds
            (d/dataset)
            (d/add-column :add2 (map #(+ 2 %) (map :id ds)))
            (d/add-column :name2 (map #(if (= "name2" %) "Y" "N") (map :name ds)))
            (d/row-maps)))

;; WITH INCANTER API
(def d8 (->> ds
             (i/to-dataset)
             (i/add-derived-column :add2 [:id] #(+ 2 %))
             (i/add-derived-column :name2 [:name] #(if (= "name2" %) "Y" "N"))
             ((comp second vals))))

;; CHECK THE DATA EQUALITY
(= d1 d2 d3 d4 d5 d6 d7 d8)
;; true

Written by statcompute

April 21, 2018 at 4:02 pm

Transpose in Clojure


(require '[huri.core :as h]
         '[clojure.core.matrix.dataset :as d]
         '[incanter.core :as i])

;; FROM MAP OF ROWS TO MAP OF COLUMNS

(def byRow [{:x 1 :y "a"}
            {:x 2 :y "b"}
            {:x 3 :y "c"}])

;; APPROACH #1 - PLAIN CLOJURE
(zipmap (keys (first byRow)) (apply map list (map vals byRow)))

; {:x (1 2 3), :y ("a" "b" "c")}

;; APPROACH #2 - HURI LIBRARY
(h/col-oriented byRow)

; {:x (1 2 3), :y ("a" "b" "c")}

;; APPROACH #3 - CORE.MATRIX LIBRARY
(d/to-map (d/dataset (keys (first byRow)) byRow))

; {:x [1 2 3], :y ["a" "b" "c"]}

;; APPROACH #4 - INCANTER LIBRARY
(i/to-map (i/to-dataset byRow))

; {:x (1 2 3), :y ("a" "b" "c")}

;; FROM MAP OF COLUMNS TO MAP OF ROWS

(def byCol {:x '(1 2 3)
            :y '("a" "b" "c")})

;; APPROACH #1 - PLAIN CLOJURE
(map #(zipmap (keys byCol) %) (apply map list (vals byCol)))

; ({:x 1, :y "a"} {:x 2, :y "b"} {:x 3, :y "c"})

;; APPROACH #2 - HURI LIBRARY
(h/row-oriented byCol)

; ({:x 1, :y "a"} {:x 2, :y "b"} {:x 3, :y "c"})

;; APPROACH #3 - CORE.MATRIX LIBRARY
(d/row-maps (d/dataset (keys byCol) byCol))

; [{:x 1, :y "a"} {:x 2, :y "b"} {:x 3, :y "c"}]

;; APPROACH #4 - INCANTER LIBRARY
(second (vals (i/dataset (keys byCol) (apply map list (vals byCol)))))

; ({:x 1, :y "a"} {:x 2, :y "b"} {:x 3, :y "c"})

Written by statcompute

April 13, 2018 at 10:28 pm

Clojure Integration with R


(require '[tnoda.rashinban :as rr]
         '[tnoda.rashinban.core :as rc]
         '[clojure.core.matrix.dataset :as dt]
         '[clojure.core.matrix.impl.dataset :as id])

;; CREATE A TOY DATA
(def ds [{:id 1.0 :name "name1"}
         {:id 2.0 :name "name2"}
         {:id 3.0 :name "name3"}])

;; RUN THE FOLLOWING R CODE IN ADVANCE TO START THE RSERVE SERVER:
;;   R -e 'library(Rserve)' -e 'Rserve(args = "--vanilla")'
;; IF YOU HAVE LITTLER INSTALLED, BELOW ALSO WORKS:
;;   r -e 'library(Rserve); Rserve(args = "--vanilla")'  
(rr/init)

;; PASS THE DATA FROM CLOJURE INTO R
;; NOTE: MAP IS LAZY, SO WRAP IT IN DOALL TO FORCE THE ASSIGNMENT SIDE EFFECTS
(doall (map (fn [x] (rr/<- (name (key x)) (val x)))
         (let [ks ((comp keys first) ds)] (zipmap ks (map #(map % ds) ks)))))

(rr/<- 'header (map name ((comp keys first) ds)))
         
;; CREATE THE R DATA.FRAME         
(rc/eval "df = data.frame(lapply(header, as.name))")

;; TEST THE R DATA.FRAME
(rc/eval "df$id")
; [1.0 2.0 3.0]

(rc/eval "df$name")
; ["name1" "name2" "name3"]

;; CONVERT THE R DATA.FRAME BACK TO THE CLOJURE MAP
(def mp (into [] (map #(zipmap (map keyword (rr/colnames 'df)) %) 
                   (partition (count (rr/colnames 'df)) (apply interleave (rr/matrix 'df))))))

; [{:id 1.0, :name "name1"} {:id 2.0, :name "name2"} {:id 3.0, :name "name3"}]

;; TEST THE EQUALITY BETWEEN INPUT AND OUTPUT DATA
(= mp ds)
; true

;; ALTERNATIVELY, WE CAN ALSO CONVERT THE R DATA.FRAME TO A CLOJURE DATASET
(def dt (id/dataset-from-columns (map keyword (rr/colnames 'df)) (rr/matrix 'df)))

; #dataset/dataset {:column-names [:id :name], :columns [[1.0 2.0 3.0] ["name1" "name2" "name3"]], :shape [3 2]}

;; NEXT, CONVERT THE DATASET TO THE MAP
(def mp2 (dt/row-maps dt))

; [{:id 1.0, :name "name1"} {:id 2.0, :name "name2"} {:id 3.0, :name "name3"}]

(= ds mp2)
; true

Written by statcompute

April 11, 2018 at 7:47 pm

Posted in Big Data, clojure, S+/R, Statistics


Aggregation by Multiple Keys in Clojure


(require '[ultra-csv.core :refer [read-csv]]
         '[criterium.core :refer [quick-bench]]
         '[clojure.set :refer [index]])

(def ds (read-csv "/home/liuwensui/Downloads/nycflights.csv"))

;; FASTEST
(quick-bench
  (map
    (fn [x] {:year (first (key x))
             :month (last (key x))
             :flights (count (val x))})
      (group-by (juxt :year :month) ds)))      

;Evaluation count : 6 in 6 samples of 1 calls.
;             Execution time mean : 712.329182 ms
;    Execution time std-deviation : 3.832950 ms
;   Execution time lower quantile : 709.135737 ms ( 2.5%)
;   Execution time upper quantile : 718.651856 ms (97.5%)
;                   Overhead used : 11.694357 ns

;; WORKS FINE
(quick-bench
  (map
    (fn [x] {:year (:year (key x))
             :month (:month (key x))
             :flights (count (val x))})
      (group-by #(select-keys % [:year :month]) ds)))
      
;Evaluation count : 6 in 6 samples of 1 calls.
;             Execution time mean : 1.485215 sec
;    Execution time std-deviation : 9.832209 ms
;   Execution time lower quantile : 1.476116 sec ( 2.5%)
;   Execution time upper quantile : 1.500560 sec (97.5%)
;                   Overhead used : 11.694357 ns

;; SLOWEST
(quick-bench
  (map
    (fn [x] {:year (:year (key x))
             :month (:month (key x))
             :flights (count (val x))})
      (index ds [:year :month])))
      
;Evaluation count : 6 in 6 samples of 1 calls.
;             Execution time mean : 2.158245 sec
;    Execution time std-deviation : 11.208489 ms
;   Execution time lower quantile : 2.149538 sec ( 2.5%)
;   Execution time upper quantile : 2.175743 sec (97.5%)
;                   Overhead used : 11.694357 ns

Written by statcompute

April 8, 2018 at 4:21 pm

Posted in Big Data, clojure, Statistics
