mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-04 18:36:12 +02:00
Implement support for matching several files in a single archive clause.
This commit is contained in:
parent
98a7eb12a5
commit
2019b918f0
@ -119,7 +119,8 @@
|
||||
#:filter-column-list
|
||||
#:get-pathname
|
||||
#:get-absolute-pathname
|
||||
#:project-fields))
|
||||
#:project-fields
|
||||
#:reformat-then-process))
|
||||
|
||||
(defpackage #:pgloader.csv
|
||||
(:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.sources)
|
||||
|
||||
@ -143,7 +143,10 @@
|
||||
(def-keyword-rule "and")
|
||||
(def-keyword-rule "do")
|
||||
(def-keyword-rule "filename")
|
||||
(def-keyword-rule "matching"))
|
||||
(def-keyword-rule "filenames")
|
||||
(def-keyword-rule "matching")
|
||||
(def-keyword-rule "first")
|
||||
(def-keyword-rule "all"))
|
||||
|
||||
(defrule kw-auto-increment (and "auto_increment" (* (or #\Tab #\Space)))
|
||||
(:constant :auto-increment))
|
||||
@ -1509,11 +1512,25 @@ load database
|
||||
encoding)
|
||||
:utf-8)))
|
||||
|
||||
(defrule filename-matching (and kw-filename kw-matching quoted-regex)
|
||||
(defrule first-filename-matching
|
||||
(and (? kw-first) kw-filename kw-matching quoted-regex)
|
||||
(:lambda (fm)
|
||||
(destructuring-bind (filename matching regex) fm
|
||||
(declare (ignore filename matching))
|
||||
regex)))
|
||||
(destructuring-bind (first filename matching regex) fm
|
||||
(declare (ignore first filename matching))
|
||||
;; regex is a list with first the symbol :regex and second the regexp
|
||||
;; as a string
|
||||
(list* :regex :first (cdr regex)))))
|
||||
|
||||
;; Grammar rule for the ALL FILENAMES MATCHING <quoted-regex> clause; the
;; singular FILENAME keyword is accepted in place of FILENAMES.
(defrule all-filename-matching
    (and kw-all (or kw-filenames kw-filename) kw-matching quoted-regex)
  (:lambda (parsed)
    ;; The three keyword productions carry no information, only the
    ;; quoted regex production is used to build the result.
    (destructuring-bind (all-kw filename-kw matching-kw quoted) parsed
      (declare (ignore all-kw filename-kw matching-kw))
      ;; QUOTED is a list whose head is the symbol :regex, followed by the
      ;; regexp as a string: slip the :all marker in right after :regex.
      (cons :regex (cons :all (rest quoted))))))
|
||||
|
||||
;; A filename matching clause is either the FIRST flavour or the ALL
;; flavour; each alternative produces its own (:regex <keep> ...) list.
(defrule filename-matching
    (or first-filename-matching
        all-filename-matching))
|
||||
|
||||
(defrule csv-file-source (or stdin
|
||||
inline
|
||||
@ -1525,8 +1542,8 @@ load database
|
||||
(destructuring-bind (load csv from source) src
|
||||
(declare (ignore load csv from))
|
||||
;; source is (:filename #P"pathname/here")
|
||||
(destructuring-bind (type uri) source
|
||||
(declare (ignore uri))
|
||||
(destructuring-bind (type &rest data) source
|
||||
(declare (ignore data))
|
||||
(ecase type
|
||||
(:stdin source)
|
||||
(:inline source)
|
||||
@ -1668,8 +1685,8 @@ load database
|
||||
(destructuring-bind (load fixed from source) src
|
||||
(declare (ignore load fixed from))
|
||||
;; source is (:filename #P"pathname/here")
|
||||
(destructuring-bind (type uri) source
|
||||
(declare (ignore uri))
|
||||
(destructuring-bind (type &rest data) source
|
||||
(declare (ignore data))
|
||||
(ecase type
|
||||
(:stdin source)
|
||||
(:inline source)
|
||||
@ -1752,12 +1769,14 @@ load database
|
||||
source)))
|
||||
|
||||
(defrule load-archive (and archive-source
|
||||
target
|
||||
(? target)
|
||||
(? before-load-do)
|
||||
archive-command-list
|
||||
(? finally-do))
|
||||
(:lambda (archive)
|
||||
(destructuring-bind (source pg-db-uri before commands finally) archive
|
||||
(when (and (or before finally) (null pg-db-uri))
|
||||
(error "When using a BEFORE LOAD DO or a FINALLY block, you must provide an archive level target database connection."))
|
||||
(destructuring-bind (&key host port user password dbname &allow-other-keys)
|
||||
pg-db-uri
|
||||
`(lambda ()
|
||||
|
||||
@ -8,7 +8,9 @@
|
||||
;;; Implementing the pgloader source API
|
||||
;;;
|
||||
(defclass copy-csv (copy)
|
||||
((encoding :accessor encoding ; file encoding
|
||||
((source-type :accessor source-type ; one of :inline, :stdin, :regex
|
||||
:initarg :source-type) ; or :filename
|
||||
(encoding :accessor encoding ; file encoding
|
||||
:initarg :encoding) ;
|
||||
(skip-lines :accessor skip-lines ; CSV headers
|
||||
:initarg :skip-lines ;
|
||||
@ -31,7 +33,8 @@
|
||||
"Compute the real source definition from the given source parameter, and
|
||||
set the transforms function list as needed too."
|
||||
(let ((source (slot-value csv 'source)))
|
||||
(setf (slot-value csv 'source) (get-absolute-pathname source)))
|
||||
(setf (slot-value csv 'source-type) (car source))
|
||||
(setf (slot-value csv 'source) (get-absolute-pathname source)))
|
||||
|
||||
(let ((transforms (when (slot-boundp csv 'transforms)
|
||||
(slot-value csv 'transforms)))
|
||||
@ -58,59 +61,52 @@
|
||||
data. That's used to handle the INLINE data loading.
|
||||
|
||||
Finally returns how many rows were read and processed."
|
||||
(let* ((filespec (source csv))
|
||||
(filename (if (consp filespec) (car filespec) filespec)))
|
||||
(with-open-file
|
||||
;; we just ignore files that don't exist
|
||||
(input filename
|
||||
:direction :input
|
||||
:external-format (encoding csv)
|
||||
:if-does-not-exist nil)
|
||||
(when input
|
||||
;; first go to given inline position when filename is a consp
|
||||
(when (consp filespec)
|
||||
(loop repeat (cdr filespec) do (read-char input)))
|
||||
(let ((filenames (case (source-type csv)
|
||||
(:inline (list (car (source csv))))
|
||||
(:regex (source csv))
|
||||
(t (list (source csv))))))
|
||||
(loop for filename in filenames
|
||||
do
|
||||
(with-open-file
|
||||
;; we just ignore files that don't exist
|
||||
(input filename
|
||||
:direction :input
|
||||
:external-format (encoding csv)
|
||||
:if-does-not-exist nil)
|
||||
(when input
|
||||
(log-message :info "COPY FROM ~s" filename)
|
||||
|
||||
;; we handle skipping more than one line here, as cl-csv only knows
|
||||
;; about skipping the first line
|
||||
(loop repeat (skip-lines csv) do (read-line input nil nil))
|
||||
;; first go to given inline position when filename is :inline
|
||||
(when (eq (source-type csv) :inline)
|
||||
(file-position input (cdr (source csv))))
|
||||
|
||||
;; read in the text file, split it into columns, process NULL columns
|
||||
;; the way postmodern expects them, and call PROCESS-ROW-FN on them
|
||||
(let* ((read 0)
|
||||
(projection (project-fields :fields (fields csv)
|
||||
:columns (columns csv)))
|
||||
(reformat-then-process
|
||||
(lambda (row)
|
||||
(incf read)
|
||||
(let ((projected-row
|
||||
(handler-case
|
||||
(funcall projection row)
|
||||
(condition (e)
|
||||
(pgstate-incf *state* (target csv) :errs 1)
|
||||
(log-message :error
|
||||
"Could not read line ~d: ~a" read e)))))
|
||||
(when projected-row
|
||||
(funcall process-row-fn projected-row))))))
|
||||
;; we handle skipping more than one line here, as cl-csv only knows
|
||||
;; about skipping the first line
|
||||
(loop repeat (skip-lines csv) do (read-line input nil nil))
|
||||
|
||||
(handler-case
|
||||
(cl-csv:read-csv input
|
||||
:row-fn (compile nil reformat-then-process)
|
||||
:separator (csv-separator csv)
|
||||
:quote (csv-quote csv)
|
||||
:escape (csv-escape csv)
|
||||
:trim-blanks (csv-trim-blanks csv))
|
||||
((or cl-csv:csv-parse-error type-error) (condition)
|
||||
(progn
|
||||
(log-message :error "~a" condition)
|
||||
(pgstate-setf *state* (target csv) :errs -1))))
|
||||
;; return how many rows we did read
|
||||
read)))))
|
||||
;; read in the text file, split it into columns, process NULL
|
||||
;; columns the way postmodern expects them, and call
|
||||
;; PROCESS-ROW-FN on them
|
||||
(let ((reformat-then-process
|
||||
(reformat-then-process :fields (fields csv)
|
||||
:columns (columns csv)
|
||||
:target (target csv)
|
||||
:process-row-fn process-row-fn)))
|
||||
(handler-case
|
||||
(cl-csv:read-csv input
|
||||
:row-fn (compile nil reformat-then-process)
|
||||
:separator (csv-separator csv)
|
||||
:quote (csv-quote csv)
|
||||
:escape (csv-escape csv)
|
||||
:trim-blanks (csv-trim-blanks csv))
|
||||
((or cl-csv:csv-parse-error type-error) (condition)
|
||||
(progn
|
||||
(log-message :error "~a" condition)
|
||||
(pgstate-setf *state* (target csv) :errs -1))))))))))
|
||||
|
||||
(defmethod copy-to-queue ((csv copy-csv) dataq)
|
||||
"Copy data from given CSV definition into lparallel.queue DATAQ"
|
||||
(let ((read (pgloader.queue:map-push-queue dataq #'map-rows csv)))
|
||||
(pgstate-incf *state* (target csv) :read read)))
|
||||
(pgloader.queue:map-push-queue dataq #'map-rows csv))
|
||||
|
||||
(defmethod copy-from ((csv copy-csv) &key truncate)
|
||||
"Copy data from given CSV file definition into its PostgreSQL target table."
|
||||
|
||||
@ -5,7 +5,9 @@
|
||||
(in-package :pgloader.fixed)
|
||||
|
||||
(defclass copy-fixed (copy)
|
||||
((encoding :accessor encoding ; file encoding
|
||||
((source-type :accessor source-type ; one of :inline, :stdin, :regex
|
||||
:initarg :source-type) ; or :filename
|
||||
(encoding :accessor encoding ; file encoding
|
||||
:initarg :encoding) ;
|
||||
(skip-lines :accessor skip-lines ; CSV headers
|
||||
:initarg :skip-lines ;
|
||||
@ -16,7 +18,8 @@
|
||||
"Compute the real source definition from the given source parameter, and
|
||||
set the transforms function list as needed too."
|
||||
(let ((source (slot-value fixed 'source)))
|
||||
(setf (slot-value fixed 'source) (get-absolute-pathname source)))
|
||||
(setf (slot-value fixed 'source-type) (car source))
|
||||
(setf (slot-value fixed 'source) (get-absolute-pathname source)))
|
||||
|
||||
(let ((transforms (when (slot-boundp fixed 'transforms)
|
||||
(slot-value fixed 'transforms)))
|
||||
@ -43,50 +46,44 @@
|
||||
list as its only parameter.
|
||||
|
||||
Returns how many rows were read and processed."
|
||||
(let* ((filespec (source fixed))
|
||||
(filename (if (consp filespec) (car filespec) filespec)))
|
||||
(with-open-file
|
||||
;; we just ignore files that don't exist
|
||||
(input filename
|
||||
:direction :input
|
||||
:external-format (encoding fixed)
|
||||
:if-does-not-exist nil)
|
||||
(when input
|
||||
;; first go to given inline position when filename is a consp
|
||||
(when (consp filespec)
|
||||
(loop repeat (cdr filespec) do (read-char input)))
|
||||
(let ((filenames (case (source-type fixed)
|
||||
(:inline (list (car (source fixed))))
|
||||
(:regex (source fixed))
|
||||
(t (list (source fixed))))))
|
||||
(loop for filename in filenames
|
||||
do
|
||||
(with-open-file
|
||||
;; we just ignore files that don't exist
|
||||
(input filename
|
||||
:direction :input
|
||||
:external-format (encoding fixed)
|
||||
:if-does-not-exist nil)
|
||||
(when input
|
||||
;; first go to given inline position when filename is :inline
|
||||
(when (eq (source-type fixed) :inline)
|
||||
(file-position input (cdr (source fixed))))
|
||||
|
||||
;; ignore as much as skip-lines lines in the file
|
||||
(loop repeat (skip-lines fixed) do (read-line input nil nil))
|
||||
;; ignore as much as skip-lines lines in the file
|
||||
(loop repeat (skip-lines fixed) do (read-line input nil nil))
|
||||
|
||||
;; read in the text file, split it into columns, process NULL columns
|
||||
;; the way postmodern expects them, and call PROCESS-ROW-FN on them
|
||||
(let* ((read 0)
|
||||
(projection (project-fields :fields (fields fixed)
|
||||
:columns (columns fixed)))
|
||||
(reformat-then-process
|
||||
(lambda (row)
|
||||
(let ((projected-row
|
||||
(handler-case
|
||||
(funcall projection row)
|
||||
(condition (e)
|
||||
(pgstate-incf *state* (target fixed) :errs 1)
|
||||
(log-message :error
|
||||
"Could not read line ~d: ~a" read e)))))
|
||||
(when projected-row
|
||||
(funcall process-row-fn projected-row))))))
|
||||
(loop
|
||||
with fun = (compile nil reformat-then-process)
|
||||
for line = (read-line input nil nil)
|
||||
counting line into read
|
||||
while line
|
||||
do (funcall fun (parse-row fixed line))
|
||||
finally (return read)))))))
|
||||
;; read in the text file, split it into columns, process NULL
|
||||
;; columns the way postmodern expects them, and call
|
||||
;; PROCESS-ROW-FN on them
|
||||
(let ((reformat-then-process
|
||||
(reformat-then-process :fields (fields fixed)
|
||||
:columns (columns fixed)
|
||||
:target (target fixed)
|
||||
:process-row-fn process-row-fn)))
|
||||
(loop
|
||||
with fun = (compile nil reformat-then-process)
|
||||
for line = (read-line input nil nil)
|
||||
counting line into read
|
||||
while line
|
||||
do (funcall fun (parse-row fixed line)))))))))
|
||||
|
||||
(defmethod copy-to-queue ((fixed copy-fixed) dataq)
|
||||
"Copy data from given FIXED definition into lparallel.queue DATAQ"
|
||||
(let ((read (pgloader.queue:map-push-queue dataq #'map-rows fixed)))
|
||||
(pgstate-incf *state* (target fixed) :read read)))
|
||||
(pgloader.queue:map-push-queue dataq #'map-rows fixed))
|
||||
|
||||
(defmethod copy-from ((fixed copy-fixed) &key truncate)
|
||||
"Copy data from given FIXED file definition into its PostgreSQL target table."
|
||||
|
||||
@ -141,6 +141,24 @@
|
||||
:name table-name
|
||||
:type "csv"))
|
||||
|
||||
(defun filter-directory (regex
                         &key
                           (keep :first) ; or :all
                           (root *csv-path-root*))
  "Walk the ROOT directory and KEEP either the :first or :all the matches
   against the given REGEX.

   Returns a list of pathnames: a single-element list when KEEP is :first,
   every match when KEEP is :all.

   Signals an error when no file matches REGEX under ROOT, when KEEP is
   neither :first nor :all, or when a retained candidate does not exist
   according to PROBE-FILE."
  (let ((matches (pgloader.archive:get-matching-filenames root regex)))
    ;; Check for the empty case *before* narrowing down to the first match:
    ;; (list (first nil)) is (NIL), a non-empty list, which used to bypass
    ;; this check and then fail later in PROBE-FILE with an unrelated error.
    (unless matches
      (error "No file matching '~a' in expanded archive in '~a'" regex root))

    (let ((candidates (ecase keep
                        (:first (list (first matches)))
                        (:all   matches))))
      ;; every retained candidate must exist; the list itself is returned
      (dolist (candidate candidates candidates)
        (unless (probe-file candidate)
          (error "File does not exists: '~a'." candidate))))))
|
||||
|
||||
(defun get-absolute-pathname (pathname-or-regex &key (root *csv-path-root*))
|
||||
"PATHNAME-OR-REGEX is expected to be either (:regexp expression)
|
||||
or (:filename pathname). In the first case, this function checks if the
|
||||
@ -150,18 +168,12 @@
|
||||
In the second case, walk the ROOT directory and return the first pathname
|
||||
that matches the regex. TODO: consider signaling a condition when we have
|
||||
more than one match."
|
||||
(destructuring-bind (type part) pathname-or-regex
|
||||
(destructuring-bind (type &rest part) pathname-or-regex
|
||||
(ecase type
|
||||
(:inline part)
|
||||
(:inline (car part)) ; because of &rest
|
||||
(:stdin *standard-input*)
|
||||
(:regex (let* ((candidates
|
||||
(pgloader.archive:get-matching-filenames root part))
|
||||
(candidate (first candidates)))
|
||||
(unless candidates
|
||||
(error "No file matching '~a' in expanded archive in '~a'"
|
||||
part root))
|
||||
(if (probe-file candidate) candidate
|
||||
(error "File does not exists: '~a'." candidate))))
|
||||
(:regex (destructuring-bind (keep regex) part
|
||||
(filter-directory regex :keep keep :root root)))
|
||||
(:filename (let ((filename
|
||||
(if (fad:pathname-absolute-p part) part
|
||||
(merge-pathnames part root))))
|
||||
@ -252,3 +264,23 @@
|
||||
(list ,@newrow))))))))
|
||||
;; allow for some debugging
|
||||
(if compile (compile nil projection) projection))))
|
||||
|
||||
(defun reformat-then-process (&key fields columns target process-row-fn)
  "Return a function of one argument, ROW, to apply to each row we read.

   The returned closure increments the :read counter of TARGET in *STATE*
   for every row it is given, projects ROW with the projection built by
   PROJECT-FIELDS from FIELDS and COLUMNS, then calls PROCESS-ROW-FN on the
   projected row.

   When the projection signals a condition, the :errs counter of TARGET is
   incremented, the problem is logged, and the row is skipped:
   PROCESS-ROW-FN is not called for it."
  (let ((projection (project-fields :fields fields :columns columns)))
    (lambda (row)
      (pgstate-incf *state* target :read 1)
      (let ((projected-row
             (handler-case
                 (funcall projection row)
               (condition (e)
                 (pgstate-incf *state* target :errs 1)
                 ;; the line number reported is the current :read count
                 (log-message :error "Could not read line ~d: ~a"
                              (pgloader.utils::pgtable-read
                               (pgstate-get-table *state* target))
                              e)
                 ;; Explicitly yield NIL so that the row is skipped no
                 ;; matter what LOG-MESSAGE happens to return; otherwise
                 ;; its return value would be handed to PROCESS-ROW-FN.
                 nil))))
        (when projected-row
          (funcall process-row-fn projected-row))))))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user