diff --git a/src/package.lisp b/src/package.lisp index 821c416..90a375f 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -119,7 +119,8 @@ #:filter-column-list #:get-pathname #:get-absolute-pathname - #:project-fields)) + #:project-fields + #:reformat-then-process)) (defpackage #:pgloader.csv (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.sources) diff --git a/src/parser.lisp b/src/parser.lisp index 6279084..d35b10b 100644 --- a/src/parser.lisp +++ b/src/parser.lisp @@ -143,7 +143,10 @@ (def-keyword-rule "and") (def-keyword-rule "do") (def-keyword-rule "filename") - (def-keyword-rule "matching")) + (def-keyword-rule "filenames") + (def-keyword-rule "matching") + (def-keyword-rule "first") + (def-keyword-rule "all")) (defrule kw-auto-increment (and "auto_increment" (* (or #\Tab #\Space))) (:constant :auto-increment)) @@ -1509,11 +1512,25 @@ load database encoding) :utf-8))) -(defrule filename-matching (and kw-filename kw-matching quoted-regex) +(defrule first-filename-matching + (and (? kw-first) kw-filename kw-matching quoted-regex) (:lambda (fm) - (destructuring-bind (filename matching regex) fm - (declare (ignore filename matching)) - regex))) + (destructuring-bind (first filename matching regex) fm + (declare (ignore first filename matching)) + ;; regex is a list with first the symbol :regex and second the regexp + ;; as a string + (list* :regex :first (cdr regex))))) + +(defrule all-filename-matching + (and kw-all (or kw-filenames kw-filename) kw-matching quoted-regex) + (:lambda (fm) + (destructuring-bind (all filename matching regex) fm + (declare (ignore all filename matching)) + ;; regex is a list with first the symbol :regex and second the regexp + ;; as a string + (list* :regex :all (cdr regex))))) + +(defrule filename-matching (or first-filename-matching all-filename-matching)) (defrule csv-file-source (or stdin inline @@ -1525,8 +1542,8 @@ load database (destructuring-bind (load csv from source) src (declare (ignore load csv from)) ;; source is (:filename #P"pathname/here") - (destructuring-bind (type uri) source - (declare (ignore uri)) + (destructuring-bind (type &rest data) source + (declare (ignore data)) (ecase type (:stdin source) (:inline source) @@ -1668,8 +1685,8 @@ load database (destructuring-bind (load fixed from source) src (declare (ignore load fixed from)) ;; source is (:filename #P"pathname/here") - (destructuring-bind (type uri) source - (declare (ignore uri)) + (destructuring-bind (type &rest data) source + (declare (ignore data)) (ecase type (:stdin source) (:inline source) @@ -1752,12 +1769,14 @@ load database source))) (defrule load-archive (and archive-source - target + (? target) (? before-load-do) archive-command-list (? finally-do)) (:lambda (archive) (destructuring-bind (source pg-db-uri before commands finally) archive + (when (and (or before finally) (null pg-db-uri)) + (error "When using a BEFORE LOAD DO or a FINALLY block, you must provide an archive level target database connection.")) (destructuring-bind (&key host port user password dbname &allow-other-keys) pg-db-uri `(lambda () diff --git a/src/sources/csv.lisp b/src/sources/csv.lisp index 982e004..fcdf7a5 100644 --- a/src/sources/csv.lisp +++ b/src/sources/csv.lisp @@ -8,7 +8,9 @@ ;;; Implementing the pgloader source API ;;; (defclass copy-csv (copy) - ((encoding :accessor encoding ; file encoding + ((source-type :accessor source-type ; one of :inline, :stdin, :regex + :initarg :source-type) ; or :filename + (encoding :accessor encoding ; file encoding :initarg :encoding) ; (skip-lines :accessor skip-lines ; CSV headers :initarg :skip-lines ; @@ -31,7 +33,8 @@ "Compute the real source definition from the given source parameter, and set the transforms function list as needed too." (let ((source (slot-value csv 'source))) - (setf (slot-value csv 'source) (get-absolute-pathname source))) + (setf (slot-value csv 'source-type) (car source)) + (setf (slot-value csv 'source) (get-absolute-pathname source))) (let ((transforms (when (slot-boundp csv 'transforms) (slot-value csv 'transforms))) @@ -58,59 +61,52 @@ data. That's used to handle the INLINE data loading. Finally returns how many rows where read and processed." - (let* ((filespec (source csv)) - (filename (if (consp filespec) (car filespec) filespec))) - (with-open-file - ;; we just ignore files that don't exist - (input filename - :direction :input - :external-format (encoding csv) - :if-does-not-exist nil) - (when input - ;; first go to given inline position when filename is a consp - (when (consp filespec) - (loop repeat (cdr filespec) do (read-char input))) + (let ((filenames (case (source-type csv) + (:inline (list (car (source csv)))) + (:regex (source csv)) + (t (list (source csv)))))) + (loop for filename in filenames + do + (with-open-file + ;; we just ignore files that don't exist + (input filename + :direction :input + :external-format (encoding csv) + :if-does-not-exist nil) + (when input + (log-message :info "COPY FROM ~s" filename) - ;; we handle skipping more than one line here, as cl-csv only knows - ;; about skipping the first line - (loop repeat (skip-lines csv) do (read-line input nil nil)) + ;; first go to given inline position when filename is :inline + (when (eq (source-type csv) :inline) + (file-position input (cdr (source csv)))) - ;; read in the text file, split it into columns, process NULL columns - ;; the way postmodern expects them, and call PROCESS-ROW-FN on them - (let* ((read 0) - (projection (project-fields :fields (fields csv) - :columns (columns csv))) - (reformat-then-process - (lambda (row) - (incf read) - (let ((projected-row - (handler-case - (funcall projection row) - (condition (e) - (pgstate-incf *state* (target csv) :errs 1) - (log-message :error - "Could not read line ~d: ~a" read e))))) - (when projected-row - (funcall process-row-fn projected-row)))))) + ;; we handle skipping more than one line here, as cl-csv only knows + ;; about skipping the first line + (loop repeat (skip-lines csv) do (read-line input nil nil)) - (handler-case - (cl-csv:read-csv input - :row-fn (compile nil reformat-then-process) - :separator (csv-separator csv) - :quote (csv-quote csv) - :escape (csv-escape csv) - :trim-blanks (csv-trim-blanks csv)) - ((or cl-csv:csv-parse-error type-error) (condition) - (progn - (log-message :error "~a" condition) - (pgstate-setf *state* (target csv) :errs -1)))) - ;; return how many rows we did read - read))))) + ;; read in the text file, split it into columns, process NULL + ;; columns the way postmodern expects them, and call + ;; PROCESS-ROW-FN on them + (let ((reformat-then-process + (reformat-then-process :fields (fields csv) + :columns (columns csv) + :target (target csv) + :process-row-fn process-row-fn))) + (handler-case + (cl-csv:read-csv input + :row-fn (compile nil reformat-then-process) + :separator (csv-separator csv) + :quote (csv-quote csv) + :escape (csv-escape csv) + :trim-blanks (csv-trim-blanks csv)) + ((or cl-csv:csv-parse-error type-error) (condition) + (progn + (log-message :error "~a" condition) + (pgstate-setf *state* (target csv) :errs -1)))))))))) (defmethod copy-to-queue ((csv copy-csv) dataq) "Copy data from given CSV definition into lparallel.queue DATAQ" - (let ((read (pgloader.queue:map-push-queue dataq #'map-rows csv))) - (pgstate-incf *state* (target csv) :read read))) + (pgloader.queue:map-push-queue dataq #'map-rows csv)) (defmethod copy-from ((csv copy-csv) &key truncate) "Copy data from given CSV file definition into its PostgreSQL target table." diff --git a/src/sources/fixed.lisp b/src/sources/fixed.lisp index b27b84c..d5a6363 100644 --- a/src/sources/fixed.lisp +++ b/src/sources/fixed.lisp @@ -5,7 +5,9 @@ (in-package :pgloader.fixed) (defclass copy-fixed (copy) - ((encoding :accessor encoding ; file encoding + ((source-type :accessor source-type ; one of :inline, :stdin, :regex + :initarg :source-type) ; or :filename + (encoding :accessor encoding ; file encoding :initarg :encoding) ; (skip-lines :accessor skip-lines ; CSV headers :initarg :skip-lines ; @@ -16,7 +18,8 @@ "Compute the real source definition from the given source parameter, and set the transforms function list as needed too." (let ((source (slot-value fixed 'source))) - (setf (slot-value fixed 'source) (get-absolute-pathname source))) + (setf (slot-value fixed 'source-type) (car source)) + (setf (slot-value fixed 'source) (get-absolute-pathname source))) (let ((transforms (when (slot-boundp fixed 'transforms) (slot-value fixed 'transforms))) @@ -43,50 +46,44 @@ list as its only parameter. Returns how many rows where read and processed." - (let* ((filespec (source fixed)) - (filename (if (consp filespec) (car filespec) filespec))) - (with-open-file - ;; we just ignore files that don't exist - (input filename - :direction :input - :external-format (encoding fixed) - :if-does-not-exist nil) - (when input - ;; first go to given inline position when filename is a consp - (when (consp filespec) - (loop repeat (cdr filespec) do (read-char input))) + (let ((filenames (case (source-type fixed) + (:inline (list (car (source fixed)))) + (:regex (source fixed)) + (t (list (source fixed)))))) + (loop for filename in filenames + do + (with-open-file + ;; we just ignore files that don't exist + (input filename + :direction :input + :external-format (encoding fixed) + :if-does-not-exist nil) + (when input + ;; first go to given inline position when filename is :inline + (when (eq (source-type fixed) :inline) + (file-position input (cdr (source fixed)))) - ;; ignore as much as skip-lines lines in the file - (loop repeat (skip-lines fixed) do (read-line input nil nil)) + ;; ignore as much as skip-lines lines in the file + (loop repeat (skip-lines fixed) do (read-line input nil nil)) - ;; read in the text file, split it into columns, process NULL columns - ;; the way postmodern expects them, and call PROCESS-ROW-FN on them - (let* ((read 0) - (projection (project-fields :fields (fields fixed) - :columns (columns fixed))) - (reformat-then-process - (lambda (row) - (let ((projected-row - (handler-case - (funcall projection row) - (condition (e) - (pgstate-incf *state* (target fixed) :errs 1) - (log-message :error - "Could not read line ~d: ~a" read e))))) - (when projected-row - (funcall process-row-fn projected-row)))))) - (loop - with fun = (compile nil reformat-then-process) - for line = (read-line input nil nil) - counting line into read - while line - do (funcall fun (parse-row fixed line)) - finally (return read))))))) + ;; read in the text file, split it into columns, process NULL + ;; columns the way postmodern expects them, and call + ;; PROCESS-ROW-FN on them + (let ((reformat-then-process + (reformat-then-process :fields (fields fixed) + :columns (columns fixed) + :target (target fixed) + :process-row-fn process-row-fn))) + (loop + with fun = (compile nil reformat-then-process) + for line = (read-line input nil nil) + counting line into read + while line + do (funcall fun (parse-row fixed line))))))))) (defmethod copy-to-queue ((fixed copy-fixed) dataq) "Copy data from given FIXED definition into lparallel.queue DATAQ" - (let ((read (pgloader.queue:map-push-queue dataq #'map-rows fixed))) - (pgstate-incf *state* (target fixed) :read read))) + (pgloader.queue:map-push-queue dataq #'map-rows fixed)) (defmethod copy-from ((fixed copy-fixed) &key truncate) "Copy data from given FIXED file definition into its PostgreSQL target table." diff --git a/src/sources/sources.lisp b/src/sources/sources.lisp index acb2aa8..f03fc6b 100644 --- a/src/sources/sources.lisp +++ b/src/sources/sources.lisp @@ -141,6 +141,24 @@ :name table-name :type "csv")) +(defun filter-directory (regex + &key + (keep :first) ; or :all + (root *csv-path-root*)) + "Walk the ROOT directory and KEEP either the :first or :all the matches + against the given regexp." + (let* ((candidates (pgloader.archive:get-matching-filenames root regex)) + (candidates (ecase keep + (:first (list (first candidates))) + (:all candidates)))) + (unless candidates + (error "No file matching '~a' in expanded archive in '~a'" regex root)) + + (loop for candidate in candidates + do (if (probe-file candidate) candidate + (error "File does not exists: '~a'." candidate)) + finally (return candidates)))) + (defun get-absolute-pathname (pathname-or-regex &key (root *csv-path-root*)) "PATHNAME-OR-REGEX is expected to be either (:regexp expression) or (:filename pathname). In the first case, this fonction check if the @@ -150,18 +168,12 @@ In the second case, walk the ROOT directory and return the first pathname that matches the regex. TODO: consider signaling a condition when we have more than one match." - (destructuring-bind (type part) pathname-or-regex + (destructuring-bind (type &rest part) pathname-or-regex (ecase type - (:inline part) + (:inline (car part)) ; because of &rest (:stdin *standard-input*) - (:regex (let* ((candidates - (pgloader.archive:get-matching-filenames root part)) - (candidate (first candidates))) - (unless candidates - (error "No file matching '~a' in expanded archive in '~a'" - part root)) - (if (probe-file candidate) candidate - (error "File does not exists: '~a'." candidate)))) + (:regex (destructuring-bind (keep regex) part + (filter-directory regex :keep keep :root root))) (:filename (let ((filename (if (fad:pathname-absolute-p part) part (merge-pathnames part root)))) @@ -252,3 +264,23 @@ (list ,@newrow)))))))) ;; allow for some debugging (if compile (compile nil projection) projection)))) + +(defun reformat-then-process (&key fields columns target process-row-fn) + "Return a lambda form to apply to each row we read. + + The lambda closes over the READ paramater, which is a counter of how many + lines we did read in the file." + (let ((projection (project-fields :fields fields :columns columns))) + (lambda (row) + (pgstate-incf *state* target :read 1) + (let ((projected-row + (handler-case + (funcall projection row) + (condition (e) + (pgstate-incf *state* target :errs 1) + (log-message :error "Could not read line ~d: ~a" + (pgloader.utils::pgtable-read + (pgstate-get-table *state* target)) + e))))) + (when projected-row + (funcall process-row-fn projected-row))))))