Cleanup file based "connections".

When the notion of a connection class with a generic set of methods was
invented, the very flexible specification formats available for the file
based sources were not integrated into the new connection system.

This patch provides a new connection class md-connection with a specific
sub-protocol (after opening a connection, the caller is supposed to loop
around open-next-stream) so that it's possible to both properly fit into
the connection concept and to better share the code between our three
implementations (csv, copy, fixed).
This commit is contained in:
Dimitri Fontaine 2015-08-24 16:33:00 +02:00
parent ea35eb575d
commit 04aa743eb7
14 changed files with 216 additions and 217 deletions

View File

@ -121,7 +121,12 @@
:type (conn-type ,conn) :type (conn-type ,conn)
:host (db-host ,conn) :host (db-host ,conn)
:port (db-port ,conn) :port (db-port ,conn)
:user (db-user ,conn)))))))) :user (db-user ,conn)))
(t
(error 'connection-error
:mesg (format nil "~a" e)
:type (conn-type ,conn))))))))
(unwind-protect (unwind-protect
(progn ,@forms) (progn ,@forms)
(close-connection ,var))))) (close-connection ,var)))))

View File

@ -124,6 +124,13 @@
#:query #:query
#:check-connection #:check-connection
;; also export slot names
#:type
#:handle
#:uri
#:arch
#:path
;; file based connections API for HTTP and Archives support ;; file based connections API for HTTP and Archives support
#:fetch-file #:fetch-file
#:expand #:expand
@ -159,6 +166,13 @@
#:copy-to #:copy-to
#:copy-database #:copy-database
;; the md-connection facilities
#:md-connection
#:md-spec
#:md-strm
#:expand-spec
#:open-next-stream
;; common schema facilities ;; common schema facilities
#:push-to-end #:push-to-end
@ -264,8 +278,7 @@
(:import-from #:pgloader.pgsql (:import-from #:pgloader.pgsql
#:maybe-drop-indexes #:maybe-drop-indexes
#:create-indexes-again) #:create-indexes-again)
(:export #:*csv-path-root* (:export #:csv-connection
#:csv-connection
#:specs #:specs
#:csv-specs #:csv-specs
#:get-pathname #:get-pathname
@ -471,6 +484,9 @@
(:use #:cl #:esrap #:metabang.bind (:use #:cl #:esrap #:metabang.bind
#:pgloader.params #:pgloader.utils #:pgloader.sql #:pgloader.connection) #:pgloader.params #:pgloader.utils #:pgloader.sql #:pgloader.connection)
(:import-from #:alexandria #:read-file-into-string) (:import-from #:alexandria #:read-file-into-string)
(:import-from #:pgloader.sources
#:md-connection
#:md-spec)
(:import-from #:pgloader.pgsql (:import-from #:pgloader.pgsql
#:pgsql-connection #:pgsql-connection
#:with-pgsql-transaction #:with-pgsql-transaction
@ -554,7 +570,7 @@
#:connection-error) #:connection-error)
(:export #:*version-string* (:export #:*version-string*
#:*state* #:*state*
#:*csv-path-root* #:*fd-path-root*
#:*root-dir* #:*root-dir*
#:*pg-settings* #:*pg-settings*

View File

@ -8,7 +8,7 @@
(:export #:*version-string* (:export #:*version-string*
#:*dry-run* #:*dry-run*
#:*self-upgrade-immutable-systems* #:*self-upgrade-immutable-systems*
#:*csv-path-root* #:*fd-path-root*
#:*root-dir* #:*root-dir*
#:*log-filename* #:*log-filename*
#:*summary-pathname* #:*summary-pathname*
@ -73,8 +73,8 @@
(defparameter *state* nil (defparameter *state* nil
"State of the current loading.") "State of the current loading.")
(defparameter *csv-path-root* nil (defparameter *fd-path-root* nil
"Where to load CSV files from, when loading from an archive.") "Where to load files from, when loading from an archive or expanding regexps.")
(defparameter *root-dir* (defparameter *root-dir*
#+unix (make-pathname :directory "/tmp/pgloader/") #+unix (make-pathname :directory "/tmp/pgloader/")

View File

@ -53,7 +53,7 @@
("download" :state state-before) ("download" :state state-before)
(pgloader.archive:http-fetch-file ,url))) (pgloader.archive:http-fetch-file ,url)))
(:filename url)))) (:filename url))))
(*csv-path-root* (*fd-path-root*
(with-stats-collection ("extract" :state state-before) (with-stats-collection ("extract" :state state-before)
(pgloader.archive:expand-archive archive-file)))) (pgloader.archive:expand-archive archive-file))))
(progn (progn

View File

@ -60,7 +60,7 @@
(defrule copy-uri (and "copy://" filename) (defrule copy-uri (and "copy://" filename)
(:lambda (source) (:lambda (source)
(bind (((_ filename) source)) (bind (((_ filename) source))
(make-instance 'copy-connection :specs filename)))) (make-instance 'copy-connection :spec filename))))
(defrule copy-file-source (or stdin (defrule copy-file-source (or stdin
inline inline
@ -72,10 +72,10 @@
(if (typep src 'copy-connection) src (if (typep src 'copy-connection) src
(destructuring-bind (type &rest specs) src (destructuring-bind (type &rest specs) src
(case type (case type
(:stdin (make-instance 'copy-connection :specs src)) (:stdin (make-instance 'copy-connection :spec src))
(:inline (make-instance 'copy-connection :specs src)) (:inline (make-instance 'copy-connection :spec src))
(:filename (make-instance 'copy-connection :specs src)) (:filename (make-instance 'copy-connection :spec src))
(:regex (make-instance 'copy-connection :specs src)) (:regex (make-instance 'copy-connection :spec src))
(:http (make-instance 'copy-connection :uri (first specs)))))))) (:http (make-instance 'copy-connection :uri (first specs))))))))
(defrule get-copy-file-source-from-environment-variable (and kw-getenv name) (defrule get-copy-file-source-from-environment-variable (and kw-getenv name)

View File

@ -377,7 +377,7 @@
(defrule csv-uri (and "csv://" filename) (defrule csv-uri (and "csv://" filename)
(:lambda (source) (:lambda (source)
(bind (((_ filename) source)) (bind (((_ filename) source))
(make-instance 'csv-connection :specs filename)))) (make-instance 'csv-connection :spec filename))))
(defrule csv-file-source (or stdin (defrule csv-file-source (or stdin
inline inline
@ -389,10 +389,10 @@
(if (typep src 'csv-connection) src (if (typep src 'csv-connection) src
(destructuring-bind (type &rest specs) src (destructuring-bind (type &rest specs) src
(case type (case type
(:stdin (make-instance 'csv-connection :specs src)) (:stdin (make-instance 'csv-connection :spec src))
(:inline (make-instance 'csv-connection :specs src)) (:inline (make-instance 'csv-connection :spec src))
(:filename (make-instance 'csv-connection :specs src)) (:filename (make-instance 'csv-connection :spec src))
(:regex (make-instance 'csv-connection :specs src)) (:regex (make-instance 'csv-connection :spec src))
(:http (make-instance 'csv-connection :uri (first specs)))))))) (:http (make-instance 'csv-connection :uri (first specs))))))))
(defrule get-csv-file-source-from-environment-variable (and kw-getenv name) (defrule get-csv-file-source-from-environment-variable (and kw-getenv name)

View File

@ -68,7 +68,7 @@
(defrule fixed-uri (and "fixed://" filename) (defrule fixed-uri (and "fixed://" filename)
(:lambda (source) (:lambda (source)
(bind (((_ filename) source)) (bind (((_ filename) source))
(make-instance 'fixed-connection :specs filename)))) (make-instance 'fixed-connection :spec filename))))
(defrule fixed-file-source (or stdin (defrule fixed-file-source (or stdin
inline inline
@ -80,10 +80,10 @@
(if (typep src 'fixed-connection) src (if (typep src 'fixed-connection) src
(destructuring-bind (type &rest specs) src (destructuring-bind (type &rest specs) src
(case type (case type
(:stdin (make-instance 'fixed-connection :specs src)) (:stdin (make-instance 'fixed-connection :spec src))
(:inline (make-instance 'fixed-connection :specs src)) (:inline (make-instance 'fixed-connection :spec src))
(:filename (make-instance 'fixed-connection :specs src)) (:filename (make-instance 'fixed-connection :spec src))
(:regex (make-instance 'fixed-connection :specs src)) (:regex (make-instance 'fixed-connection :spec src))
(:http (make-instance 'fixed-connection :uri (first specs)))))))) (:http (make-instance 'fixed-connection :uri (first specs))))))))
(defrule get-fixed-file-source-from-environment-variable (and kw-getenv name) (defrule get-fixed-file-source-from-environment-variable (and kw-getenv name)

View File

@ -37,11 +37,10 @@
(loop (loop
:for s-exp :in command :for s-exp :in command
:when (and (or (typep s-exp 'csv-connection) :when (and (typep s-exp 'md-connection)
(typep s-exp 'fixed-connection)) (slot-boundp s-exp 'pgloader.sources::spec)
(slot-boundp s-exp 'specs) (eq :inline (first (md-spec s-exp))))
(eq :inline (first (csv-specs s-exp)))) :do (setf (second (md-spec s-exp)) position)
:do (setf (second (csv-specs s-exp)) position)
:and :collect s-exp :and :collect s-exp
:else :collect (if (and (consp s-exp) (listp (cdr s-exp))) :else :collect (if (and (consp s-exp) (listp (cdr s-exp)))
@ -67,13 +66,12 @@
filename))) filename)))
s-exp) s-exp)
((and (or (typep s-exp 'csv-connection) ((and (typep s-exp 'md-connection)
(typep s-exp 'fixed-connection)) (slot-boundp s-exp 'pgloader.sources::spec)
(slot-boundp s-exp 'specs) (eq :filename (car (md-spec s-exp))))
(eq :filename (car (csv-specs s-exp)))) (let ((path (second (md-spec s-exp))))
(let ((path (second (csv-specs s-exp))))
(if (uiop:relative-pathname-p path) (if (uiop:relative-pathname-p path)
(progn (setf (csv-specs s-exp) (progn (setf (md-spec s-exp)
`(:filename `(:filename
,(uiop:merge-pathnames* path ,(uiop:merge-pathnames* path
filename))) filename)))

View File

@ -3,31 +3,109 @@
;;; ;;;
(in-package #:pgloader.sources) (in-package #:pgloader.sources)
(defmacro with-open-file-or-stream ((&whole arguments (defclass md-connection (fd-connection)
stream filename-or-stream ((spec :initarg :spec :accessor md-spec)
&key &allow-other-keys) (strm :initarg :strm :accessor md-strm))
&body body) (:documentation "pgloader connection parameters for a multi-files source."))
"Generate a with-open-file call, or just bind STREAM varialbe to the
FILENAME-OR-STREAM stream when this variable is of type STREAM."
`(typecase ,filename-or-stream
(stream (let ((,stream *standard-input*))
,@body))
(t (with-open-file (,stream ,filename-or-stream ,@(cddr arguments)) (defmethod print-object ((c md-connection) stream)
,@body)))) (print-unreadable-object (c stream :type t :identity t)
(with-slots (type spec) c
(let ((path (when (slot-boundp c 'path) (slot-value c 'path))))
(format stream "~a://~a:~{~a~^,~}" type (first spec) path)))))
(defun get-pathname (dbname table-name &key (csv-path-root *csv-path-root*)) (defmethod expand :after ((md md-connection))
"Expand the archive for the MD connection."
(when (and (slot-boundp md 'pgloader.connection::path)
(slot-value md 'pgloader.connection::path)
(uiop:file-pathname-p (fd-path md)))
(setf (md-spec md) `(:filename ,(fd-path md)))))
(defmethod fetch-file :after ((md md-connection))
  "Runs after the primary FETCH-FILE method: when the PATH slot of MD is
   bound and non-nil, replace the connection spec with (:filename PATH) so
   that the connection reads from that file. Otherwise leave the spec alone."
  (let ((path-slot 'pgloader.connection::path))
    (unless (or (not (slot-boundp md path-slot))
                (null (slot-value md path-slot)))
      (setf (md-spec md) (list :filename (fd-path md))))))
;; Generic function turning the spec held by a connection into the list of
;; sources to read from. The md-connection method in this file dispatches
;; on the spec type (:inline, :stdin, :regex or :filename) and returns a
;; list.
(defgeneric expand-spec (md-connection)
(:documentation "Expand specification for an FD source."))
;; Generic function opening the next input stream of a connection. The
;; lambda list takes &rest args &key so that methods can accept arbitrary
;; keyword arguments; the md-connection method in this file forwards them
;; to OPEN.
(defgeneric open-next-stream (md-connection &rest args &key)
(:documentation "Open the next input stream from FD."))
(defmethod expand-spec ((md md-connection))
  "Expand the spec of MD into the list of sources to read from.

   The spec is a list whose CAR is the source type and whose CDR is the
   type specific payload:

    - (:inline (filename . position)) gives a list holding FILENAME
    - (:stdin ...) gives a list holding the *STANDARD-INPUT* stream
    - (:regex keep regex root) gives the result of FILTER-DIRECTORY, walking
      *FD-PATH-ROOT* when it is set and ROOT otherwise
    - (:filename path) gives a one element list holding the pathname, a
      relative PATH being merged against *FD-PATH-ROOT* when it is set

   Signals an error when a :filename source does not exist on disk, and
   when the source type is none of the above (ECASE)."
  (destructuring-bind (type &rest part) (md-spec md)
    (ecase type
      (:inline (list (caar part)))
      (:stdin (list *standard-input*))
      (:regex (destructuring-bind (keep regex root) part
                (filter-directory regex
                                  :keep keep
                                  :root (or *fd-path-root* root))))
      (:filename
       (let* ((filename (first part))
              (realname
               (cond ((fad:pathname-absolute-p filename) filename)
                     ;; NIL is not a pathname designator: only hand
                     ;; *fd-path-root* to MERGE-PATHNAMES when it is set,
                     ;; and otherwise rely on *default-pathname-defaults*.
                     (*fd-path-root*
                      (merge-pathnames filename *fd-path-root*))
                     (t (merge-pathnames filename)))))
         (if (probe-file realname)
             (list realname)
             (error "File does not exists: '~a'." realname)))))))
(defmethod open-next-stream ((md md-connection)
                             &rest args
                             &key &allow-other-keys)
  "Close the stream currently held by MD, if any, then open the next entry
   of the connection path list and return the new stream.

   ARGS are passed as-is to OPEN for entries that are not already streams.
   Returns NIL when the path list is exhausted, or when OPEN itself returns
   NIL (as it does for a missing file with :if-does-not-exist nil)."
  ;; first close current stream -- the slot may be bound to NIL, either
  ;; because CLOSE-CONNECTION reset it or because OPEN returned NIL
  (when (and (slot-boundp md 'strm) (md-strm md))
    (close (md-strm md)))

  ;; now open the new one
  (when (fd-path md)
    (let ((current-path (pop (fd-path md))))
      (when current-path
        (log-message :info "Open ~s" current-path)
        (prog1
            (setf (md-strm md)
                  (typecase current-path
                    (stream current-path)
                    (t (apply #'open current-path args))))
          ;;
          ;; The :inline md spec is a little special, it's a filename and a
          ;; position where to skip to at opening the file. It allows for
          ;; easier self-contained tests.
          ;;
          ;; Only seek when we actually got a stream: OPEN returns NIL for
          ;; a missing file when called with :if-does-not-exist nil.
          ;;
          (when (and (md-strm md)
                     (eq :inline (car (md-spec md))))
            (file-position (md-strm md) (cdadr (md-spec md)))))))))
(defmethod open-connection ((md md-connection) &key)
  "Prepare MD for reading: expand its spec with EXPAND-SPEC and store the
   resulting list in the connection path slot. No stream is opened here,
   the caller is expected to loop around OPEN-NEXT-STREAM afterwards.
   Returns MD."
  (let ((sources (expand-spec md)))
    (setf (fd-path md) sources)
    md))
(defmethod close-connection ((md md-connection))
  "Close the stream MD currently holds, when there is one, then reset both
   the stream and the path list of the connection to NIL."
  (let ((current (and (slot-boundp md 'strm) (md-strm md))))
    (when current
      (close current)))
  (setf (md-strm md) nil)
  (setf (fd-path md) nil))
(defun get-pathname (dbname table-name &key (fd-path-root *fd-path-root*))
"Return a pathname where to read or write the file data" "Return a pathname where to read or write the file data"
(make-pathname (make-pathname
:directory (pathname-directory :directory (pathname-directory
(merge-pathnames (format nil "~a/" dbname) csv-path-root)) (merge-pathnames (format nil "~a/" dbname) fd-path-root))
:name table-name :name table-name
:type "csv")) :type "csv"))
(defun filter-directory (regex (defun filter-directory (regex
&key &key
(keep :first) ; or :all (keep :first) ; or :all
(root *csv-path-root*)) (root *fd-path-root*))
"Walk the ROOT directory and KEEP either the :first or :all the matches "Walk the ROOT directory and KEEP either the :first or :all the matches
against the given regexp." against the given regexp."
(let* ((candidates (pgloader.archive:get-matching-filenames root regex)) (let* ((candidates (pgloader.archive:get-matching-filenames root regex))
@ -42,26 +120,3 @@
(error "File does not exists: '~a'." candidate)) (error "File does not exists: '~a'." candidate))
finally (return candidates)))) finally (return candidates))))
(defun get-absolute-pathname (pathname-or-regex &key (root *csv-path-root*))
"PATHNAME-OR-REGEX is expected to be either (:regexp expression)
or (:filename pathname). In the first case, this fonction check if the
pathname is absolute or relative and returns an absolute pathname given
current working directory of ROOT.
In the second case, walk the ROOT directory and return the first pathname
that matches the regex. TODO: consider signaling a condition when we have
more than one match."
(destructuring-bind (type &rest part) pathname-or-regex
(ecase type
(:inline (car part)) ; because of &rest
(:stdin *standard-input*)
(:regex (destructuring-bind (keep regex root) part
(filter-directory regex
:keep keep
:root (or *csv-path-root* root))))
(:filename (let* ((filename (first part))
(realname
(if (fad:pathname-absolute-p filename) filename
(merge-pathnames filename root))))
(if (probe-file realname) realname
(error "File does not exists: '~a'." realname)))))))

View File

@ -3,16 +3,14 @@
;;; ;;;
(in-package :pgloader.copy) (in-package :pgloader.copy)
(defclass copy-connection (csv-connection) ()) (defclass copy-connection (md-connection) ())
(defmethod initialize-instance :after ((csvconn copy-connection) &key) (defmethod initialize-instance :after ((copy copy-connection) &key)
"Assign the type slot to sqlite." "Assign the type slot to sqlite."
(setf (slot-value csvconn 'type) "copy")) (setf (slot-value copy 'type) "copy"))
(defclass copy-copy (copy) (defclass copy-copy (copy)
((source-type :accessor source-type ; one of :inline, :stdin, :regex ((encoding :accessor encoding ; file encoding
:initarg :source-type) ; or :filename
(encoding :accessor encoding ; file encoding
:initarg :encoding) ; :initarg :encoding) ;
(skip-lines :accessor skip-lines ; we might want to skip COPY lines (skip-lines :accessor skip-lines ; we might want to skip COPY lines
:initarg :skip-lines ; :initarg :skip-lines ;
@ -28,10 +26,6 @@
(defmethod initialize-instance :after ((copy copy-copy) &key) (defmethod initialize-instance :after ((copy copy-copy) &key)
"Compute the real source definition from the given source parameter, and "Compute the real source definition from the given source parameter, and
set the transforms function list as needed too." set the transforms function list as needed too."
(let ((source (csv-specs (slot-value copy 'source))))
(setf (slot-value copy 'source-type) (car source))
(setf (slot-value copy 'source) (get-absolute-pathname source)))
(let ((transforms (when (slot-boundp copy 'transforms) (let ((transforms (when (slot-boundp copy 'transforms)
(slot-value copy 'transforms))) (slot-value copy 'transforms)))
(columns (columns
@ -69,48 +63,37 @@
list as its only parameter. list as its only parameter.
Returns how many rows were read and processed." Returns how many rows were read and processed."
(let ((filenames (case (source-type copy) (with-connection (cnx (source copy))
(:stdin (list (source copy))) (loop :for input := (open-next-stream cnx
(:inline (list (car (source copy)))) :direction :input
(:regex (source copy)) :external-format (encoding copy)
(t (list (source copy)))))) :if-does-not-exist nil)
(loop :for filename :in filenames :while input
:do :do (progn
(with-open-file-or-stream ;; ignore as much as skip-lines lines in the file
;; we just ignore files that don't exist (loop repeat (skip-lines copy) do (read-line input nil nil))
(input filename
:direction :input
:external-format (encoding copy)
:if-does-not-exist nil)
(when input
;; first go to given inline position when filename is :inline
(when (eq (source-type copy) :inline)
(file-position input (cdr (source copy))))
;; ignore as much as skip-lines lines in the file ;; read in the text file, split it into columns, process NULL
(loop repeat (skip-lines copy) do (read-line input nil nil)) ;; columns the way postmodern expects them, and call
;; PROCESS-ROW-FN on them
;; read in the text file, split it into columns, process NULL (let ((reformat-then-process
;; columns the way postmodern expects them, and call (reformat-then-process :fields (fields copy)
;; PROCESS-ROW-FN on them :columns (columns copy)
(let ((reformat-then-process :target (target copy)
(reformat-then-process :fields (fields copy) :process-row-fn process-row-fn)))
:columns (columns copy) (loop
:target (target copy) :with fun := reformat-then-process
:process-row-fn process-row-fn))) :for line := (read-line input nil nil)
(loop :counting line :into read
:with fun := reformat-then-process :while line
:for line := (read-line input nil nil) :do (handler-case
:counting line :into read
:while line
:do (handler-case
(funcall fun (parse-row line (funcall fun (parse-row line
:delimiter (delimiter copy) :delimiter (delimiter copy)
:null-as (null-as copy))) :null-as (null-as copy)))
(condition (e) (condition (e)
(progn (progn
(log-message :error "~a" e) (log-message :error "~a" e)
(pgstate-incf *state* (target copy) :errs 1))))))))))) (pgstate-incf *state* (target copy) :errs 1))))))))))
(defmethod copy-to-queue ((copy copy-copy) queue) (defmethod copy-to-queue ((copy copy-copy) queue)
"Copy data from given COPY definition into lparallel.queue DATAQ" "Copy data from given COPY definition into lparallel.queue DATAQ"

View File

@ -13,7 +13,7 @@
;;; ;;;
(defun import-database (dbname (defun import-database (dbname
&key &key
(csv-path-root *csv-path-root*) (fd-path-root *fd-path-root*)
(skip-lines 0) (skip-lines 0)
(separator #\Tab) (separator #\Tab)
(quote cl-csv:*quote*) (quote cl-csv:*quote*)
@ -26,7 +26,7 @@
(loop (loop
for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname) for (table-name . date-columns) in (pgloader.pgsql:list-tables dbname)
for filename = (get-pathname dbname table-name for filename = (get-pathname dbname table-name
:csv-path-root csv-path-root) :fd-path-root fd-path-root)
when (or (null only-tables) when (or (null only-tables)
(member table-name only-tables :test #'equal)) (member table-name only-tables :test #'equal))
do do

View File

@ -4,32 +4,11 @@
(in-package :pgloader.csv) (in-package :pgloader.csv)
(defclass csv-connection (fd-connection) (defclass csv-connection (md-connection) ())
((specs :initarg :specs :accessor csv-specs)))
(defmethod initialize-instance :after ((csvconn csv-connection) &key) (defmethod initialize-instance :after ((csv csv-connection) &key)
"Assign the type slot to sqlite." "Assign the type slot to sqlite."
(setf (slot-value csvconn 'type) "csv")) (setf (slot-value csv 'type) "csv"))
(defmethod print-object ((csv csv-connection) stream)
(print-unreadable-object (csv stream :type t :identity t)
(let ((specs (if (slot-boundp csv 'specs) (slot-value csv 'specs)
`(:http ,(slot-value csv 'pgloader.connection::uri)))))
(with-slots (type) csv
(format stream "~a://~a:~a" type (first specs) (second specs))))))
(defmethod expand :after ((csv csv-connection))
"Expand the archive for the FD connection."
(when (and (slot-boundp csv 'pgloader.connection::path)
(slot-value csv 'pgloader.connection::path)
(uiop:file-pathname-p (fd-path csv)))
(setf (csv-specs csv) `(:filename ,(fd-path csv)))))
(defmethod fetch-file :after ((csv csv-connection))
"When the fd-connection has an URI slot, download its file."
(when (and (slot-boundp csv 'pgloader.connection::path)
(slot-value csv 'pgloader.connection::path))
(setf (csv-specs csv) `(:filename ,(fd-path csv)))))
;;; ;;;
;;; Implementing the pgloader source API ;;; Implementing the pgloader source API
@ -68,10 +47,6 @@
(defmethod initialize-instance :after ((csv copy-csv) &key) (defmethod initialize-instance :after ((csv copy-csv) &key)
"Compute the real source definition from the given source parameter, and "Compute the real source definition from the given source parameter, and
set the transforms function list as needed too." set the transforms function list as needed too."
(let ((source (csv-specs (slot-value csv 'source))))
(setf (slot-value csv 'source-type) (car source))
(setf (slot-value csv 'source) (get-absolute-pathname source)))
(let ((transforms (when (slot-boundp csv 'transforms) (let ((transforms (when (slot-boundp csv 'transforms)
(slot-value csv 'transforms))) (slot-value csv 'transforms)))
(columns (columns
@ -105,32 +80,16 @@
Each row is pre-processed then PROCESS-ROW-FN is called with the row as a Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
list as its only parameter. list as its only parameter.
FILESPEC is either a filename or a pair (filename . position) where
position is the number of bytes to skip in the file before getting to the
data. That's used to handle the INLINE data loading.
Finally returns how many rows where read and processed." Finally returns how many rows where read and processed."
(let ((filenames (case (source-type csv)
(:stdin (list (source csv)))
(:inline (list (car (source csv))))
(:regex (source csv))
(t (list (source csv))))))
(loop for filename in filenames
do
(with-open-file-or-stream
;; we just ignore files that don't exist
(input filename
:direction :input
:external-format (encoding csv)
:if-does-not-exist nil)
(when input
(log-message :info "COPY FROM ~s" filename)
;; first go to given inline position when filename is :inline (with-connection (cnx (source csv))
(when (eq (source-type csv) :inline) (loop :for input := (open-next-stream cnx
(file-position input (cdr (source csv)))) :direction :input
:external-format (encoding csv)
;; we handle skipping more than one line here, as cl-csv only knows :if-does-not-exist nil)
:while input
:do (progn
;; we handle skipping more than one line here, as cl-csv only knows
;; about skipping the first line ;; about skipping the first line
(loop repeat (skip-lines csv) do (read-line input nil nil)) (loop repeat (skip-lines csv) do (read-line input nil nil))
@ -167,7 +126,7 @@
(condition (e) (condition (e)
(progn (progn
(log-message :error "~a" e) (log-message :error "~a" e)
(pgstate-incf *state* (target csv) :errs 1)))))))))) (pgstate-incf *state* (target csv) :errs 1)))))))))
(defmethod copy-to-queue ((csv copy-csv) queue) (defmethod copy-to-queue ((csv copy-csv) queue)
"Copy data from given CSV definition into lparallel.queue DATAQ" "Copy data from given CSV definition into lparallel.queue DATAQ"

View File

@ -4,16 +4,14 @@
(in-package :pgloader.fixed) (in-package :pgloader.fixed)
(defclass fixed-connection (csv-connection) ()) (defclass fixed-connection (md-connection) ())
(defmethod initialize-instance :after ((csvconn fixed-connection) &key) (defmethod initialize-instance :after ((fixed fixed-connection) &key)
"Assign the type slot to sqlite." "Assign the type slot to sqlite."
(setf (slot-value csvconn 'type) "fixed")) (setf (slot-value fixed 'type) "fixed"))
(defclass copy-fixed (copy) (defclass copy-fixed (copy)
((source-type :accessor source-type ; one of :inline, :stdin, :regex ((encoding :accessor encoding ; file encoding
:initarg :source-type) ; or :filename
(encoding :accessor encoding ; file encoding
:initarg :encoding) ; :initarg :encoding) ;
(skip-lines :accessor skip-lines ; CSV headers (skip-lines :accessor skip-lines ; CSV headers
:initarg :skip-lines ; :initarg :skip-lines ;
@ -23,10 +21,6 @@
(defmethod initialize-instance :after ((fixed copy-fixed) &key) (defmethod initialize-instance :after ((fixed copy-fixed) &key)
"Compute the real source definition from the given source parameter, and "Compute the real source definition from the given source parameter, and
set the transforms function list as needed too." set the transforms function list as needed too."
(let ((source (csv-specs (slot-value fixed 'source))))
(setf (slot-value fixed 'source-type) (car source))
(setf (slot-value fixed 'source) (get-absolute-pathname source)))
(let ((transforms (when (slot-boundp fixed 'transforms) (let ((transforms (when (slot-boundp fixed 'transforms)
(slot-value fixed 'transforms))) (slot-value fixed 'transforms)))
(columns (columns
@ -57,47 +51,35 @@
list as its only parameter. list as its only parameter.
Returns how many rows where read and processed." Returns how many rows where read and processed."
(let ((filenames (case (source-type fixed) (with-connection (cnx (source fixed))
(:stdin (list (source fixed))) (loop :for input := (open-next-stream cnx
(:inline (list (car (source fixed)))) :direction :input
(:regex (source fixed)) :external-format (encoding fixed)
(t (list (source fixed)))))) :if-does-not-exist nil)
(loop :for filename :in filenames :while input
:do :do (progn ;; ignore as much as skip-lines lines in the file
(with-open-file-or-stream (loop repeat (skip-lines fixed) do (read-line input nil nil))
;; we just ignore files that don't exist
(input filename
:direction :input
:external-format (encoding fixed)
:if-does-not-exist nil)
(when input
;; first go to given inline position when filename is :inline
(when (eq (source-type fixed) :inline)
(file-position input (cdr (source fixed))))
;; ignore as much as skip-lines lines in the file ;; read in the text file, split it into columns, process NULL
(loop repeat (skip-lines fixed) do (read-line input nil nil)) ;; columns the way postmodern expects them, and call
;; PROCESS-ROW-FN on them
;; read in the text file, split it into columns, process NULL (let ((reformat-then-process
;; columns the way postmodern expects them, and call (reformat-then-process :fields (fields fixed)
;; PROCESS-ROW-FN on them :columns (columns fixed)
(let ((reformat-then-process :target (target fixed)
(reformat-then-process :fields (fields fixed) :process-row-fn process-row-fn)))
:columns (columns fixed) (loop
:target (target fixed) :with fun := (compile nil reformat-then-process)
:process-row-fn process-row-fn)))
(loop
:with fun := (compile nil reformat-then-process)
:with fixed-cols-specs := (mapcar #'cdr (fields fixed)) :with fixed-cols-specs := (mapcar #'cdr (fields fixed))
:for line := (read-line input nil nil) :for line := (read-line input nil nil)
:counting line :into read :counting line :into read
:while line :while line
:do (handler-case :do (handler-case
(funcall fun (parse-row fixed-cols-specs line)) (funcall fun (parse-row fixed-cols-specs line))
(condition (e) (condition (e)
(progn (progn
(log-message :error "~a" e) (log-message :error "~a" e)
(pgstate-incf *state* (target fixed) :errs 1))))))))))) (pgstate-incf *state* (target fixed) :errs 1))))))))))
(defmethod copy-to-queue ((fixed copy-fixed) queue) (defmethod copy-to-queue ((fixed copy-fixed) queue)
"Copy data from given FIXED definition into lparallel.queue DATAQ" "Copy data from given FIXED definition into lparallel.queue DATAQ"
@ -126,7 +108,7 @@
:dbname (db-name (target-db fixed)) :dbname (db-name (target-db fixed))
:state *state* :state *state*
:summary summary) :summary summary)
(lp:task-handler-bind ((error #'lp:invoke-transfer-error)) (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error))
(log-message :notice "COPY ~a" (target fixed)) (log-message :notice "COPY ~a" (target fixed))
(lp:submit-task channel #'copy-to-queue fixed queue) (lp:submit-task channel #'copy-to-queue fixed queue)

View File

@ -13,6 +13,7 @@
(*concurrent-batches* . ,*concurrent-batches*) (*concurrent-batches* . ,*concurrent-batches*)
(*pg-settings* . ',*pg-settings*) (*pg-settings* . ',*pg-settings*)
(*state* . ,*state*) (*state* . ,*state*)
(*fd-path-root* . ,*fd-path-root*)
(*client-min-messages* . ,*client-min-messages*) (*client-min-messages* . ,*client-min-messages*)
(*log-min-messages* . ,*log-min-messages*) (*log-min-messages* . ,*log-min-messages*)