Implement generic field to columns projection with support for user code.

And use that new facility in the Archive mode of operations, which is a
practical unit of testing and a milestone to reach. We now have enough code
to actually compile and try running the parser's output.
Dimitri Fontaine 2013-09-25 00:01:46 +02:00
parent b4e530981c
commit 6e3767bb37
5 changed files with 265 additions and 112 deletions
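
The heart of the change, reduced to a hand-written sketch (the field names and
the computed column below are illustrative only, not the committed API): every
source field is bound by name, and every target column is either a field passed
through as-is or the result of user code over those fields.

    ;; sketch only: fields a, b, c come from one CSV row; the first column
    ;; keeps a, the second one is computed from b and c by user code
    (defun example-projection (row)
      (destructuring-bind (a b c) row
        (list a (concatenate 'string b "-" c))))

    ;; (example-projection '("1" "foo" "bar")) => ("1" "foo-bar")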


@@ -13,7 +13,35 @@
 (defun http-fetch-file (url &key (tmpdir *default-tmpdir*))
   "Download a file from URL into TMPDIR."
   (ensure-directories-exist tmpdir)
-  (drakma:http-request url :force-binary t))
+  (let ((archive-filename (make-pathname :directory (namestring tmpdir)
+                                         :name (pathname-name url)
+                                         :type (pathname-type url))))
+    (multiple-value-bind (http-stream
+                          status-code
+                          headers
+                          uri
+                          stream
+                          should-close
+                          status)
+        (drakma:http-request url :force-binary t :want-stream t)
+      ;; TODO: check the status-code
+      (declare (ignore status-code uri stream status))
+      (let* ((source-stream (flexi-streams:flexi-stream-stream http-stream))
+             (content-length
+              (parse-integer (cdr (assoc :content-length headers)))))
+        (with-open-file (archive-stream archive-filename
+                                        :direction :output
+                                        :element-type '(unsigned-byte 8)
+                                        :if-exists :supersede
+                                        :if-does-not-exist :create)
+          (let ((seq (make-array content-length
+                                 :element-type '(unsigned-byte 8)
+                                 :fill-pointer t)))
+            (setf (fill-pointer seq) (read-sequence seq source-stream))
+            (write-sequence seq archive-stream)))
+        (when should-close (close source-stream))))
+    ;; return the pathname where we just downloaded the file
+    archive-filename))

 (defun expand-archive (archive-file &key (tmpdir *default-tmpdir*))
   "Expand given ARCHIVE-FILE in TMPDIR/(pathname-name ARCHIVE-FILE). Return
@@ -25,8 +53,20 @@
          (fad:pathname-as-directory (merge-pathnames archive-name tmpdir))))
     (ensure-directories-exist expand-directory)
     (ecase archive-type
-      (:zip (zip:unzip archive-file expand-directory)))))
+      (:zip (zip:unzip archive-file expand-directory)))
+    ;; return the pathname where we did expand the archive
+    expand-directory))
+
+(defun get-matching-filenames (directory regex)
+  "Apply given REGEXP to the DIRECTORY contents and return the list of
+matching files."
+  (let ((matches nil)
+        (start (length (namestring directory))))
+    (flet ((push-matches (pathname)
+             (when (cl-ppcre:scan regex (namestring pathname) :start start)
+               (push pathname matches))))
+      (fad:walk-directory directory #'push-matches))
+    matches))

 ;;;
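
A quick usage sketch for the new get-matching-filenames (the directory and the
resulting path below are hypothetical): it walks the directory returned by
expand-archive and keeps the pathnames whose namestring, past the directory
prefix, matches a cl-ppcre regular expression.

    ;; hypothetical paths, for illustration only
    (pgloader.archive:get-matching-filenames
     #P"/tmp/pgloader/GeoLiteCity-latest/"   ; as returned by expand-archive
     "GeoLiteCity-Blocks.csv")               ; a cl-ppcre regular expression
    ;; => (#P"/tmp/pgloader/GeoLiteCity-latest/.../GeoLiteCity-Blocks.csv")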

csv.lisp

@@ -12,13 +12,67 @@
                  :name table-name
                  :type "csv"))

+(defun get-absolute-pathname (pathname-or-regex &key (root *csv-path-root*))
+  "PATHNAME-OR-REGEX is expected to be either (:regexp expression)
+or (:filename pathname). In the first case, this fonction check if the
+pathname is absolute or relative and returns an absolute pathname given
+current working directory of ROOT.
+
+In the second case, walk the ROOT directory and return the first pathname
+that matches the regex. TODO: consider signaling a condition when we have
+more than one match."
+  (destructuring-bind (type part) pathname-or-regex
+    (ecase type
+      (:regex    (first (pgloader.archive:get-matching-filenames root part)))
+      (:filename (if (fad:pathname-absolute-p part) part
+                     (merge-pathnames part root))))))
+
+;;;
+;;; Project fields into columns
+;;;
+(defun project-fields (&key fields columns null-as)
+  "The simplest projection happens when both FIELDS and COLS are nil: in
+this case the projection is an identity, we simply return what we got --
+still with some basic processing.
+
+Other forms of projections consist of forming columns with the result of
+applying a transformation function. In that case a cols entry is a list
+of '(colname type expression), the expression being the (already
+compiled) function to use here."
+  (let ((null-as (if (eq null-as :blanks)
+                     (lambda (col)
+                       (every (lambda (char) (char= char #\Space)) col))
+                     `(lambda (col)
+                        (if (string= ,null-as col) nil col)))))
+    (if (or (null fields) (null columns))
+        `(lambda (row)
+           (mapcar ,null-as row))
+        (let* ((args
+                (mapcar
+                 (lambda (field)
+                   (pgloader.transforms:intern-symbol (car field))) fields))
+               (newrow
+                (loop for (name type fn) in columns
+                   collect (or fn
+                               (let ((binding
+                                      (pgloader.transforms:intern-symbol name)))
+                                 `(funcall ,null-as ,binding))))))
+          `(lambda (row)
+             (destructuring-bind (,@args) row
+               (list ,@newrow)))))))
+
 ;;;
 ;;; Read a file format in CSV format, and call given function on each line.
 ;;;
 (defun map-rows (table-name filename
                  &key
                  process-row-fn
-                 (skip-first-p nil)
+                 fields
+                 columns
+                 (skip-lines nil)
                  (separator #\Tab)
                  (quote cl-csv:*quote*)
                  (escape cl-csv:*quote-escape*)
@@ -41,16 +95,23 @@ Finally returns how many rows where read and processed.
   (let* ((read 0)
          (reformat-then-process
+          (compile nil
           (lambda (row)
             (incf read)
-            (let ((row-with-nils
-                   (mapcar (lambda (x) (if (string= null-as x) nil x)) row)))
-              (funcall process-row-fn row-with-nils)))))
+            (let* ((processing (project-fields :fields fields
+                                               :columns columns
+                                               :null-as null-as))
+                   (processed-row (funcall processing row)))
+              (funcall process-row-fn processed-row))))))
+
+    ;; we handle skipping more than one line here, as cl-csv only knows
+    ;; about skipping the first line
+    (when (and skip-lines (< 0 skip-lines))
+      (loop repeat skip-lines do (read-line input nil nil)))
+
     (handler-case
         (cl-csv:read-csv input
                          :row-fn reformat-then-process
-                         :skip-first-p skip-first-p
                          :separator separator
                          :quote quote
                          :escape escape)
@@ -63,7 +124,9 @@ Finally returns how many rows where read and processed.
 (defun copy-to-queue (table-name filename dataq
                       &key
-                      (skip-first-p nil)
+                      fields
+                      columns
+                      skip-lines
                       (separator #\Tab)
                       (quote cl-csv:*quote*)
                       (escape cl-csv:*quote-escape*)
@@ -71,18 +134,22 @@ Finally returns how many rows where read and processed.
   "Copy data from CSV FILENAME into lprallel.queue DATAQ"
   (let ((read
          (pgloader.queue:map-push-queue dataq #'map-rows table-name filename
-                                        :skip-first-p skip-first-p
+                                        :fields fields
+                                        :columns columns
+                                        :skip-lines skip-lines
                                         :separator separator
                                         :quote quote
                                         :escape escape
                                         :null-as null-as)))
     (pgstate-incf *state* table-name :read read)))

-(defun copy-from-file (dbname table-name filename
+(defun copy-from-file (dbname table-name filename-or-regex
                        &key
+                       fields
+                       columns
                        transforms
                        (truncate t)
-                       (skip-first-p nil)
+                       skip-lines
                        (separator #\Tab)
                        (quote cl-csv:*quote*)
                        (escape cl-csv:*quote-escape*)
@@ -90,20 +157,10 @@ Finally returns how many rows where read and processed.
   "Copy data from CSV file FILENAME into PostgreSQL DBNAME.TABLE-NAME"
   (let* ((report-header (null *state*))
         (*state* (or *state* (pgloader.utils:make-pgstate)))
-        (lp:*kernel*
-         (lp:make-kernel 2 :bindings
-                         `((*pgconn-host* . ,*pgconn-host*)
-                           (*pgconn-port* . ,*pgconn-port*)
-                           (*pgconn-user* . ,*pgconn-user*)
-                           (*pgconn-pass* . ,*pgconn-pass*)
-                           (*pg-settings* . ',*pg-settings*)
-                           (*myconn-host* . ,*myconn-host*)
-                           (*myconn-port* . ,*myconn-port*)
-                           (*myconn-user* . ,*myconn-user*)
-                           (*myconn-pass* . ,*myconn-pass*)
-                           (*state* . ,*state*))))
+        (lp:*kernel* (make-kernel 2))
         (channel (lp:make-channel))
-        (dataq (lq:make-queue :fixed-capacity 4096)))
+        (dataq (lq:make-queue :fixed-capacity 4096))
+        (filename (get-absolute-pathname filename-or-regex)))

     ;; statistics
     (when report-header (report-header))
@@ -112,23 +169,23 @@ Finally returns how many rows where read and processed.
     (multiple-value-bind (res secs)
         (timing
-         (lp:submit-task channel (lambda ()
+         (lp:submit-task channel
                          ;; this function update :read stats
-                          (copy-to-queue table-name filename dataq
-                                         :skip-first-p skip-first-p
+                         #'copy-to-queue table-name filename dataq
+                         :fields fields
+                         :columns columns
+                         :skip-lines skip-lines
                          :separator separator
                          :quote quote
                          :escape escape
-                                        :null-as null-as)))
+                         :null-as null-as)
         ;; and start another task to push that data from the queue to PostgreSQL
-        (lp:submit-task
-         channel
-         (lambda ()
+        (lp:submit-task channel
          ;; this function update :rows stats
-         (pgloader.pgsql:copy-from-queue dbname table-name dataq
+         #'pgloader.pgsql:copy-from-queue dbname table-name dataq
          :truncate truncate
-         :transforms transforms)))
+         :transforms transforms)

       ;; now wait until both the tasks are over
       (loop for tasks below 2 do (lp:receive-result channel))
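
Note that project-fields returns a lambda form (a list), not a function object,
so it has to be compiled or evaluated before it can be funcalled. A REPL sketch
from inside the PGLOADER.CSV package, with made-up fields and columns, compiling
the projection directly:

    ;; two fields projected to two columns, the string "\N" treated as NULL
    (let* ((fields  '(("a") ("b")))
           (columns '(("a" nil nil) ("b" nil nil)))
           (project (compile nil (project-fields :fields fields
                                                 :columns columns
                                                 :null-as "\\N"))))
      (funcall project '("1" "\\N")))
    ;; => ("1" NIL)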


@@ -37,14 +37,6 @@
                #:camelCase-to-colname
                #:make-kernel))

-(defpackage #:pgloader.parser
-  (:use #:cl #:esrap #:pgloader.params)
-  (:import-from #:pgloader.pgsql
-                #:with-pgsql-transaction
-                #:pgsql-execute)
-  (:export #:parse-command
-           #:run-command))
-
 (defpackage #:pgloader.queue
   (:use #:cl)
   (:export #:map-pop-queue
@@ -65,6 +57,14 @@
            #:get-date-columns
            #:format-row))

+(defpackage #:pgloader.parser
+  (:use #:cl #:esrap #:pgloader.params)
+  (:import-from #:pgloader.pgsql
+                #:with-pgsql-transaction
+                #:pgsql-execute)
+  (:export #:parse-command
+           #:run-command))
+
 ;;
 ;; Specific source handling
@@ -97,6 +97,7 @@
   (:export #:*default-tmpdir*
            #:http-fetch-file
            #:expand-archive
+           #:get-matching-filenames
            #:import-csv-from-zip))

 (defpackage #:pgloader.syslog


@@ -172,11 +172,63 @@ Here's a quick description of the format we're parsing here:
   (def-keyword-rule "before")
   (def-keyword-rule "finally")
   (def-keyword-rule "and")
-  (def-keyword-rule "do"))
+  (def-keyword-rule "do")
+  (def-keyword-rule "filename")
+  (def-keyword-rule "matching"))

 (defrule kw-auto-increment (and "auto_increment" (* (or #\Tab #\Space)))
   (:constant :auto-increment))

+;;;
+;;; Regular Expression support, quoted as-you-like
+;;;
+(defun process-quoted-regex (pr)
+  "Helper function to process different kinds of quotes for regexps"
+  (destructuring-bind (open regex close) pr
+    (declare (ignore open close))
+    `(:regex ,(text regex))))
+
+(defrule single-quoted-regex (and #\' (+ (not #\')) #\')
+  (:function process-quoted-regex))
+
+(defrule double-quoted-regex (and #\" (+ (not #\")) #\")
+  (:function process-quoted-regex))
+
+(defrule parens-quoted-regex (and #\( (+ (not #\))) #\))
+  (:function process-quoted-regex))
+
+(defrule braces-quoted-regex (and #\{ (+ (not #\})) #\})
+  (:function process-quoted-regex))
+
+(defrule chevron-quoted-regex (and #\< (+ (not #\>)) #\>)
+  (:function process-quoted-regex))
+
+(defrule brackets-quoted-regex (and #\[ (+ (not #\])) #\])
+  (:function process-quoted-regex))
+
+(defrule slash-quoted-regex (and #\/ (+ (not #\/)) #\/)
+  (:function process-quoted-regex))
+
+(defrule pipe-quoted-regex (and #\| (+ (not #\|)) #\|)
+  (:function process-quoted-regex))
+
+(defrule sharp-quoted-regex (and #\# (+ (not #\#)) #\#)
+  (:function process-quoted-regex))
+
+(defrule quoted-regex (and "~" (or single-quoted-regex
+                                   double-quoted-regex
+                                   parens-quoted-regex
+                                   braces-quoted-regex
+                                   chevron-quoted-regex
+                                   brackets-quoted-regex
+                                   slash-quoted-regex
+                                   pipe-quoted-regex
+                                   sharp-quoted-regex))
+  (:lambda (qr)
+    (destructuring-bind (tilde regex) qr
+      (declare (ignore tilde))
+      regex)))
+
 ;;;
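
The quoted-regex rule accepts any of the quoting pairs above after a leading
tilde, so a command can carry a regular expression that itself contains slashes
or quotes. A parsing sketch at the REPL (the rule symbol is internal to the
pgloader.parser package):

    (esrap:parse 'pgloader.parser::quoted-regex "~/GeoLiteCity-Blocks.csv/")
    ;; => (:REGEX "GeoLiteCity-Blocks.csv")

    (esrap:parse 'pgloader.parser::quoted-regex "~{a/b/c}")
    ;; => (:REGEX "a/b/c")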
@@ -860,11 +912,11 @@ Here's a quick description of the format we're parsing here:
     (declare (ignore fields ))
     sep)))

-(defrule csv-option (and (or option-truncate
-                             option-skip-header
-                             option-fields-enclosed-by
-                             option-fields-escaped-by
-                             option-fields-terminated-by)))
+(defrule csv-option (or option-truncate
+                        option-skip-header
+                        option-fields-enclosed-by
+                        option-fields-escaped-by
+                        option-fields-terminated-by))

 (defrule another-csv-option (and #\, ignore-whitespace csv-option)
   (:lambda (source)
@@ -893,7 +945,7 @@ Here's a quick description of the format we're parsing here:
       (cons :date-format date-format))))

 (defrule option-null-if-blanks (and kw-null kw-if kw-blanks)
-  (:constant (cons :null-if-blanks t)))
+  (:constant (cons :null-as :blanks)))

 (defrule csv-field-option (or option-terminated-by
                               option-date-format
@@ -950,13 +1002,9 @@ Here's a quick description of the format we're parsing here:
   (or (alphanumericp character)
       (member character '(#\_ #\-))))

-(defun intern-transforms-symbol (symbol-name)
-  (intern (string-upcase symbol-name)
-          (find-package "PGLOADER.TRANSFORMS")))
-
 (defrule sexp-symbol (+ (symbol-character-p character))
   (:lambda (schars)
-    (intern-transforms-symbol (text schars))))
+    (pgloader.transforms:intern-symbol (text schars))))

 (defrule sexp-string-char (or (not-doublequote character) (and #\\ #\")))
@@ -1020,14 +1068,23 @@ Here's a quick description of the format we're parsing here:
 ;;
 ;; The main command parsing
 ;;
-(defrule csv-source (and kw-load kw-csv kw-from maybe-quoted-filename)
+(defrule filename-matching (and kw-filename kw-matching quoted-regex)
+  (:lambda (fm)
+    (destructuring-bind (filename matching regex) fm
+      (declare (ignore filename matching))
+      regex)))
+
+(defrule filename-or-regex (or filename-matching maybe-quoted-filename))
+
+(defrule csv-source (and kw-load kw-csv kw-from filename-or-regex)
   (:lambda (src)
     (destructuring-bind (load csv from source) src
       (declare (ignore load csv from))
       ;; source is (:filename #P"pathname/here")
       (destructuring-bind (type uri) source
         (ecase type
-          (:filename uri))))))
+          (:filename source)
+          (:regex source))))))

 (defun list-symbols (expression &optional s)
   "Return a list of the symbols used in EXPRESSION."
@@ -1037,22 +1094,6 @@ Here's a quick description of the format we're parsing here:
           finally (return (reverse s))))
     (t s)))

-(defun col-expr-to-lambda (fields cols)
-  "Transform expression into proper lambda definitions."
-  (loop
-     with args = (mapcar
-                  (lambda (field) (intern-transforms-symbol (car field)))
-                  fields)
-     for (name type expression) in cols
-     for fun = (when expression
-                 (let ((args-not-used
-                        (set-difference args (list-symbols expression))))
-                   (compile nil `(lambda (,@args)
-                                   ;; avoid warnings when compiling the command
-                                   (declare (ignore ,@args-not-used))
-                                   ,expression))))
-     collect (list name type fun)))
-
 (defrule load-csv-file (and csv-source (? csv-source-field-list)
                             target (? csv-target-column-list)
                             csv-options)
@@ -1068,10 +1109,9 @@ Here's a quick description of the format we're parsing here:
              (*pgconn-pass* ,password))
          (pgloader.csv:copy-from-file ,dbname
                                       ,table-name
-                                      ,source
+                                      ',source
                                       :fields ',fields
-                                      :cols ',(funcall #'col-expr-to-lambda
-                                                       fields columns)
+                                      :columns ',columns
                                       ,@options)))))))
@@ -1106,14 +1146,6 @@ Here's a quick description of the format we're parsing here:
       (declare (ignore finally do))
       quoted)))

-(defrule archive-source (and kw-in kw-archive http-uri)
-  (:lambda (src)
-    (destructuring-bind (in archive source) src
-      (declare (ignore in archive))
-      (destructuring-bind (type url) source
-        (ecase type
-          (:http url))))))
-
 (defrule archive-command (or load-csv-file
                              load-dbf-file))
@@ -1128,20 +1160,37 @@ Here's a quick description of the format we're parsing here:
     (destructuring-bind (col1 cols) source
       (list* col1 cols))))

-(defrule in-archive (and archive-source
+(defrule archive-source (and kw-load kw-from kw-archive http-uri)
+  (:lambda (src)
+    (destructuring-bind (load from archive source) src
+      (declare (ignore load from archive))
+      (destructuring-bind (type url) source
+        (ecase type
+          (:http url))))))
+
+(defrule load-from-archive (and archive-source
+                                target
                                 before-load-do
                                 archive-command-list
                                 finally-do)
   (:lambda (archive)
-    (destructuring-bind (source before commands finally) archive
+    (destructuring-bind (source pg-db-uri before commands finally) archive
+      (destructuring-bind (&key host port user password dbname &allow-other-keys)
+          pg-db-uri
       `(lambda ()
          (let* ((archive-file (pgloader.archive:http-fetch-file ,source))
-                (*csv-root-file* (pgloader.archive:expand-archive archive-file)))
+                (*csv-path-root* (pgloader.archive:expand-archive archive-file))
+                (*pgconn-host* ,host)
+                (*pgconn-port* ,port)
+                (*pgconn-user* ,user)
+                (*pgconn-pass* ,password))
            (progn
-             (pgsql-execute ,before :client-min-messages :error)
+             (with-pgsql-transaction (,dbname)
+               (pgsql-execute ,before :client-min-messages :error))
              ,@(loop for command in commands
                   collect `(funcall ,command))
-             (pgsql-execute ,finally :client-min-messages :error)))))))
+             (with-pgsql-transaction (,dbname)
+               (pgsql-execute ,finally :client-min-messages :error)))))))))

 ;;;
@@ -1154,7 +1203,7 @@ Here's a quick description of the format we're parsing here:
 (defrule end-of-command (and ignore-whitespace #\; ignore-whitespace)
   (:constant :eoc))

-(defrule command (and (or in-archive
+(defrule command (and (or load-from-archive
                           load-csv-file
                           load-dbf-file
                           load-database
@@ -1258,9 +1307,10 @@ LOAD FROM http:///tapoueh.org/db.t
      fields terminated by '\t';
 "))

-(defun test-parsing-in-archive ()
+(defun test-parsing-load-from-archive ()
   (parse-command "
-IN ARCHIVE http://geolite.maxmind.com/download/geoip/database/GeoLiteCity_CSV/GeoLiteCity-latest.zip
+LOAD FROM ARCHIVE http://pgsql.tapoueh.org/temp/foo.zip
+   INTO postgresql://dim@localhost:54393/dim

    BEFORE LOAD DO
      $$
@@ -1284,7 +1334,7 @@ LOAD FROM http:///tapoueh.org/db.t
      );
      $$

-   LOAD CSV FROM '*/GeoLiteCity-Blocks.csv'
+   LOAD CSV FROM FILENAME MATCHING ~/GeoLiteCity-Blocks.csv/
              (
                 startIpNum, endIpNum, locId
              )
@@ -1299,7 +1349,7 @@ LOAD FROM http:///tapoueh.org/db.t
             fields escaped by '\"',
             fields terminated by '\\t'

-  AND LOAD CSV FROM '*/GeoLiteCity-Location.csv'
+  AND LOAD CSV FROM FILENAME MATCHING ~/GeoLiteCity-Location.csv/
             (
                locId,country,region,city,postalCode,
                latitude,longitude,metroCode,areaCode


@@ -6,10 +6,15 @@
 ;;; syntax for transformations.

 (defpackage #:pgloader.transforms
-  (:use #:cl))
+  (:use #:cl)
+  (:export #:intern-symbol))

 (in-package :pgloader.transforms)

+(defun intern-symbol (symbol-name)
+  (intern (string-upcase symbol-name)
+          (find-package "PGLOADER.TRANSFORMS")))
+
 (defun zero-dates-to-null (date-string)
   "MySQL accepts '0000-00-00' as a date, we want :null instead."
   (cond
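
intern-symbol is now shared between the parser and the projection code: both
map a field or column name from the load command to the symbol naming its
binding in the generated lambda, for instance:

    (pgloader.transforms:intern-symbol "startIpNum")
    ;; => PGLOADER.TRANSFORMS::STARTIPNUM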