diff --git a/src/connection.lisp b/src/connection.lisp index 2b3839c..3ee2c21 100644 --- a/src/connection.lisp +++ b/src/connection.lisp @@ -121,7 +121,12 @@ :type (conn-type ,conn) :host (db-host ,conn) :port (db-port ,conn) - :user (db-user ,conn)))))))) + :user (db-user ,conn))) + + (t + (error 'connection-error + :mesg (format nil "~a" e) + :type (conn-type ,conn)))))))) (unwind-protect (progn ,@forms) (close-connection ,var))))) diff --git a/src/package.lisp b/src/package.lisp index 064d162..a434281 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -124,6 +124,13 @@ #:query #:check-connection + ;; also export slot names + #:type + #:handle + #:uri + #:arch + #:path + ;; file based connections API for HTTP and Archives support #:fetch-file #:expand @@ -159,6 +166,13 @@ #:copy-to #:copy-database + ;; the md-connection facilities + #:md-connection + #:md-spec + #:md-strm + #:expand-spec + #:open-next-stream + ;; common schema facilities #:push-to-end @@ -264,8 +278,7 @@ (:import-from #:pgloader.pgsql #:maybe-drop-indexes #:create-indexes-again) - (:export #:*csv-path-root* - #:csv-connection + (:export #:csv-connection #:specs #:csv-specs #:get-pathname @@ -471,6 +484,9 @@ (:use #:cl #:esrap #:metabang.bind #:pgloader.params #:pgloader.utils #:pgloader.sql #:pgloader.connection) (:import-from #:alexandria #:read-file-into-string) + (:import-from #:pgloader.sources + #:md-connection + #:md-spec) (:import-from #:pgloader.pgsql #:pgsql-connection #:with-pgsql-transaction @@ -554,7 +570,7 @@ #:connection-error) (:export #:*version-string* #:*state* - #:*csv-path-root* + #:*fd-path-root* #:*root-dir* #:*pg-settings* diff --git a/src/params.lisp b/src/params.lisp index 50581d0..26ad3aa 100644 --- a/src/params.lisp +++ b/src/params.lisp @@ -8,7 +8,7 @@ (:export #:*version-string* #:*dry-run* #:*self-upgrade-immutable-systems* - #:*csv-path-root* + #:*fd-path-root* #:*root-dir* #:*log-filename* #:*summary-pathname* @@ -73,8 +73,8 @@ (defparameter *state* nil "State of the current loading.") -(defparameter *csv-path-root* nil - "Where to load CSV files from, when loading from an archive.") +(defparameter *fd-path-root* nil + "Where to load files from, when loading from an archive or expanding regexps.") (defparameter *root-dir* #+unix (make-pathname :directory "/tmp/pgloader/") diff --git a/src/parsers/command-archive.lisp b/src/parsers/command-archive.lisp index 9bd63eb..b0326f8 100644 --- a/src/parsers/command-archive.lisp +++ b/src/parsers/command-archive.lisp @@ -53,7 +53,7 @@ ("download" :state state-before) (pgloader.archive:http-fetch-file ,url))) (:filename url)))) - (*csv-path-root* + (*fd-path-root* (with-stats-collection ("extract" :state state-before) (pgloader.archive:expand-archive archive-file)))) (progn diff --git a/src/parsers/command-copy.lisp b/src/parsers/command-copy.lisp index 7bff140..672fa85 100644 --- a/src/parsers/command-copy.lisp +++ b/src/parsers/command-copy.lisp @@ -60,7 +60,7 @@ (defrule copy-uri (and "copy://" filename) (:lambda (source) (bind (((_ filename) source)) - (make-instance 'copy-connection :specs filename)))) + (make-instance 'copy-connection :spec filename)))) (defrule copy-file-source (or stdin inline @@ -72,10 +72,10 @@ (if (typep src 'copy-connection) src (destructuring-bind (type &rest specs) src (case type - (:stdin (make-instance 'copy-connection :specs src)) - (:inline (make-instance 'copy-connection :specs src)) - (:filename (make-instance 'copy-connection :specs src)) - (:regex (make-instance 'copy-connection :specs src)) + (:stdin (make-instance 'copy-connection :spec src)) + (:inline (make-instance 'copy-connection :spec src)) + (:filename (make-instance 'copy-connection :spec src)) + (:regex (make-instance 'copy-connection :spec src)) (:http (make-instance 'copy-connection :uri (first specs)))))))) (defrule get-copy-file-source-from-environment-variable (and kw-getenv name) diff --git a/src/parsers/command-csv.lisp b/src/parsers/command-csv.lisp index 8ea5cef..e7046bd 100644 --- a/src/parsers/command-csv.lisp +++ b/src/parsers/command-csv.lisp @@ -377,7 +377,7 @@ (defrule csv-uri (and "csv://" filename) (:lambda (source) (bind (((_ filename) source)) - (make-instance 'csv-connection :specs filename)))) + (make-instance 'csv-connection :spec filename)))) (defrule csv-file-source (or stdin inline @@ -389,10 +389,10 @@ (if (typep src 'csv-connection) src (destructuring-bind (type &rest specs) src (case type - (:stdin (make-instance 'csv-connection :specs src)) - (:inline (make-instance 'csv-connection :specs src)) - (:filename (make-instance 'csv-connection :specs src)) - (:regex (make-instance 'csv-connection :specs src)) + (:stdin (make-instance 'csv-connection :spec src)) + (:inline (make-instance 'csv-connection :spec src)) + (:filename (make-instance 'csv-connection :spec src)) + (:regex (make-instance 'csv-connection :spec src)) (:http (make-instance 'csv-connection :uri (first specs)))))))) (defrule get-csv-file-source-from-environment-variable (and kw-getenv name) diff --git a/src/parsers/command-fixed.lisp b/src/parsers/command-fixed.lisp index 18c8717..0485535 100644 --- a/src/parsers/command-fixed.lisp +++ b/src/parsers/command-fixed.lisp @@ -68,7 +68,7 @@ (defrule fixed-uri (and "fixed://" filename) (:lambda (source) (bind (((_ filename) source)) - (make-instance 'fixed-connection :specs filename)))) + (make-instance 'fixed-connection :spec filename)))) (defrule fixed-file-source (or stdin inline @@ -80,10 +80,10 @@ (if (typep src 'fixed-connection) src (destructuring-bind (type &rest specs) src (case type - (:stdin (make-instance 'fixed-connection :specs src)) - (:inline (make-instance 'fixed-connection :specs src)) - (:filename (make-instance 'fixed-connection :specs src)) - (:regex (make-instance 'fixed-connection :specs src)) + (:stdin (make-instance 'fixed-connection :spec src)) + (:inline (make-instance 'fixed-connection :spec src)) + (:filename (make-instance 'fixed-connection :spec src)) + (:regex (make-instance 'fixed-connection :spec src)) (:http (make-instance 'fixed-connection :uri (first specs)))))))) (defrule get-fixed-file-source-from-environment-variable (and kw-getenv name) diff --git a/src/parsers/command-parser.lisp b/src/parsers/command-parser.lisp index 6bdee0e..e4c3f59 100644 --- a/src/parsers/command-parser.lisp +++ b/src/parsers/command-parser.lisp @@ -37,11 +37,10 @@ (loop :for s-exp :in command - :when (and (or (typep s-exp 'csv-connection) - (typep s-exp 'fixed-connection)) - (slot-boundp s-exp 'specs) - (eq :inline (first (csv-specs s-exp)))) - :do (setf (second (csv-specs s-exp)) position) + :when (and (typep s-exp 'md-connection) + (slot-boundp s-exp 'pgloader.sources::spec) + (eq :inline (first (md-spec s-exp)))) + :do (setf (second (md-spec s-exp)) position) :and :collect s-exp :else :collect (if (and (consp s-exp) (listp (cdr s-exp))) @@ -67,13 +66,12 @@ filename))) s-exp) - ((and (or (typep s-exp 'csv-connection) - (typep s-exp 'fixed-connection)) - (slot-boundp s-exp 'specs) - (eq :filename (car (csv-specs s-exp)))) - (let ((path (second (csv-specs s-exp)))) + ((and (typep s-exp 'md-connection) + (slot-boundp s-exp 'pgloader.sources::spec) + (eq :filename (car (md-spec s-exp)))) + (let ((path (second (md-spec s-exp)))) (if (uiop:relative-pathname-p path) - (progn (setf (csv-specs s-exp) + (progn (setf (md-spec s-exp) `(:filename ,(uiop:merge-pathnames* path filename))) diff --git a/src/sources/common/files-and-pathnames.lisp b/src/sources/common/files-and-pathnames.lisp index e00f652..d01d515 100644 --- a/src/sources/common/files-and-pathnames.lisp +++ b/src/sources/common/files-and-pathnames.lisp @@ -3,31 +3,109 @@ ;;; (in-package #:pgloader.sources) -(defmacro with-open-file-or-stream ((&whole arguments - stream filename-or-stream - &key &allow-other-keys) - &body body) - "Generate a with-open-file call, or just bind STREAM varialbe to the - FILENAME-OR-STREAM stream when this variable is of type STREAM." - `(typecase ,filename-or-stream - (stream (let ((,stream *standard-input*)) - ,@body)) +(defclass md-connection (fd-connection) + ((spec :initarg :spec :accessor md-spec) + (strm :initarg :strm :accessor md-strm)) + (:documentation "pgloader connection parameters for a multi-files source.")) - (t (with-open-file (,stream ,filename-or-stream ,@(cddr arguments)) - ,@body)))) +(defmethod print-object ((c md-connection) stream) + (print-unreadable-object (c stream :type t :identity t) + (with-slots (type spec) c + (let ((path (when (slot-boundp c 'path) (slot-value c 'path)))) + (format stream "~a://~a:~{~a~^,~}" type (first spec) path))))) -(defun get-pathname (dbname table-name &key (csv-path-root *csv-path-root*)) +(defmethod expand :after ((md md-connection)) + "Expand the archive for the MD connection." + (when (and (slot-boundp md 'pgloader.connection::path) + (slot-value md 'pgloader.connection::path) + (uiop:file-pathname-p (fd-path md))) + (setf (md-spec md) `(:filename ,(fd-path md))))) + +(defmethod fetch-file :after ((md md-connection)) + "When the fd-connection has an URI slot, download its file." + (when (and (slot-boundp md 'pgloader.connection::path) + (slot-value md 'pgloader.connection::path)) + (setf (md-spec md) `(:filename ,(fd-path md))))) + +(defgeneric expand-spec (md-connection) + (:documentation "Expand specification for an FD source.")) + +(defgeneric open-next-stream (md-connection &rest args &key) + (:documentation "Open the next input stream from FD.")) + +(defmethod expand-spec ((md md-connection)) + "Given fd spec as a CONS of a source type and a tagged object, expand the + tagged object depending on the source type and return a list of pathnames." + (destructuring-bind (type &rest part) (md-spec md) + (ecase type + (:inline (list (caar part))) + (:stdin (list *standard-input*)) + (:regex (destructuring-bind (keep regex root) part + (filter-directory regex + :keep keep + :root (or *fd-path-root* root)))) + (:filename (let* ((filename (first part)) + (realname + (if (fad:pathname-absolute-p filename) filename + (merge-pathnames filename *fd-path-root*)))) + (if (probe-file realname) (list realname) + (error "File does not exists: '~a'." realname))))))) + +(defmethod open-next-stream ((md md-connection) + &rest args + &key &allow-other-keys) + "Switch to the following file in the PATH list." + ;; first close current stream + (when (slot-boundp md 'strm) + (close (md-strm md))) + + ;; now open the new one + (when (fd-path md) + (let ((current-path (pop (fd-path md)))) + (when current-path + (log-message :info "Open ~s" current-path) + (prog1 + (setf (md-strm md) + (typecase current-path + (stream current-path) + (t (apply #'open current-path args)))) + ;; + ;; The :inline md spec is a little special, it's a filename and a + ;; position where to skip to at opening the file. It allows for + ;; easier self-contained tests. + ;; + (when (eq :inline (car (md-spec md))) + (file-position (md-strm md) (cdadr (md-spec md))))))))) + +(defmethod open-connection ((md md-connection) &key) + "The fd connection supports several specs to open a connection: + - if we have a path, that's a single file to open + - otherwise spec is an CONS wherin + - car is the source type + - cdr is an object suitable for `get-absolute-pathname`" + (setf (fd-path md) (expand-spec md)) + md) + +(defmethod close-connection ((md md-connection)) + "Reset." + (when (and (slot-boundp md 'strm) (md-strm md)) + (close (md-strm md))) + + (setf (md-strm md) nil + (fd-path md) nil)) + +(defun get-pathname (dbname table-name &key (fd-path-root *fd-path-root*)) "Return a pathname where to read or write the file data" (make-pathname :directory (pathname-directory - (merge-pathnames (format nil "~a/" dbname) csv-path-root)) + (merge-pathnames (format nil "~a/" dbname) fd-path-root)) :name table-name :type "csv")) (defun filter-directory (regex &key (keep :first) ; or :all - (root *csv-path-root*)) + (root *fd-path-root*)) "Walk the ROOT directory and KEEP either the :first or :all the matches against the given regexp." (let* ((candidates (pgloader.archive:get-matching-filenames root regex)) @@ -42,26 +120,3 @@ (error "File does not exists: '~a'." candidate)) finally (return candidates)))) -(defun get-absolute-pathname (pathname-or-regex &key (root *csv-path-root*)) - "PATHNAME-OR-REGEX is expected to be either (:regexp expression) - or (:filename pathname). In the first case, this fonction check if the - pathname is absolute or relative and returns an absolute pathname given - current working directory of ROOT. - - In the second case, walk the ROOT directory and return the first pathname - that matches the regex. TODO: consider signaling a condition when we have - more than one match." - (destructuring-bind (type &rest part) pathname-or-regex - (ecase type - (:inline (car part)) ; because of &rest - (:stdin *standard-input*) - (:regex (destructuring-bind (keep regex root) part - (filter-directory regex - :keep keep - :root (or *csv-path-root* root)))) - (:filename (let* ((filename (first part)) - (realname - (if (fad:pathname-absolute-p filename) filename - (merge-pathnames filename root)))) - (if (probe-file realname) realname - (error "File does not exists: '~a'." realname))))))) diff --git a/src/sources/copy.lisp b/src/sources/copy.lisp index 1807f1e..2c4c56c 100644 --- a/src/sources/copy.lisp +++ b/src/sources/copy.lisp @@ -3,16 +3,14 @@ ;;; (in-package :pgloader.copy) -(defclass copy-connection (csv-connection) ()) +(defclass copy-connection (md-connection) ()) -(defmethod initialize-instance :after ((csvconn copy-connection) &key) +(defmethod initialize-instance :after ((copy copy-connection) &key) "Assign the type slot to sqlite." - (setf (slot-value csvconn 'type) "copy")) + (setf (slot-value copy 'type) "copy")) (defclass copy-copy (copy) - ((source-type :accessor source-type ; one of :inline, :stdin, :regex - :initarg :source-type) ; or :filename - (encoding :accessor encoding ; file encoding + ((encoding :accessor encoding ; file encoding :initarg :encoding) ; (skip-lines :accessor skip-lines ; we might want to skip COPY lines :initarg :skip-lines ; @@ -28,10 +26,6 @@ (defmethod initialize-instance :after ((copy copy-copy) &key) "Compute the real source definition from the given source parameter, and set the transforms function list as needed too." - (let ((source (csv-specs (slot-value copy 'source)))) - (setf (slot-value copy 'source-type) (car source)) - (setf (slot-value copy 'source) (get-absolute-pathname source))) - (let ((transforms (when (slot-boundp copy 'transforms) (slot-value copy 'transforms))) (columns @@ -69,48 +63,37 @@ list as its only parameter. Returns how many rows were read and processed." - (let ((filenames (case (source-type copy) - (:stdin (list (source copy))) - (:inline (list (car (source copy)))) - (:regex (source copy)) - (t (list (source copy)))))) - (loop :for filename :in filenames - :do - (with-open-file-or-stream - ;; we just ignore files that don't exist - (input filename - :direction :input - :external-format (encoding copy) - :if-does-not-exist nil) - (when input - ;; first go to given inline position when filename is :inline - (when (eq (source-type copy) :inline) - (file-position input (cdr (source copy)))) + (with-connection (cnx (source copy)) + (loop :for input := (open-next-stream cnx + :direction :input + :external-format (encoding copy) + :if-does-not-exist nil) + :while input + :do (progn + ;; ignore as much as skip-lines lines in the file + (loop repeat (skip-lines copy) do (read-line input nil nil)) - ;; ignore as much as skip-lines lines in the file - (loop repeat (skip-lines copy) do (read-line input nil nil)) - - ;; read in the text file, split it into columns, process NULL - ;; columns the way postmodern expects them, and call - ;; PROCESS-ROW-FN on them - (let ((reformat-then-process - (reformat-then-process :fields (fields copy) - :columns (columns copy) - :target (target copy) - :process-row-fn process-row-fn))) - (loop - :with fun := reformat-then-process - :for line := (read-line input nil nil) - :counting line :into read - :while line - :do (handler-case + ;; read in the text file, split it into columns, process NULL + ;; columns the way postmodern expects them, and call + ;; PROCESS-ROW-FN on them + (let ((reformat-then-process + (reformat-then-process :fields (fields copy) + :columns (columns copy) + :target (target copy) + :process-row-fn process-row-fn))) + (loop + :with fun := reformat-then-process + :for line := (read-line input nil nil) + :counting line :into read + :while line + :do (handler-case (funcall fun (parse-row line :delimiter (delimiter copy) :null-as (null-as copy))) (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target copy) :errs 1))))))))))) + (pgstate-incf *state* (target copy) :errs 1)))))))))) (defmethod copy-to-queue ((copy copy-copy) queue) "Copy data from given COPY definition into lparallel.queue DATAQ" diff --git a/src/sources/csv/csv-database.lisp b/src/sources/csv/csv-database.lisp index 49b8432..3faef8f 100644 --- a/src/sources/csv/csv-database.lisp +++ b/src/sources/csv/csv-database.lisp @@ -13,7 +13,7 @@ ;;; (defun import-database (dbname &key - (csv-path-root *csv-path-root*) + (fd-path-root *fd-path-root*) (skip-lines 0) (separator #\Tab) (quote cl-csv:*quote*) @@ -26,7 +26,7 @@ (loop for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname) for filename = (get-pathname dbname table-name - :csv-path-root csv-path-root) + :fd-path-root fd-path-root) when (or (null only-tables) (member table-name only-tables :test #'equal)) do diff --git a/src/sources/csv/csv.lisp b/src/sources/csv/csv.lisp index 9006efd..7d77697 100644 --- a/src/sources/csv/csv.lisp +++ b/src/sources/csv/csv.lisp @@ -4,32 +4,11 @@ (in-package :pgloader.csv) -(defclass csv-connection (fd-connection) - ((specs :initarg :specs :accessor csv-specs))) +(defclass csv-connection (md-connection) ()) -(defmethod initialize-instance :after ((csvconn csv-connection) &key) +(defmethod initialize-instance :after ((csv csv-connection) &key) "Assign the type slot to sqlite." - (setf (slot-value csvconn 'type) "csv")) - -(defmethod print-object ((csv csv-connection) stream) - (print-unreadable-object (csv stream :type t :identity t) - (let ((specs (if (slot-boundp csv 'specs) (slot-value csv 'specs) - `(:http ,(slot-value csv 'pgloader.connection::uri))))) - (with-slots (type) csv - (format stream "~a://~a:~a" type (first specs) (second specs)))))) - -(defmethod expand :after ((csv csv-connection)) - "Expand the archive for the FD connection." - (when (and (slot-boundp csv 'pgloader.connection::path) - (slot-value csv 'pgloader.connection::path) - (uiop:file-pathname-p (fd-path csv))) - (setf (csv-specs csv) `(:filename ,(fd-path csv))))) - -(defmethod fetch-file :after ((csv csv-connection)) - "When the fd-connection has an URI slot, download its file." - (when (and (slot-boundp csv 'pgloader.connection::path) - (slot-value csv 'pgloader.connection::path)) - (setf (csv-specs csv) `(:filename ,(fd-path csv))))) + (setf (slot-value csv 'type) "csv")) ;;; ;;; Implementing the pgloader source API @@ -68,10 +47,6 @@ (defmethod initialize-instance :after ((csv copy-csv) &key) "Compute the real source definition from the given source parameter, and set the transforms function list as needed too." - (let ((source (csv-specs (slot-value csv 'source)))) - (setf (slot-value csv 'source-type) (car source)) - (setf (slot-value csv 'source) (get-absolute-pathname source))) - (let ((transforms (when (slot-boundp csv 'transforms) (slot-value csv 'transforms))) (columns @@ -105,32 +80,16 @@ Each row is pre-processed then PROCESS-ROW-FN is called with the row as a list as its only parameter. - FILESPEC is either a filename or a pair (filename . position) where - position is the number of bytes to skip in the file before getting to the - data. That's used to handle the INLINE data loading. - Finally returns how many rows where read and processed." - (let ((filenames (case (source-type csv) - (:stdin (list (source csv))) - (:inline (list (car (source csv)))) - (:regex (source csv)) - (t (list (source csv)))))) - (loop for filename in filenames - do - (with-open-file-or-stream - ;; we just ignore files that don't exist - (input filename - :direction :input - :external-format (encoding csv) - :if-does-not-exist nil) - (when input - (log-message :info "COPY FROM ~s" filename) - ;; first go to given inline position when filename is :inline - (when (eq (source-type csv) :inline) - (file-position input (cdr (source csv)))) - - ;; we handle skipping more than one line here, as cl-csv only knows + (with-connection (cnx (source csv)) + (loop :for input := (open-next-stream cnx + :direction :input + :external-format (encoding csv) + :if-does-not-exist nil) + :while input + :do (progn + ;; we handle skipping more than one line here, as cl-csv only knows ;; about skipping the first line (loop repeat (skip-lines csv) do (read-line input nil nil)) @@ -167,7 +126,7 @@ (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target csv) :errs 1)))))))))) + (pgstate-incf *state* (target csv) :errs 1))))))))) (defmethod copy-to-queue ((csv copy-csv) queue) "Copy data from given CSV definition into lparallel.queue DATAQ" diff --git a/src/sources/fixed.lisp b/src/sources/fixed.lisp index bde1c0f..54620f5 100644 --- a/src/sources/fixed.lisp +++ b/src/sources/fixed.lisp @@ -4,16 +4,14 @@ (in-package :pgloader.fixed) -(defclass fixed-connection (csv-connection) ()) +(defclass fixed-connection (md-connection) ()) -(defmethod initialize-instance :after ((csvconn fixed-connection) &key) +(defmethod initialize-instance :after ((fixed fixed-connection) &key) "Assign the type slot to sqlite." - (setf (slot-value csvconn 'type) "fixed")) + (setf (slot-value fixed 'type) "fixed")) (defclass copy-fixed (copy) - ((source-type :accessor source-type ; one of :inline, :stdin, :regex - :initarg :source-type) ; or :filename - (encoding :accessor encoding ; file encoding + ((encoding :accessor encoding ; file encoding :initarg :encoding) ; (skip-lines :accessor skip-lines ; CSV headers :initarg :skip-lines ; @@ -23,10 +21,6 @@ (defmethod initialize-instance :after ((fixed copy-fixed) &key) "Compute the real source definition from the given source parameter, and set the transforms function list as needed too." - (let ((source (csv-specs (slot-value fixed 'source)))) - (setf (slot-value fixed 'source-type) (car source)) - (setf (slot-value fixed 'source) (get-absolute-pathname source))) - (let ((transforms (when (slot-boundp fixed 'transforms) (slot-value fixed 'transforms))) (columns @@ -57,47 +51,35 @@ list as its only parameter. Returns how many rows where read and processed." - (let ((filenames (case (source-type fixed) - (:stdin (list (source fixed))) - (:inline (list (car (source fixed)))) - (:regex (source fixed)) - (t (list (source fixed)))))) - (loop :for filename :in filenames - :do - (with-open-file-or-stream - ;; we just ignore files that don't exist - (input filename - :direction :input - :external-format (encoding fixed) - :if-does-not-exist nil) - (when input - ;; first go to given inline position when filename is :inline - (when (eq (source-type fixed) :inline) - (file-position input (cdr (source fixed)))) + (with-connection (cnx (source fixed)) + (loop :for input := (open-next-stream cnx + :direction :input + :external-format (encoding fixed) + :if-does-not-exist nil) + :while input + :do (progn ;; ignore as much as skip-lines lines in the file + (loop repeat (skip-lines fixed) do (read-line input nil nil)) - ;; ignore as much as skip-lines lines in the file - (loop repeat (skip-lines fixed) do (read-line input nil nil)) - - ;; read in the text file, split it into columns, process NULL - ;; columns the way postmodern expects them, and call - ;; PROCESS-ROW-FN on them - (let ((reformat-then-process - (reformat-then-process :fields (fields fixed) - :columns (columns fixed) - :target (target fixed) - :process-row-fn process-row-fn))) - (loop - :with fun := (compile nil reformat-then-process) + ;; read in the text file, split it into columns, process NULL + ;; columns the way postmodern expects them, and call + ;; PROCESS-ROW-FN on them + (let ((reformat-then-process + (reformat-then-process :fields (fields fixed) + :columns (columns fixed) + :target (target fixed) + :process-row-fn process-row-fn))) + (loop + :with fun := (compile nil reformat-then-process) :with fixed-cols-specs := (mapcar #'cdr (fields fixed)) - :for line := (read-line input nil nil) - :counting line :into read - :while line - :do (handler-case + :for line := (read-line input nil nil) + :counting line :into read + :while line + :do (handler-case (funcall fun (parse-row fixed-cols-specs line)) (condition (e) (progn (log-message :error "~a" e) - (pgstate-incf *state* (target fixed) :errs 1))))))))))) + (pgstate-incf *state* (target fixed) :errs 1)))))))))) (defmethod copy-to-queue ((fixed copy-fixed) queue) "Copy data from given FIXED definition into lparallel.queue DATAQ" @@ -126,7 +108,7 @@ :dbname (db-name (target-db fixed)) :state *state* :summary summary) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) + (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) (log-message :notice "COPY ~a" (target fixed)) (lp:submit-task channel #'copy-to-queue fixed queue) diff --git a/src/utils/threads.lisp b/src/utils/threads.lisp index 88fc9e3..60c95a9 100644 --- a/src/utils/threads.lisp +++ b/src/utils/threads.lisp @@ -13,6 +13,7 @@ (*concurrent-batches* . ,*concurrent-batches*) (*pg-settings* . ',*pg-settings*) (*state* . ,*state*) + (*fd-path-root* . ,*fd-path-root*) (*client-min-messages* . ,*client-min-messages*) (*log-min-messages* . ,*log-min-messages*)