From 6e3767bb371c96c30359a76d64cccd8ccbc43a62 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Wed, 25 Sep 2013 00:01:46 +0200 Subject: [PATCH] Implement generic field to columns projection with support for user code. And use that new facility in the Archive mode of operations, which is a practical unit of testing and milestone to reach. We have enough code to actually compile and try running the parser's output. --- archive.lisp | 44 ++++++++++++- csv.lisp | 141 ++++++++++++++++++++++++++++------------ package.lisp | 17 ++--- parser.lisp | 168 +++++++++++++++++++++++++++++++----------------- transforms.lisp | 7 +- 5 files changed, 265 insertions(+), 112 deletions(-) diff --git a/archive.lisp b/archive.lisp index 4f36c78..314d6f7 100644 --- a/archive.lisp +++ b/archive.lisp @@ -13,7 +13,35 @@ (defun http-fetch-file (url &key (tmpdir *default-tmpdir*)) "Download a file from URL into TMPDIR." (ensure-directories-exist tmpdir) - (drakma:http-request url :force-binary t)) + (let ((archive-filename (make-pathname :directory (namestring tmpdir) + :name (pathname-name url) + :type (pathname-type url)))) + (multiple-value-bind (http-stream + status-code + headers + uri + stream + should-close + status) + (drakma:http-request url :force-binary t :want-stream t) + ;; TODO: check the status-code + (declare (ignore status-code uri stream status)) + (let* ((source-stream (flexi-streams:flexi-stream-stream http-stream)) + (content-length + (parse-integer (cdr (assoc :content-length headers))))) + (with-open-file (archive-stream archive-filename + :direction :output + :element-type '(unsigned-byte 8) + :if-exists :supersede + :if-does-not-exist :create) + (let ((seq (make-array content-length + :element-type '(unsigned-byte 8) + :fill-pointer t))) + (setf (fill-pointer seq) (read-sequence seq source-stream)) + (write-sequence seq archive-stream))) + (when should-close (close source-stream)))) + ;; return the pathname where we just downloaded the file + archive-filename)) 
(defun expand-archive (archive-file &key (tmpdir *default-tmpdir*)) "Expand given ARCHIVE-FILE in TMPDIR/(pathname-name ARCHIVE-FILE). Return @@ -25,8 +53,20 @@ (fad:pathname-as-directory (merge-pathnames archive-name tmpdir)))) (ensure-directories-exist expand-directory) (ecase archive-type - (:zip (zip:unzip archive-file expand-directory))))) + (:zip (zip:unzip archive-file expand-directory))) + ;; return the pathname where we did expand the archive + expand-directory)) +(defun get-matching-filenames (directory regex) + "Apply given REGEX to the DIRECTORY contents and return the list of + matching files." + (let ((matches nil) + (start (length (namestring directory)))) + (flet ((push-matches (pathname) + (when (cl-ppcre:scan regex (namestring pathname) :start start) + (push pathname matches)))) + (fad:walk-directory directory #'push-matches)) + matches)) ;;; diff --git a/csv.lisp b/csv.lisp index 972af7e..c9cbb5e 100644 --- a/csv.lisp +++ b/csv.lisp @@ -12,13 +12,67 @@ :name table-name :type "csv")) +(defun get-absolute-pathname (pathname-or-regex &key (root *csv-path-root*)) + "PATHNAME-OR-REGEX is expected to be either (:filename pathname) + or (:regex expression). In the first case, this function checks whether the + pathname is absolute or relative and returns an absolute pathname given + current working directory of ROOT. + + In the second case, walk the ROOT directory and return the first pathname + that matches the regex. TODO: consider signaling a condition when we have + more than one match."
+ (destructuring-bind (type part) pathname-or-regex + (ecase type + (:regex (first (pgloader.archive:get-matching-filenames root part))) + (:filename (if (fad:pathname-absolute-p part) part + (merge-pathnames part root)))))) + +;;; +;;; Project fields into columns +;;; +(defun project-fields (&key fields columns null-as) + "The simplest projection happens when either FIELDS or COLUMNS is nil: in + this case the projection is an identity, we simply return what we got -- + still with some basic processing. + + Other forms of projections consist of forming columns with the result of + applying a transformation function. In that case a COLUMNS entry is a list + of '(colname type expression), the expression being the (already + compiled) function to use here." + + (let ((null-as (if (eq null-as :blanks) + (lambda (col) + (every (lambda (char) (char= char #\Space)) col)) + `(lambda (col) + (if (string= ,null-as col) nil col))))) + + (if (or (null fields) (null columns)) + `(lambda (row) + (mapcar ,null-as row)) + + (let* ((args + (mapcar + (lambda (field) + (pgloader.transforms:intern-symbol (car field))) fields)) + (newrow + (loop for (name type fn) in columns + collect (or fn + (let ((binding + (pgloader.transforms:intern-symbol name))) + `(funcall ,null-as ,binding)))))) + `(lambda (row) + (destructuring-bind (,@args) row + (list ,@newrow))))))) + ;;; ;;; Read a file format in CSV format, and call given function on each line. ;;; (defun map-rows (table-name filename &key process-row-fn - (skip-first-p nil) + fields + columns + (skip-lines nil) (separator #\Tab) (quote cl-csv:*quote*) (escape cl-csv:*quote-escape*) @@ -41,16 +95,23 @@ Finally returns how many rows where read and processed."
(let* ((read 0) (reformat-then-process - (lambda (row) - (incf read) - (let ((row-with-nils - (mapcar (lambda (x) (if (string= null-as x) nil x)) row))) - (funcall process-row-fn row-with-nils))))) + (compile nil + (lambda (row) + (incf read) + (let* ((processing (project-fields :fields fields + :columns columns + :null-as null-as)) + (processed-row (funcall processing row))) + (funcall process-row-fn processed-row)))))) + + ;; we handle skipping more than one line here, as cl-csv only knows + ;; about skipping the first line + (when (and skip-lines (< 0 skip-lines)) + (loop repeat skip-lines do (read-line input nil nil))) (handler-case (cl-csv:read-csv input :row-fn reformat-then-process - :skip-first-p skip-first-p :separator separator :quote quote :escape escape) @@ -63,7 +124,9 @@ Finally returns how many rows where read and processed." (defun copy-to-queue (table-name filename dataq &key - (skip-first-p nil) + fields + columns + skip-lines (separator #\Tab) (quote cl-csv:*quote*) (escape cl-csv:*quote-escape*) @@ -71,39 +134,33 @@ Finally returns how many rows where read and processed." "Copy data from CSV FILENAME into lprallel.queue DATAQ" (let ((read (pgloader.queue:map-push-queue dataq #'map-rows table-name filename - :skip-first-p skip-first-p + :fields fields + :columns columns + :skip-lines skip-lines :separator separator :quote quote :escape escape :null-as null-as))) (pgstate-incf *state* table-name :read read))) -(defun copy-from-file (dbname table-name filename +(defun copy-from-file (dbname table-name filename-or-regex &key + fields + columns transforms (truncate t) - (skip-first-p nil) + skip-lines (separator #\Tab) (quote cl-csv:*quote*) (escape cl-csv:*quote-escape*) (null-as "\\N")) "Copy data from CSV file FILENAME into PostgreSQL DBNAME.TABLE-NAME" - (let* ((report-header (null *state*)) - (*state* (or *state* (pgloader.utils:make-pgstate))) - (lp:*kernel* - (lp:make-kernel 2 :bindings - `((*pgconn-host* . ,*pgconn-host*) - (*pgconn-port* . 
,*pgconn-port*) - (*pgconn-user* . ,*pgconn-user*) - (*pgconn-pass* . ,*pgconn-pass*) - (*pg-settings* . ',*pg-settings*) - (*myconn-host* . ,*myconn-host*) - (*myconn-port* . ,*myconn-port*) - (*myconn-user* . ,*myconn-user*) - (*myconn-pass* . ,*myconn-pass*) - (*state* . ,*state*)))) - (channel (lp:make-channel)) - (dataq (lq:make-queue :fixed-capacity 4096))) + (let* ((report-header (null *state*)) + (*state* (or *state* (pgloader.utils:make-pgstate))) + (lp:*kernel* (make-kernel 2)) + (channel (lp:make-channel)) + (dataq (lq:make-queue :fixed-capacity 4096)) + (filename (get-absolute-pathname filename-or-regex))) ;; statistics (when report-header (report-header)) @@ -112,23 +169,23 @@ Finally returns how many rows where read and processed." (multiple-value-bind (res secs) (timing - (lp:submit-task channel (lambda () - ;; this function update :read stats - (copy-to-queue table-name filename dataq - :skip-first-p skip-first-p - :separator separator - :quote quote - :escape escape - :null-as null-as))) + (lp:submit-task channel + ;; this function update :read stats + #'copy-to-queue table-name filename dataq + :fields fields + :columns columns + :skip-lines skip-lines + :separator separator + :quote quote + :escape escape + :null-as null-as) ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task - channel - (lambda () - ;; this function update :rows stats - (pgloader.pgsql:copy-from-queue dbname table-name dataq - :truncate truncate - :transforms transforms))) + (lp:submit-task channel + ;; this function update :rows stats + #'pgloader.pgsql:copy-from-queue dbname table-name dataq + :truncate truncate + :transforms transforms) ;; now wait until both the tasks are over (loop for tasks below 2 do (lp:receive-result channel)) diff --git a/package.lisp b/package.lisp index d4b579f..bfbd1bf 100644 --- a/package.lisp +++ b/package.lisp @@ -37,14 +37,6 @@ #:camelCase-to-colname #:make-kernel)) -(defpackage #:pgloader.parser - (:use 
#:cl #:esrap #:pgloader.params) - (:import-from #:pgloader.pgsql - #:with-pgsql-transaction - #:pgsql-execute) - (:export #:parse-command - #:run-command)) - (defpackage #:pgloader.queue (:use #:cl) (:export #:map-pop-queue @@ -65,6 +57,14 @@ #:get-date-columns #:format-row)) +(defpackage #:pgloader.parser + (:use #:cl #:esrap #:pgloader.params) + (:import-from #:pgloader.pgsql + #:with-pgsql-transaction + #:pgsql-execute) + (:export #:parse-command + #:run-command)) + ;; ;; Specific source handling @@ -97,6 +97,7 @@ (:export #:*default-tmpdir* #:http-fetch-file #:expand-archive + #:get-matching-filenames #:import-csv-from-zip)) (defpackage #:pgloader.syslog diff --git a/parser.lisp b/parser.lisp index 1d964df..052eadb 100644 --- a/parser.lisp +++ b/parser.lisp @@ -172,11 +172,63 @@ Here's a quick description of the format we're parsing here: (def-keyword-rule "before") (def-keyword-rule "finally") (def-keyword-rule "and") - (def-keyword-rule "do")) + (def-keyword-rule "do") + (def-keyword-rule "filename") + (def-keyword-rule "matching")) (defrule kw-auto-increment (and "auto_increment" (* (or #\Tab #\Space))) (:constant :auto-increment)) + +;;; +;;; Regular Expression support, quoted as-you-like +;;; +(defun process-quoted-regex (pr) + "Helper function to process different kinds of quotes for regexps" + (destructuring-bind (open regex close) pr + (declare (ignore open close)) + `(:regex ,(text regex)))) + +(defrule single-quoted-regex (and #\' (+ (not #\')) #\') + (:function process-quoted-regex)) + +(defrule double-quoted-regex (and #\" (+ (not #\")) #\") + (:function process-quoted-regex)) + +(defrule parens-quoted-regex (and #\( (+ (not #\))) #\)) + (:function process-quoted-regex)) + +(defrule braces-quoted-regex (and #\{ (+ (not #\})) #\}) + (:function process-quoted-regex)) + +(defrule chevron-quoted-regex (and #\< (+ (not #\>)) #\>) + (:function process-quoted-regex)) + +(defrule brackets-quoted-regex (and #\[ (+ (not #\])) #\]) + (:function 
process-quoted-regex)) + +(defrule slash-quoted-regex (and #\/ (+ (not #\/)) #\/) + (:function process-quoted-regex)) + +(defrule pipe-quoted-regex (and #\| (+ (not #\|)) #\|) + (:function process-quoted-regex)) + +(defrule sharp-quoted-regex (and #\# (+ (not #\#)) #\#) + (:function process-quoted-regex)) + +(defrule quoted-regex (and "~" (or single-quoted-regex + double-quoted-regex + parens-quoted-regex + braces-quoted-regex + chevron-quoted-regex + brackets-quoted-regex + slash-quoted-regex + pipe-quoted-regex + sharp-quoted-regex)) + (:lambda (qr) + (destructuring-bind (tilde regex) qr + (declare (ignore tilde)) + regex))) ;;; @@ -860,11 +912,11 @@ Here's a quick description of the format we're parsing here: (declare (ignore fields )) sep))) -(defrule csv-option (and (or option-truncate - option-skip-header - option-fields-enclosed-by - option-fields-escaped-by - option-fields-terminated-by))) +(defrule csv-option (or option-truncate + option-skip-header + option-fields-enclosed-by + option-fields-escaped-by + option-fields-terminated-by)) (defrule another-csv-option (and #\, ignore-whitespace csv-option) (:lambda (source) @@ -893,7 +945,7 @@ Here's a quick description of the format we're parsing here: (cons :date-format date-format)))) (defrule option-null-if-blanks (and kw-null kw-if kw-blanks) - (:constant (cons :null-if-blanks t))) + (:constant (cons :null-as :blanks))) (defrule csv-field-option (or option-terminated-by option-date-format @@ -950,13 +1002,9 @@ Here's a quick description of the format we're parsing here: (or (alphanumericp character) (member character '(#\_ #\-)))) -(defun intern-transforms-symbol (symbol-name) - (intern (string-upcase symbol-name) - (find-package "PGLOADER.TRANSFORMS"))) - (defrule sexp-symbol (+ (symbol-character-p character)) (:lambda (schars) - (intern-transforms-symbol (text schars)))) + (pgloader.transforms:intern-symbol (text schars)))) (defrule sexp-string-char (or (not-doublequote character) (and #\\ #\"))) @@ 
-1020,14 +1068,23 @@ Here's a quick description of the format we're parsing here: ;; ;; The main command parsing ;; -(defrule csv-source (and kw-load kw-csv kw-from maybe-quoted-filename) +(defrule filename-matching (and kw-filename kw-matching quoted-regex) + (:lambda (fm) + (destructuring-bind (filename matching regex) fm + (declare (ignore filename matching)) + regex))) + +(defrule filename-or-regex (or filename-matching maybe-quoted-filename)) + +(defrule csv-source (and kw-load kw-csv kw-from filename-or-regex) (:lambda (src) (destructuring-bind (load csv from source) src (declare (ignore load csv from)) ;; source is (:filename #P"pathname/here") (destructuring-bind (type uri) source (ecase type - (:filename uri)))))) + (:filename source) + (:regex source)))))) (defun list-symbols (expression &optional s) "Return a list of the symbols used in EXPRESSION." @@ -1037,22 +1094,6 @@ Here's a quick description of the format we're parsing here: finally (return (reverse s)))) (t s))) -(defun col-expr-to-lambda (fields cols) - "Transform expression into proper lambda definitions." - (loop - with args = (mapcar - (lambda (field) (intern-transforms-symbol (car field))) - fields) - for (name type expression) in cols - for fun = (when expression - (let ((args-not-used - (set-difference args (list-symbols expression)))) - (compile nil `(lambda (,@args) - ;; avoid warnings when compiling the command - (declare (ignore ,@args-not-used)) - ,expression)))) - collect (list name type fun))) - (defrule load-csv-file (and csv-source (? csv-source-field-list) target (? 
csv-target-column-list) csv-options) @@ -1068,10 +1109,9 @@ Here's a quick description of the format we're parsing here: (*pgconn-pass* ,password)) (pgloader.csv:copy-from-file ,dbname ,table-name - ,source + ',source :fields ',fields - :cols ',(funcall #'col-expr-to-lambda - fields columns) + :columns ',columns ,@options))))))) @@ -1106,14 +1146,6 @@ Here's a quick description of the format we're parsing here: (declare (ignore finally do)) quoted))) -(defrule archive-source (and kw-in kw-archive http-uri) - (:lambda (src) - (destructuring-bind (in archive source) src - (declare (ignore in archive)) - (destructuring-bind (type url) source - (ecase type - (:http url)))))) - (defrule archive-command (or load-csv-file load-dbf-file)) @@ -1128,20 +1160,37 @@ Here's a quick description of the format we're parsing here: (destructuring-bind (col1 cols) source (list* col1 cols)))) -(defrule in-archive (and archive-source - before-load-do - archive-command-list - finally-do) +(defrule archive-source (and kw-load kw-from kw-archive http-uri) + (:lambda (src) + (destructuring-bind (load from archive source) src + (declare (ignore load from archive)) + (destructuring-bind (type url) source + (ecase type + (:http url)))))) + +(defrule load-from-archive (and archive-source + target + before-load-do + archive-command-list + finally-do) (:lambda (archive) - (destructuring-bind (source before commands finally) archive - `(lambda () - (let* ((archive-file (pgloader.archive:http-fetch-file ,source)) - (*csv-root-file* (pgloader.archive:expand-archive archive-file))) - (progn - (pgsql-execute ,before :client-min-messages :error) - ,@(loop for command in commands - collect `(funcall ,command)) - (pgsql-execute ,finally :client-min-messages :error))))))) + (destructuring-bind (source pg-db-uri before commands finally) archive + (destructuring-bind (&key host port user password dbname &allow-other-keys) + pg-db-uri + `(lambda () + (let* ((archive-file (pgloader.archive:http-fetch-file 
,source)) + (*csv-path-root* (pgloader.archive:expand-archive archive-file)) + (*pgconn-host* ,host) + (*pgconn-port* ,port) + (*pgconn-user* ,user) + (*pgconn-pass* ,password)) + (progn + (with-pgsql-transaction (,dbname) + (pgsql-execute ,before :client-min-messages :error)) + ,@(loop for command in commands + collect `(funcall ,command)) + (with-pgsql-transaction (,dbname) + (pgsql-execute ,finally :client-min-messages :error))))))))) ;;; @@ -1154,7 +1203,7 @@ Here's a quick description of the format we're parsing here: (defrule end-of-command (and ignore-whitespace #\; ignore-whitespace) (:constant :eoc)) -(defrule command (and (or in-archive +(defrule command (and (or load-from-archive load-csv-file load-dbf-file load-database @@ -1258,9 +1307,10 @@ LOAD FROM http:///tapoueh.org/db.t fields terminated by '\t'; ")) -(defun test-parsing-in-archive () +(defun test-parsing-load-from-archive () (parse-command " - IN ARCHIVE http://geolite.maxmind.com/download/geoip/database/GeoLiteCity_CSV/GeoLiteCity-latest.zip + LOAD FROM ARCHIVE http://pgsql.tapoueh.org/temp/foo.zip + INTO postgresql://dim@localhost:54393/dim BEFORE LOAD DO $$ @@ -1284,7 +1334,7 @@ LOAD FROM http:///tapoueh.org/db.t ); $$ - LOAD CSV FROM '*/GeoLiteCity-Blocks.csv' + LOAD CSV FROM FILENAME MATCHING ~/GeoLiteCity-Blocks.csv/ ( startIpNum, endIpNum, locId ) @@ -1299,7 +1349,7 @@ LOAD FROM http:///tapoueh.org/db.t fields escaped by '\"', fields terminated by '\\t' - AND LOAD CSV FROM '*/GeoLiteCity-Location.csv' + AND LOAD CSV FROM FILENAME MATCHING ~/GeoLiteCity-Location.csv/ ( locId,country,region,city,postalCode, latitude,longitude,metroCode,areaCode diff --git a/transforms.lisp b/transforms.lisp index f3e3772..a07fdf5 100644 --- a/transforms.lisp +++ b/transforms.lisp @@ -6,10 +6,15 @@ ;;; syntax for transformations. 
(defpackage #:pgloader.transforms - (:use #:cl)) + (:use #:cl) + (:export #:intern-symbol)) (in-package :pgloader.transforms) +(defun intern-symbol (symbol-name) + (intern (string-upcase symbol-name) + (find-package "PGLOADER.TRANSFORMS"))) + (defun zero-dates-to-null (date-string) "MySQL accepts '0000-00-00' as a date, we want :null instead." (cond