pgloader/src/parsers/command-csv.lisp
Dimitri Fontaine 7b9b8a32e7 Move sexp parsing into its own file.
After all, it's shared between the CSV command parsing and the Cast
Rules parsing. src/parsers/command-csv.lisp still contains lots of
facilities shared between the file based sources, will need another
series of splits.
2015-10-05 11:39:44 +02:00

464 lines
17 KiB
Common Lisp
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

;;;
;;; The main CSV loading command, with optional clauses
;;;
(in-package #:pgloader.parser)
#|
LOAD CSV FROM /Users/dim/dev/CL/pgloader/galaxya/yagoa/communaute_profil.csv
INTO postgresql://dim@localhost:54393/yagoa?commnaute_profil
WITH truncate,
fields optionally enclosed by '\"',
fields escaped by \"\,
fields terminated by '\t',
reset sequences;
LOAD CSV FROM '*/GeoLiteCity-Blocks.csv'
(
startIpNum, endIpNum, locId
)
INTO postgresql://dim@localhost:54393/dim?geolite.blocks
(
iprange ip4r using (ip-range startIpNum endIpNum),
locId
)
WITH truncate,
skip header = 2,
fields optionally enclosed by '\"',
fields escaped by '\"',
fields terminated by '\t';
|#
(defrule hex-char-code (and "0x" (+ (hexdigit-char-p character)))
(:lambda (hex)
(bind (((_ digits) hex))
(code-char (parse-integer (text digits) :radix 16)))))
(defrule tab (and #\\ #\t) (:constant #\Tab))
(defrule separator (and #\' (or hex-char-code tab character ) #\')
(:lambda (sep)
(bind (((_ char _) sep)) char)))
;;
;; Main CSV options (WITH ... in the command grammar)
;;
(defrule option-skip-header (and kw-skip kw-header equal-sign
(+ (digit-char-p character)))
(:lambda (osh)
(bind (((_ _ _ digits) osh))
(cons :skip-lines (parse-integer (text digits))))))
(defrule option-csv-header (and kw-csv kw-header)
(:constant (cons :csv-header t)))
(defrule option-fields-enclosed-by
(and kw-fields (? kw-optionally) kw-enclosed kw-by separator)
(:lambda (enc)
(bind (((_ _ _ _ sep) enc))
(cons :quote sep))))
(defrule option-fields-not-enclosed (and kw-fields kw-not kw-enclosed)
(:constant (cons :quote nil)))
(defrule quote-quote "double-quote" (:constant "\"\""))
(defrule backslash-quote "backslash-quote" (:constant "\\\""))
(defrule escaped-quote-name (or quote-quote backslash-quote))
(defrule escaped-quote-literal (or (and #\" #\") (and #\\ #\")) (:text t))
(defrule escaped-quote (or escaped-quote-literal
escaped-quote-name
separator))
(defrule escape-mode-quote "quote" (:constant :quote))
(defrule escape-mode-following "following" (:constant :following))
(defrule escape-mode (or escape-mode-quote escape-mode-following))
(defrule option-fields-escaped-by (and kw-fields kw-escaped kw-by escaped-quote)
(:lambda (esc)
(bind (((_ _ _ sep) esc))
(cons :escape sep))))
(defrule option-terminated-by (and kw-terminated kw-by separator)
(:lambda (term)
(bind (((_ _ sep) term))
(cons :separator sep))))
(defrule option-fields-terminated-by (and kw-fields option-terminated-by)
(:lambda (term)
(bind (((_ sep) term)) sep)))
(defrule option-lines-terminated-by (and kw-lines kw-terminated kw-by separator)
(:lambda (term)
(bind (((_ _ _ sep) term))
(cons :newline sep))))
(defrule option-keep-unquoted-blanks (and kw-keep kw-unquoted kw-blanks)
(:constant (cons :trim-blanks nil)))
(defrule option-trim-unquoted-blanks (and kw-trim kw-unquoted kw-blanks)
(:constant (cons :trim-blanks t)))
(defrule option-csv-escape-mode (and kw-csv kw-escape kw-mode escape-mode)
(:lambda (term)
(bind (((_ _ _ escape-mode) term))
(cons :escape-mode escape-mode))))
(defrule csv-option (or option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
option-disable-triggers
option-drop-indexes
option-skip-header
option-csv-header
option-lines-terminated-by
option-fields-not-enclosed
option-fields-enclosed-by
option-fields-escaped-by
option-fields-terminated-by
option-trim-unquoted-blanks
option-keep-unquoted-blanks
option-csv-escape-mode))
(defrule another-csv-option (and comma csv-option)
(:lambda (source)
(bind (((_ option) source)) option)))
(defrule csv-option-list (and csv-option (* another-csv-option))
(:lambda (source)
(destructuring-bind (opt1 opts) source
(alexandria:alist-plist `(,opt1 ,@opts)))))
(defrule csv-options (and kw-with csv-option-list)
(:lambda (source)
(bind (((_ opts) source))
(cons :csv-options opts))))
;;
;; CSV per-field reading options
;;
(defrule single-quoted-string (and #\' (* (not #\')) #\')
(:lambda (qs)
(bind (((_ string _) qs))
(text string))))
(defrule double-quoted-string (and #\" (* (not #\")) #\")
(:lambda (qs)
(bind (((_ string _) qs))
(text string))))
(defrule quoted-string (or single-quoted-string double-quoted-string))
(defrule option-date-format (and kw-date kw-format quoted-string)
(:lambda (df)
(bind (((_ _ date-format) df))
(cons :date-format date-format))))
(defrule blanks kw-blanks (:constant :blanks))
(defrule option-null-if (and kw-null kw-if (or blanks quoted-string))
(:lambda (nullif)
(bind (((_ _ opt) nullif))
(cons :null-as opt))))
(defrule option-trim-both-whitespace (and kw-trim kw-both kw-whitespace)
(:constant (cons :trim-both t)))
(defrule option-trim-left-whitespace (and kw-trim kw-left kw-whitespace)
(:constant (cons :trim-left t)))
(defrule option-trim-right-whitespace (and kw-trim kw-right kw-whitespace)
(:constant (cons :trim-right t)))
(defrule csv-field-option (or option-terminated-by
option-date-format
option-null-if
option-trim-both-whitespace
option-trim-left-whitespace
option-trim-right-whitespace))
(defrule another-csv-field-option (and comma csv-field-option)
(:lambda (field-option)
(bind (((_ option) field-option)) option)))
(defrule open-square-bracket (and ignore-whitespace #\[ ignore-whitespace)
(:constant :open-square-bracket))
(defrule close-square-bracket (and ignore-whitespace #\] ignore-whitespace)
(:constant :close-square-bracket))
(defrule csv-field-option-list (and open-square-bracket
csv-field-option
(* another-csv-field-option)
close-square-bracket)
(:lambda (option)
(bind (((_ opt1 opts _) option))
(alexandria:alist-plist `(,opt1 ,@opts)))))
(defrule csv-field-options (? csv-field-option-list))
(defrule csv-raw-field-name (and (or #\_ (alpha-char-p character))
(* (or (alpha-char-p character)
(digit-char-p character)
#\Space
#\.
#\$
#\_)))
(:text t))
(defrule csv-bare-field-name (and (or #\_ (alpha-char-p character))
(* (or (alpha-char-p character)
(digit-char-p character)
#\.
#\$
#\_)))
(:lambda (name)
(string-downcase (text name))))
(defrule csv-quoted-field-name (and #\" csv-raw-field-name #\")
(:lambda (csv-field-name)
(bind (((_ name _) csv-field-name)) name)))
(defrule csv-field-name (or csv-quoted-field-name csv-bare-field-name))
(defrule csv-source-field (and csv-field-name csv-field-options)
(:destructure (name opts)
`(,name ,@opts)))
(defrule another-csv-source-field (and comma csv-source-field)
(:lambda (source)
(bind (((_ field) source)) field)))
(defrule csv-source-fields (and csv-source-field (* another-csv-source-field))
(:lambda (source)
(destructuring-bind (field1 fields) source
(list* field1 fields))))
(defrule open-paren (and ignore-whitespace #\( ignore-whitespace)
(:constant :open-paren))
(defrule close-paren (and ignore-whitespace #\) ignore-whitespace)
(:constant :close-paren))
(defrule having-fields (and kw-having kw-fields) (:constant nil))
(defrule csv-source-field-list (and (? having-fields)
open-paren csv-source-fields close-paren)
(:lambda (source)
(bind (((_ _ field-defs _) source)) field-defs)))
;;
;; csv-target-column-list
;;
;; iprange ip4r using (ip-range startIpNum endIpNum),
;;
(defrule column-name csv-field-name) ; same rules here
(defrule column-type csv-field-name) ; again, same rules, names only
(defrule column-expression (and kw-using sexp)
(:lambda (expr)
(bind (((_ sexp) expr)) sexp)))
(defrule csv-target-column (and column-name
(? (and ignore-whitespace column-type
column-expression)))
(:lambda (col)
(bind (((name opts) col)
((_ type expr) (or opts (list nil nil nil))))
(list name type expr))))
(defrule another-csv-target-column (and comma csv-target-column)
(:lambda (source)
(bind (((_ col) source)) col)))
(defrule csv-target-columns (and csv-target-column
(* another-csv-target-column))
(:lambda (source)
(destructuring-bind (col1 cols) source
(list* col1 cols))))
(defrule target-columns (and kw-target kw-columns) (:constant nil))
(defrule csv-target-column-list (and (? target-columns)
open-paren csv-target-columns close-paren)
(:lambda (source)
(bind (((_ _ columns _) source)) columns)))
;;
;; The main command parsing
;;
(defun find-encoding-by-name-or-alias (encoding)
"charsets::*lisp-encodings* is an a-list of (NAME . ALIASES)..."
(loop :for (name . aliases) :in (list-encodings-and-aliases)
:for encoding-name := (when (or (string-equal name encoding)
(member encoding aliases :test #'string-equal))
name)
:until encoding-name
:finally (if encoding-name (return encoding-name)
(error "The encoding '~a' is unknown" encoding))))
(defrule encoding (or namestring single-quoted-string)
(:lambda (encoding)
(make-external-format (find-encoding-by-name-or-alias encoding))))
(defrule file-encoding (? (and kw-with kw-encoding encoding))
(:lambda (enc)
(if enc
(bind (((_ _ encoding) enc)) encoding)
:utf-8)))
(defrule first-filename-matching
(and (? kw-first) kw-filename kw-matching quoted-regex)
(:lambda (fm)
(bind (((_ _ _ regex) fm))
;; regex is a list with first the symbol :regex and second the regexp
;; as a string
(list* :regex :first (cdr regex)))))
(defrule all-filename-matching
(and kw-all (or kw-filenames kw-filename) kw-matching quoted-regex)
(:lambda (fm)
(bind (((_ _ _ regex) fm))
;; regex is a list with first the symbol :regex and second the regexp
;; as a string
(list* :regex :all (cdr regex)))))
(defrule in-directory (and kw-in kw-directory maybe-quoted-filename)
(:lambda (in-d)
(bind (((_ _ dir) in-d)) dir)))
(defrule filename-matching (and (or first-filename-matching
all-filename-matching)
(? in-directory))
(:lambda (filename-matching)
(bind (((matching directory) filename-matching)
(directory (or directory `(:filename ,*cwd*)))
((m-type first-or-all regex) matching)
((d-type dir) directory)
(root (uiop:directory-exists-p
(if (uiop:absolute-pathname-p dir) dir
(uiop:merge-pathnames* dir *cwd*)))))
(assert (eq m-type :regex))
(assert (eq d-type :filename))
(unless root
(error "Directory ~s does not exists."
(uiop:native-namestring dir)))
`(:regex ,first-or-all ,regex ,root))))
(defrule csv-uri (and "csv://" filename)
(:lambda (source)
(bind (((_ filename) source))
(make-instance 'csv-connection :spec filename))))
(defrule csv-file-source (or stdin
inline
http-uri
csv-uri
filename-matching
maybe-quoted-filename)
(:lambda (src)
(if (typep src 'csv-connection) src
(destructuring-bind (type &rest specs) src
(case type
(:stdin (make-instance 'csv-connection :spec src))
(:inline (make-instance 'csv-connection :spec src))
(:filename (make-instance 'csv-connection :spec src))
(:regex (make-instance 'csv-connection :spec src))
(:http (make-instance 'csv-connection :uri (first specs))))))))
(defrule get-csv-file-source-from-environment-variable (and kw-getenv name)
(:lambda (p-e-v)
(bind (((_ varname) p-e-v)
(connstring (getenv-default varname)))
(unless connstring
(error "Environment variable ~s is unset." varname))
(parse 'csv-file-source connstring))))
(defrule csv-source (and kw-load kw-csv kw-from
(or get-csv-file-source-from-environment-variable
csv-file-source))
(:lambda (src)
(bind (((_ _ _ source) src)) source)))
(defun list-symbols (expression &optional s)
"Return a list of the symbols used in EXPRESSION."
(typecase expression
(symbol (pushnew expression s))
(list (loop for e in expression for s = (list-symbols e s)
finally (return (reverse s))))
(t s)))
(defrule load-csv-file-optional-clauses (* (or csv-options
gucs
before-load
after-load))
(:lambda (clauses-list)
(alexandria:alist-plist clauses-list)))
(defrule load-csv-file-command (and csv-source
(? file-encoding) (? csv-source-field-list)
target (? csv-target-column-list)
load-csv-file-optional-clauses)
(:lambda (command)
(destructuring-bind (source encoding fields target columns clauses) command
`(,source ,encoding ,fields ,target ,columns ,@clauses))))
(defun lisp-code-for-csv-dry-run (pg-db-conn)
`(lambda ()
;; CSV connection objects are not actually implementing the generic API
;; because they support many complex options... (the file can be a
;; pattern or standard input or inline or compressed etc).
(log-message :log "DRY RUN, only checking PostgreSQL connection.")
(check-connection ,pg-db-conn)))
(defun lisp-code-for-loading-from-csv (csv-conn fields pg-db-conn
&key
(encoding :utf-8)
columns
gucs before after
((:csv-options options)))
`(lambda ()
(let* (,@(pgsql-connection-bindings pg-db-conn gucs)
,@(batch-control-bindings options)
(source-db (with-stats-collection ("fetch" :section :pre)
(expand (fetch-file ,csv-conn)))))
(progn
,(sql-code-block pg-db-conn :pre before "before load")
(let ((truncate (getf ',options :truncate))
(disable-triggers (getf ',options :disable-triggers))
(drop-indexes (getf ',options :drop-indexes))
(source
(make-instance 'pgloader.csv:copy-csv
:target-db ,pg-db-conn
:source source-db
:target ',(pgconn-table-name pg-db-conn)
:encoding ,encoding
:fields ',fields
:columns ',columns
,@(remove-batch-control-option
options :extras '(:truncate
:drop-indexes
:disable-triggers)))))
(pgloader.sources:copy-from source
:truncate truncate
:drop-indexes drop-indexes
:disable-triggers disable-triggers))
,(sql-code-block pg-db-conn :post after "after load")))))
(defrule load-csv-file load-csv-file-command
(:lambda (command)
(bind (((source encoding fields pg-db-uri columns
&key ((:csv-options options)) gucs before after) command))
(cond (*dry-run*
(lisp-code-for-csv-dry-run pg-db-uri))
(t
(lisp-code-for-loading-from-csv source fields pg-db-uri
:encoding encoding
:columns columns
:gucs gucs
:before before
:after after
:csv-options options))))))