pgloader/src/parsers/command-csv.lisp
Dimitri Fontaine fff756f95f Refactor the command parser.
Split its content into separate files, so that each is easier to
maintain, and to make it easier also to add support for new sources.
2014-11-16 22:22:04 +01:00

444 lines
15 KiB
Common Lisp
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

;;;
;;; The main CSV loading command, with optional clauses
;;;
(in-package #:pgloader.parser)
#|
LOAD CSV FROM /Users/dim/dev/CL/pgloader/galaxya/yagoa/communaute_profil.csv
INTO postgresql://dim@localhost:54393/yagoa?commnaute_profil
WITH truncate,
fields optionally enclosed by '\"',
fields escaped by \"\,
fields terminated by '\t',
reset sequences;
LOAD CSV FROM '*/GeoLiteCity-Blocks.csv'
(
startIpNum, endIpNum, locId
)
INTO postgresql://dim@localhost:54393/dim?geolite.blocks
(
iprange ip4r using (ip-range startIpNum endIpNum),
locId
)
WITH truncate,
skip header = 2,
fields optionally enclosed by '\"',
fields escaped by '\"',
fields terminated by '\t';
|#
(defrule hex-char-code (and "0x" (+ (hexdigit-char-p character)))
(:lambda (hex)
(bind (((_ digits) hex))
(code-char (parse-integer (text digits) :radix 16)))))
(defrule tab (and #\\ #\t) (:constant #\Tab))
(defrule separator (and #\' (or hex-char-code tab character ) #\')
(:lambda (sep)
(bind (((_ char _) sep)) char)))
;;
;; Main CSV options (WITH ... in the command grammar)
;;
(defrule option-skip-header (and kw-skip kw-header equal-sign
(+ (digit-char-p character)))
(:lambda (osh)
(bind (((_ _ _ digits) osh))
(cons :skip-lines (parse-integer (text digits))))))
(defrule option-fields-enclosed-by
(and kw-fields (? kw-optionally) kw-enclosed kw-by separator)
(:lambda (enc)
(bind (((_ _ _ _ sep) enc))
(cons :quote sep))))
(defrule option-fields-not-enclosed (and kw-fields kw-not kw-enclosed)
(:constant (cons :quote nil)))
(defrule quote-quote "double-quote" (:constant "\"\""))
(defrule backslash-quote "backslash-quote" (:constant "\\\""))
(defrule escaped-quote-name (or quote-quote backslash-quote))
(defrule escaped-quote-literal (or (and #\" #\") (and #\\ #\")) (:text t))
(defrule escaped-quote (or escaped-quote-literal escaped-quote-name))
(defrule option-fields-escaped-by (and kw-fields kw-escaped kw-by escaped-quote)
(:lambda (esc)
(bind (((_ _ _ sep) esc))
(cons :escape sep))))
(defrule option-terminated-by (and kw-terminated kw-by separator)
(:lambda (term)
(bind (((_ _ sep) term))
(cons :separator sep))))
(defrule option-fields-terminated-by (and kw-fields option-terminated-by)
(:lambda (term)
(bind (((_ sep) term)) sep)))
(defrule option-lines-terminated-by (and kw-lines kw-terminated kw-by separator)
(:lambda (term)
(bind (((_ _ _ sep) term))
(cons :newline sep))))
(defrule option-keep-unquoted-blanks (and kw-keep kw-unquoted kw-blanks)
(:constant (cons :trim-blanks nil)))
(defrule option-trim-unquoted-blanks (and kw-trim kw-unquoted kw-blanks)
(:constant (cons :trim-blanks t)))
(defrule csv-option (or option-batch-rows
option-batch-size
option-batch-concurrency
option-truncate
option-skip-header
option-lines-terminated-by
option-fields-not-enclosed
option-fields-enclosed-by
option-fields-escaped-by
option-fields-terminated-by
option-trim-unquoted-blanks
option-keep-unquoted-blanks))
(defrule another-csv-option (and comma csv-option)
(:lambda (source)
(bind (((_ option) source)) option)))
(defrule csv-option-list (and csv-option (* another-csv-option))
(:lambda (source)
(destructuring-bind (opt1 opts) source
(alexandria:alist-plist `(,opt1 ,@opts)))))
(defrule csv-options (and kw-with csv-option-list)
(:lambda (source)
(bind (((_ opts) source))
(cons :csv-options opts))))
;;
;; CSV per-field reading options
;;
(defrule single-quoted-string (and #\' (* (not #\')) #\')
(:lambda (qs)
(bind (((_ string _) qs))
(text string))))
(defrule double-quoted-string (and #\" (* (not #\")) #\")
(:lambda (qs)
(bind (((_ string _) qs))
(text string))))
(defrule quoted-string (or single-quoted-string double-quoted-string))
(defrule option-date-format (and kw-date kw-format quoted-string)
(:lambda (df)
(bind (((_ _ date-format) df))
(cons :date-format date-format))))
(defrule blanks kw-blanks (:constant :blanks))
(defrule option-null-if (and kw-null kw-if (or blanks quoted-string))
(:lambda (nullif)
(bind (((_ _ opt) nullif))
(cons :null-as opt))))
(defrule option-trim-both-whitespace (and kw-trim kw-both kw-whitespace)
(:constant (cons :trim-both t)))
(defrule option-trim-left-whitespace (and kw-trim kw-left kw-whitespace)
(:constant (cons :trim-left t)))
(defrule option-trim-right-whitespace (and kw-trim kw-right kw-whitespace)
(:constant (cons :trim-right t)))
(defrule csv-field-option (or option-terminated-by
option-date-format
option-null-if
option-trim-both-whitespace
option-trim-left-whitespace
option-trim-right-whitespace))
(defrule another-csv-field-option (and comma csv-field-option)
(:lambda (field-option)
(bind (((_ option) field-option)) option)))
(defrule open-square-bracket (and ignore-whitespace #\[ ignore-whitespace)
(:constant :open-square-bracket))
(defrule close-square-bracket (and ignore-whitespace #\] ignore-whitespace)
(:constant :close-square-bracket))
(defrule csv-field-option-list (and open-square-bracket
csv-field-option
(* another-csv-field-option)
close-square-bracket)
(:lambda (option)
(bind (((_ opt1 opts _) option))
(alexandria:alist-plist `(,opt1 ,@opts)))))
(defrule csv-field-options (? csv-field-option-list))
(defrule csv-raw-field-name (and (or #\_ (alpha-char-p character))
(* (or (alpha-char-p character)
(digit-char-p character)
#\_)))
(:text t))
(defrule csv-bare-field-name csv-raw-field-name
(:lambda (name)
(string-downcase name)))
(defrule csv-quoted-field-name (and #\" csv-raw-field-name #\")
(:lambda (csv-field-name)
(bind (((_ name _) csv-field-name)) name)))
(defrule csv-field-name (or csv-quoted-field-name csv-bare-field-name))
(defrule csv-source-field (and csv-field-name csv-field-options)
(:destructure (name opts)
`(,name ,@opts)))
(defrule another-csv-source-field (and comma csv-source-field)
(:lambda (source)
(bind (((_ field) source)) field)))
(defrule csv-source-fields (and csv-source-field (* another-csv-source-field))
(:lambda (source)
(destructuring-bind (field1 fields) source
(list* field1 fields))))
(defrule open-paren (and ignore-whitespace #\( ignore-whitespace)
(:constant :open-paren))
(defrule close-paren (and ignore-whitespace #\) ignore-whitespace)
(:constant :close-paren))
(defrule having-fields (and kw-having kw-fields) (:constant nil))
(defrule csv-source-field-list (and (? having-fields)
open-paren csv-source-fields close-paren)
(:lambda (source)
(bind (((_ _ field-defs _) source)) field-defs)))
;;
;; csv-target-column-list
;;
;; iprange ip4r using (ip-range startIpNum endIpNum),
;;
(defrule column-name csv-field-name) ; same rules here
(defrule column-type csv-field-name) ; again, same rules, names only
(defun not-doublequote (char)
(not (eql #\" char)))
(defun symbol-character-p (character)
(not (member character '(#\Space #\( #\)))))
(defun symbol-first-character-p (character)
(and (symbol-character-p character)
(not (member character '(#\+ #\-)))))
(defrule sexp-symbol (and (symbol-first-character-p character)
(* (symbol-character-p character)))
(:lambda (schars)
(pgloader.transforms:intern-symbol (text schars))))
(defrule sexp-string-char (or (not-doublequote character) (and #\\ #\")))
(defrule sexp-string (and #\" (* sexp-string-char) #\")
(:destructure (q1 string q2)
(declare (ignore q1 q2))
(text string)))
(defrule sexp-integer (+ (or "0" "1" "2" "3" "4" "5" "6" "7" "8" "9"))
(:lambda (list)
(parse-integer (text list) :radix 10)))
(defrule sexp-list (and open-paren sexp (* sexp) close-paren)
(:destructure (open car cdr close)
(declare (ignore open close))
(cons car cdr)))
(defrule sexp-atom (and ignore-whitespace
(or sexp-string sexp-integer sexp-symbol))
(:lambda (atom)
(bind (((_ a) atom)) a)))
(defrule sexp (or sexp-atom sexp-list))
(defrule column-expression (and kw-using sexp)
(:lambda (expr)
(bind (((_ sexp) expr)) sexp)))
(defrule csv-target-column (and column-name
(? (and ignore-whitespace column-type
column-expression)))
(:lambda (col)
(bind (((name opts) col)
((_ type expr) (or opts (list nil nil nil))))
(list name type expr))))
(defrule another-csv-target-column (and comma csv-target-column)
(:lambda (source)
(bind (((_ col) source)) col)))
(defrule csv-target-columns (and csv-target-column
(* another-csv-target-column))
(:lambda (source)
(destructuring-bind (col1 cols) source
(list* col1 cols))))
(defrule target-columns (and kw-target kw-columns) (:constant nil))
(defrule csv-target-column-list (and (? target-columns)
open-paren csv-target-columns close-paren)
(:lambda (source)
(bind (((_ _ columns _) source)) columns)))
;;
;; The main command parsing
;;
(defun find-encoding-by-name-or-alias (encoding)
"charsets::*lisp-encodings* is an a-list of (NAME . ALIASES)..."
(loop :for (name . aliases) :in (list-encodings-and-aliases)
:for encoding-name := (when (or (string-equal name encoding)
(member encoding aliases :test #'string-equal))
name)
:until encoding-name
:finally (if encoding-name (return encoding-name)
(error "The encoding '~a' is unknown" encoding))))
(defrule encoding (or namestring single-quoted-string)
(:lambda (encoding)
(make-external-format (find-encoding-by-name-or-alias encoding))))
(defrule file-encoding (? (and kw-with kw-encoding encoding))
(:lambda (enc)
(if enc
(bind (((_ _ encoding) enc)) encoding)
:utf-8)))
(defrule first-filename-matching
(and (? kw-first) kw-filename kw-matching quoted-regex)
(:lambda (fm)
(bind (((_ _ _ regex) fm))
;; regex is a list with first the symbol :regex and second the regexp
;; as a string
(list* :regex :first (cdr regex)))))
(defrule all-filename-matching
(and kw-all (or kw-filenames kw-filename) kw-matching quoted-regex)
(:lambda (fm)
(bind (((_ _ _ regex) fm))
;; regex is a list with first the symbol :regex and second the regexp
;; as a string
(list* :regex :all (cdr regex)))))
(defrule in-directory (and kw-in kw-directory maybe-quoted-filename)
(:lambda (in-d)
(bind (((_ _ dir) in-d)) dir)))
(defrule filename-matching (and (or first-filename-matching
all-filename-matching)
(? in-directory))
(:lambda (filename-matching)
(bind (((matching directory) filename-matching)
(directory (or directory `(:filename ,*cwd*)))
((m-type first-or-all regex) matching)
((d-type dir) directory)
(root (uiop:directory-exists-p
(if (uiop:absolute-pathname-p dir) dir
(uiop:merge-pathnames* dir *cwd*)))))
(assert (eq m-type :regex))
(assert (eq d-type :filename))
(unless root
(error "Directory ~s does not exists."
(uiop:native-namestring dir)))
`(:regex ,first-or-all ,regex ,root))))
(defrule csv-file-source (or stdin
inline
filename-matching
maybe-quoted-filename))
(defrule get-csv-file-source-from-environment-variable (and kw-getenv name)
(:lambda (p-e-v)
(bind (((_ varname) p-e-v)
(connstring (getenv-default varname)))
(unless connstring
(error "Environment variable ~s is unset." varname))
(parse 'csv-file-source connstring))))
(defrule csv-source (and kw-load kw-csv kw-from
(or get-csv-file-source-from-environment-variable
csv-file-source))
(:lambda (src)
(bind (((_ _ _ source) src)
;; source is (:filename #P"pathname/here")
((type &rest _) source))
(ecase type
(:stdin source)
(:inline source)
(:filename source)
(:regex source)))))
(defun list-symbols (expression &optional s)
"Return a list of the symbols used in EXPRESSION."
(typecase expression
(symbol (pushnew expression s))
(list (loop for e in expression for s = (list-symbols e s)
finally (return (reverse s))))
(t s)))
(defrule load-csv-file-optional-clauses (* (or csv-options
gucs
before-load
after-load))
(:lambda (clauses-list)
(alexandria:alist-plist clauses-list)))
(defrule load-csv-file-command (and csv-source
(? file-encoding) (? csv-source-field-list)
target (? csv-target-column-list)
load-csv-file-optional-clauses)
(:lambda (command)
(destructuring-bind (source encoding fields target columns clauses) command
`(,source ,encoding ,fields ,target ,columns ,@clauses))))
(defrule load-csv-file load-csv-file-command
(:lambda (command)
(bind (((source encoding fields pg-db-uri columns
&key ((:csv-options options)) gucs before after) command)
((&key dbname table-name &allow-other-keys) pg-db-uri))
`(lambda ()
(let* ((state-before ,(when before `(pgloader.utils:make-pgstate)))
(summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-after ,(when after `(pgloader.utils:make-pgstate)))
,@(pgsql-connection-bindings pg-db-uri gucs)
,@(batch-control-bindings options))
(progn
,(sql-code-block dbname 'state-before before "before load")
(let ((truncate (getf ',options :truncate))
(source
(make-instance 'pgloader.csv:copy-csv
:target-db ,dbname
:source ',source
:target ,table-name
:encoding ,encoding
:fields ',fields
:columns ',columns
,@(remove-batch-control-option
options :extras '(:truncate)))))
(pgloader.sources:copy-from source :truncate truncate))
,(sql-code-block dbname 'state-after after "after load")
;; reporting
(when summary
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after))))))))