Implement support for PostgreSQL COPY format, fix #145.

PostgreSQL COPY format is not really CSV but something way easier to
parse. Funnily enough, parsing it as CSV is not that easy, so we add
here a special simple parser for the COPY format.

It should be quite useful to try loading pgloader's reject data files
again after fixing them manually, too. It's still missing some
documentation, without any good excuse for that; will add soon.
This commit is contained in:
Dimitri Fontaine 2015-01-02 18:49:17 +01:00
parent 6d76bc57e3
commit e1bc6425e2
11 changed files with 3860 additions and 96 deletions

View File

@ -93,6 +93,7 @@
(:file "command-csv")
(:file "command-ixf")
(:file "command-fixed")
(:file "command-copy")
(:file "command-dbf")
(:file "command-cast-rules")
(:file "command-mysql")
@ -128,6 +129,9 @@
(:file "fixed"
:depends-on ("csv"))
(:file "copy"
:depends-on ("csv"))
(:module "db3"
:components
((:file "db3-schema")

View File

@ -365,13 +365,15 @@
(type pgsql-connection target))
;; some preliminary checks
(when (and (typep source 'csv-connection) (null fields))
(when (and (typep source 'csv-connection)
(not (typep source 'copy-connection))
(null fields))
(error 'source-definition-error
:mesg "CSV source type requires fields definitions."))
:mesg "This data source requires fields definitions."))
(when (and (typep source 'csv-connection) (null (pgconn-table-name target)))
(error 'source-definition-error
:mesg "CSV data source require a table name target."))
:mesg "This data source require a table name target."))
(when (and (typep source 'fixed-connection) (null (pgconn-table-name target)))
(error 'source-definition-error
@ -390,13 +392,13 @@
(process-relative-pathnames
(uiop:getcwd)
(typecase source
(csv-connection
(lisp-code-for-loading-from-csv source fields target
:encoding encoding
:gucs gucs
:csv-options options
:before before
:after after))
(copy-connection
(lisp-code-for-loading-from-copy source fields target
:encoding (or encoding :default)
:gucs gucs
:copy-options options
:before before
:after after))
(fixed-connection
(lisp-code-for-loading-from-fixed source fields target
@ -406,6 +408,14 @@
:before before
:after after))
(csv-connection
(lisp-code-for-loading-from-csv source fields target
:encoding encoding
:gucs gucs
:csv-options options
:before before
:after after))
(dbf-connection
(lisp-code-for-loading-from-dbf source target
:gucs gucs

View File

@ -254,6 +254,19 @@
#:copy-to-queue
#:copy-from))
(defpackage #:pgloader.copy
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
(:import-from #:pgloader.csv
#:csv-connection
#:specs
#:csv-specs)
(:export #:copy-connection
#:copy-copy
#:copy-to-queue
#:copy-from))
(defpackage #:pgloader.ixf
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
@ -429,6 +442,8 @@
#:csv-specs)
(:import-from #:pgloader.fixed
#:fixed-connection)
(:import-from #:pgloader.copy
#:copy-connection)
(:import-from #:pgloader.sources
#:*default-cast-rules*
#:*cast-rules*)
@ -463,6 +478,7 @@
#:connection
#:csv-connection
#:fixed-connection
#:copy-connection
#:dbf-connection
#:ixf-connection
#:sqlite-connection
@ -473,6 +489,7 @@
#:lisp-code-for-loading-from-mysql
#:lisp-code-for-loading-from-csv
#:lisp-code-for-loading-from-fixed
#:lisp-code-for-loading-from-copy
#:lisp-code-for-loading-from-dbf
#:lisp-code-for-loading-from-ixf
#:lisp-code-for-loading-from-sqlite

View File

@ -0,0 +1,147 @@
;;;
;;; LOAD COPY FILE
;;;
;;; That has lots in common with CSV, so we share a fair amount of parsing
;;; rules with the CSV case.
;;;
(in-package #:pgloader.parser)
;;; A single source field in the COPY field list: just a name, wrapped in a
;;; one-element list so field definitions share the CSV fields shape.
(defrule copy-source-field csv-field-name
  (:lambda (name) (list name)))
;;; Subsequent fields are comma-separated; drop the comma, keep the field.
(defrule another-copy-source-field (and comma copy-source-field)
  (:destructure (comma field)
    (declare (ignore comma))
    field))
;;; Collect the first field and the comma-separated rest into one list.
(defrule copy-source-fields (and copy-source-field (* another-copy-source-field))
  (:destructure (first-field other-fields)
    (cons first-field other-fields)))
;;; The parenthesized field list; strip the parens, keep the definitions.
(defrule copy-source-field-list (and open-paren copy-source-fields close-paren)
  (:destructure (open fields close)
    (declare (ignore open close))
    fields))
;;; WITH options understood by the COPY source: batch tuning, TRUNCATE and
;;; header skipping.  CSV separator/quote options are not listed here: the
;;; COPY text format has a fixed layout.
(defrule copy-option (or option-batch-rows
                         option-batch-size
                         option-batch-concurrency
                         option-truncate
                         option-skip-header))
;;; Subsequent options are comma-separated; drop the comma, keep the option.
(defrule another-copy-option (and comma copy-option)
  (:destructure (comma option)
    (declare (ignore comma))
    option))
;;; Fold the parsed (option . value) conses into a single flat plist.
(defrule copy-option-list (and copy-option (* another-copy-option))
  (:destructure (first-option other-options)
    (alexandria:alist-plist (cons first-option other-options))))
;;; The WITH <options> clause.  Parse the COPY option list — not the CSV
;;; one: the original referenced csv-option-list, which left the
;;; copy-option-list rule above dead and accepted CSV-only options that
;;; lisp-code-for-loading-from-copy silently ignores.
(defrule copy-options (and kw-with copy-option-list)
  (:lambda (source)
    (bind (((_ opts) source))
      (cons :copy-options opts))))
;;; copy://<path> builds the connection object directly.
(defrule copy-uri (and "copy://" filename)
  (:destructure (prefix filename)
    (declare (ignore prefix))
    (make-instance 'copy-connection :specs filename)))
;;; Any of the supported source notations.  copy-uri already returns a
;;; connection instance; every other alternative produces a (type . specs)
;;; form that we wrap into a copy-connection here.
(defrule copy-file-source (or stdin
                              inline
                              http-uri
                              copy-uri
                              filename-matching
                              maybe-quoted-filename)
  (:lambda (source)
    (typecase source
      (copy-connection source)
      (t
       (destructuring-bind (type &rest specs) source
         (case type
           ((:stdin :inline :filename :regex)
            (make-instance 'copy-connection :specs source))
           (:http
            (make-instance 'copy-connection :uri (first specs)))))))))
;;; GETENV <name>: resolve the environment variable at parse time, then
;;; parse its value as a COPY file source.
(defrule get-copy-file-source-from-environment-variable (and kw-getenv name)
  (:destructure (getenv varname)
    (declare (ignore getenv))
    (let ((connstring (getenv-default varname)))
      (unless connstring
        (error "Environment variable ~s is unset." varname))
      (parse 'copy-file-source connstring))))
;;; LOAD COPY FROM <source>: only the parsed source connection matters.
(defrule copy-source (and kw-load kw-copy kw-from
                          (or get-copy-file-source-from-environment-variable
                              copy-file-source))
  (:destructure (load copy from source)
    (declare (ignore load copy from))
    source))
;;; Optional trailing clauses, in any order; turn the collected
;;; ((:key . value) ...) alist into a flat plist.
(defrule load-copy-file-optional-clauses (* (or copy-options
                                                gucs
                                                before-load
                                                after-load))
  (:function alexandria:alist-plist))
;;; The whole LOAD COPY command: fixed positional parts first, then the
;;; optional-clauses plist spliced onto the end.
(defrule load-copy-file-command (and copy-source (? file-encoding)
                                     copy-source-field-list
                                     target
                                     (? csv-target-column-list)
                                     load-copy-file-optional-clauses)
  (:destructure (source encoding fields target columns clauses)
    (list* source encoding fields target columns clauses)))
(defun lisp-code-for-loading-from-copy (copy-conn fields pg-db-conn
                                        &key
                                          (encoding :utf-8)
                                          columns
                                          gucs before after
                                          ((:copy-options options)))
  "Return a lambda form that, when run, loads COPY-CONN into PG-DB-CONN.

   FIELDS and COLUMNS describe source fields and target columns, ENCODING
   the source file encoding, GUCS a list of PostgreSQL settings to apply,
   BEFORE/AFTER optional SQL blocks, and OPTIONS the plist parsed from the
   WITH clause."
  `(lambda ()
     ;; set up state tracking; SUMMARY is true when we own the top-level
     ;; *state* and must therefore print the final report ourselves
     (let* ((state-before  (pgloader.utils:make-pgstate))
            (summary       (null *state*))
            (*state*       (or *state* (pgloader.utils:make-pgstate)))
            ;; only track after-load stats when an AFTER block exists
            (state-after   ,(when after `(pgloader.utils:make-pgstate)))
            ,@(pgsql-connection-bindings pg-db-conn gucs)
            ,@(batch-control-bindings options)
            ;; fetch remote/HTTP sources down to a local file first
            (source-db     (with-stats-collection ("fetch" :state state-before)
                             (expand (fetch-file ,copy-conn)))))
       (progn
         ,(sql-code-block pg-db-conn 'state-before before "before load")
         (let ((truncate ,(getf options :truncate))
               (source
                (make-instance 'pgloader.copy:copy-copy
                               :target-db ,pg-db-conn
                               :source source-db
                               :target ,(pgconn-table-name pg-db-conn)
                               :encoding ,encoding
                               :fields ',fields
                               :columns ',columns
                               ;; NOTE(review): reads the :skip-line key but
                               ;; the grammar uses option-skip-header —
                               ;; confirm the option rule produces :skip-line
                               :skip-lines ,(or (getf options :skip-line) 0))))
           (pgloader.sources:copy-from source :truncate truncate))
         ,(sql-code-block pg-db-conn 'state-after after "after load")
         ;; reporting
         (when summary
           (report-full-summary "Total import time" *state*
                                :before  state-before
                                :finally state-after))))))
;;; Entry point: turn a parsed LOAD COPY command into loadable lisp code.
(defrule load-copy-file load-copy-file-command
  (:lambda (command)
    ;; COMMAND is (source encoding fields target columns . clauses) where
    ;; the clauses tail is a plist of the optional WITH/SET/BEFORE/AFTER parts
    (destructuring-bind (source encoding fields pg-db-uri columns
                         &key ((:copy-options options)) gucs before after)
        command
      (lisp-code-for-loading-from-copy source fields pg-db-uri
                                       :encoding encoding
                                       :columns columns
                                       :gucs gucs
                                       :before before
                                       :after after
                                       :copy-options options))))

View File

@ -21,6 +21,7 @@
(def-keyword-rule "dbf")
(def-keyword-rule "ixf")
(def-keyword-rule "fixed")
(def-keyword-rule "copy")
(def-keyword-rule "into")
(def-keyword-rule "with")
(def-keyword-rule "when")

View File

@ -14,6 +14,7 @@
(defrule command (and (or load-archive
load-csv-file
load-fixed-cols-file
load-copy-file
load-dbf-file
load-ixf-file
load-mysql-database
@ -177,6 +178,7 @@
;;;
(defvar *data-source-filename-extensions*
'((:csv . ("csv" "tsv" "txt" "text"))
(:copy . ("copy" "dat")) ; reject data files are .dat
(:sqlite . ("sqlite" "db"))
(:dbf . ("db3" "dbf"))
(:ixf . ("ixf"))))
@ -201,6 +203,7 @@
(defvar *parse-rule-for-source-types*
'(:csv csv-file-source
:fixed fixed-file-source
:copy copy-file-source
:dbf dbf-file-source
:ixf ixf-file-source
:sqlite sqlite-uri
@ -215,6 +218,7 @@
(defrule source-uri (or csv-uri
fixed-uri
copy-uri
dbf-uri
ixf-uri
sqlite-db-uri

View File

@ -72,89 +72,3 @@ details about the format, and format specs."
finally (progn (write-bytes #\Newline)
(return bytes))))))
;;;
;;; Read a file format in PostgreSQL COPY TEXT format, and call given
;;; function on each line.
;;;
(defun map-rows (filename &key process-row-fn)
  "Load data from a text file in PostgreSQL COPY TEXT format.

   Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
   list as its only parameter.

   Finally returns how many rows where read and processed."
  (with-open-file
      ;; we just ignore files that don't exist
      (input filename
             :direction :input
             :if-does-not-exist nil)
    (when input
      ;; read in the text file, split it into columns, process NULL columns
      ;; the way postmodern expects them, and call PROCESS-ROW-FN on them
      (loop
         for line = (read-line input nil)
         ;; NOTE(review): ROW is computed before the WHILE test runs, so on
         ;; EOF split-sequence is applied to NIL — confirm that is harmless
         for row = (mapcar (lambda (x)
                             ;; we want Postmodern compliant NULLs
                             (if (string= "\\N" x) :null x))
                           ;; splitting is easy, it's always on #\Tab
                           ;; see format-row-for-copy for details
                           (sq:split-sequence #\Tab line))
         while line
         counting line into count
         do (funcall process-row-fn row)
         ;; the row count is the return value promised by the docstring
         finally (return count)))))
;;;
;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL
;;; table using the COPY protocol. We expect PostgreSQL compatible data in
;;; that data format, so we don't handle any reformating here.
;;;
(defun copy-to-queue (table-name filename dataq &optional (*state* *state*))
  "Copy data from file FILENAME into lparallel.queue DATAQ"
  ;; map-push-queue drives MAP-ROWS on FILENAME and pushes each row onto
  ;; DATAQ; presumably its return value is the number of rows read —
  ;; verify against pgloader.queue:map-push-queue
  (let ((read
         (pgloader.queue:map-push-queue dataq #'map-rows filename)))
    ;; record how many rows were read for the final report
    (pgstate-incf *state* table-name :read read)))
(defun copy-from-file (dbname table-name filename
                       &key
                         (truncate t)
                         (report nil))
  "Load data from clean COPY TEXT file to PostgreSQL, return how many rows."
  ;; two lparallel workers: one reads the file into a queue, the other
  ;; drains the queue into PostgreSQL; the kernel bindings propagate the
  ;; connection parameters and state into the worker threads
  (let* ((*state* (if report (pgloader.utils:make-pgstate) *state*))
         (lp:*kernel*
          (lp:make-kernel 2 :bindings
                          `((*pgconn-host* . ,*pgconn-host*)
                            (*pgconn-port* . ,*pgconn-port*)
                            (*pgconn-user* . ,*pgconn-user*)
                            (*pgconn-pass* . ,*pgconn-pass*)
                            (*pg-settings* . ',*pg-settings*)
                            (*state*       . ,*state*))))
         (channel (lp:make-channel))
         ;; bounded queue so the reader cannot outrun the writer unboundedly
         (dataq   (lq:make-queue :fixed-capacity 4096)))
    (log-message :debug "pgsql:copy-from-file: ~a ~a ~a" dbname table-name filename)
    (when report
      (pgstate-add-table *state* dbname table-name))
    ;; reader task: file -> queue
    (lp:submit-task channel #'copy-to-queue table-name filename dataq *state*)
    ;; and start another task to push that data from the queue to PostgreSQL
    (lp:submit-task channel
                    #'pgloader.pgsql:copy-from-queue
                    dbname table-name dataq
                    :state *state*
                    :truncate truncate)
    ;; now wait until both the tasks are over, and measure time it took'em
    (multiple-value-bind (res secs)
        (timing
         (loop for tasks below 2 do (lp:receive-result channel)))
      (declare (ignore res))
      (when report (pgstate-incf *state* table-name :secs secs)))
    (when report
      (report-table-name table-name)
      (report-pgtable-stats *state* table-name))))

132
src/sources/copy.lisp Normal file
View File

@ -0,0 +1,132 @@
;;;
;;; Read a file format in PostgreSQL COPY TEXT format.
;;;
(in-package :pgloader.copy)
;; A COPY-format data source connection; it inherits all its slots (specs,
;; type, ...) from csv-connection since both address files or streams.
(defclass copy-connection (csv-connection) ())
;; Specialize on COPY-CONNECTION, not CSV-CONNECTION: the original
;; specializer clobbered the identical :after method that csv.lisp defines
;; for csv-connection, so file load order decided the type of every CSV
;; connection.  The docstring also wrongly said "sqlite" (copy-paste).
(defmethod initialize-instance :after ((conn copy-connection) &key)
  "Assign the type slot to \"copy\"."
  (setf (slot-value conn 'type) "copy"))
(defclass copy-copy (copy)
  ((source-type :accessor source-type     ; one of :inline, :stdin, :regex
                :initarg :source-type)    ; or :filename
   (encoding    :accessor encoding        ; file encoding
                :initarg :encoding)       ;
   (skip-lines  :accessor skip-lines      ; we might want to skip COPY lines
                :initarg :skip-lines      ; e.g. a header line; defaults to 0
                :initform 0))
  (:documentation "pgloader COPY Data Source: loads files already written in
PostgreSQL COPY TEXT format (tab-separated, \\N for NULL)."))
(defmethod initialize-instance :after ((copy copy-copy) &key)
  "Compute the real source definition from the given source parameter, and
   set the transforms function list as needed too."
  ;; the connection's csv-specs is a (type . specs) form: keep the type tag
  ;; in source-type and resolve the rest to an absolute pathname spec
  (let ((source (csv-specs (slot-value copy 'source))))
    (setf (slot-value copy 'source-type) (car source))
    (setf (slot-value copy 'source) (get-absolute-pathname source)))
  ;; default the transforms slot to one NIL per target column; only the
  ;; length of COLUMNS is used here, falling back to the live table's
  ;; column list when no columns were given
  (let ((transforms (when (slot-boundp copy 'transforms)
                      (slot-value copy 'transforms)))
        (columns
         (or (slot-value copy 'columns)
             (pgloader.pgsql:list-columns (slot-value copy 'target-db)
                                          (slot-value copy 'target)))))
    (unless transforms
      (setf (slot-value copy 'transforms) (make-list (length columns))))))
(declaim (inline parse-row))
(defun parse-row (line)
  "Split LINE on #\\Tab — the COPY TEXT column separator, see
   format-row-for-copy — and map PostgreSQL's \\N marker to the :null
   symbol Postmodern expects."
  (loop :for field :in (sq:split-sequence #\Tab line)
        :collect (if (string= field "\\N") :null field)))
(defmethod map-rows ((copy copy-copy) &key process-row-fn)
  "Load data from a text file in Copy Columns format.

   Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
   list as its only parameter.

   Returns how many rows where read and processed."
  ;; normalize the source into a list of filenames/streams: a :regex source
  ;; already is a list of matches, an :inline source is a (filename . pos)
  ;; cons of which we open the filename part
  (let ((filenames (case (source-type copy)
                     (:stdin (list (source copy)))
                     (:inline (list (car (source copy))))
                     (:regex (source copy))
                     (t (list (source copy))))))
    (loop :for filename :in filenames
       :do
       (with-open-file-or-stream
           ;; we just ignore files that don't exist
           (input filename
                  :direction :input
                  :external-format (encoding copy)
                  :if-does-not-exist nil)
         (when input
           ;; first go to given inline position when filename is :inline
           (when (eq (source-type copy) :inline)
             (file-position input (cdr (source copy))))
           ;; ignore as much as skip-lines lines in the file
           (loop repeat (skip-lines copy) do (read-line input nil nil))
           ;; read in the text file, split it into columns, process NULL
           ;; columns the way postmodern expects them, and call
           ;; PROCESS-ROW-FN on them
           (let ((reformat-then-process
                  (reformat-then-process :fields (fields copy)
                                         :columns (columns copy)
                                         :target (target copy)
                                         :process-row-fn process-row-fn)))
             (loop
                :with fun := reformat-then-process
                :for line := (read-line input nil nil)
                ;; NOTE(review): READ is accumulated but never returned —
                ;; the method actually returns NIL, unlike the docstring says
                :counting line :into read
                :while line
                ;; a failing row is logged and counted as an error rather
                ;; than aborting the whole file
                :do (handler-case
                        (funcall fun (parse-row line))
                      (condition (e)
                        (progn
                          (log-message :error "~a" e)
                          (pgstate-incf *state* (target copy) :errs 1)))))))))))
(defmethod copy-to-queue ((copy copy-copy) queue)
  "Push the rows produced by MAP-ROWS on the given COPY definition into
   the lparallel QUEUE, for the writer task to consume."
  (pgloader.queue:map-push-queue copy queue))
(defmethod copy-from ((copy copy-copy) &key truncate)
  "Copy data from given COPY file definition into its PostgreSQL target table.

   When TRUNCATE is true the target table is truncated first.  Runs a
   two-task pipeline: a reader pushing parsed rows into a bounded queue
   and a writer draining it into PostgreSQL."
  ;; SUMMARY is true when we own the top-level *state* and must report
  (let* ((summary        (null *state*))
         (*state*        (or *state* (pgloader.utils:make-pgstate)))
         (lp:*kernel*    (make-kernel 2))
         (channel        (lp:make-channel))
         ;; bounded queue so the reader cannot outrun the writer unboundedly
         (queue          (lq:make-queue :fixed-capacity *concurrent-batches*)))
    (with-stats-collection ((target copy)
                            :dbname (db-name (target-db copy))
                            :state *state*
                            :summary summary)
      (lp:task-handler-bind ((error #'lp:invoke-transfer-error))
        (log-message :notice "COPY ~a" (target copy))
        ;; reader task: source file(s) -> queue
        (lp:submit-task channel #'copy-to-queue copy queue)
        ;; and start another task to push that data from the queue to PostgreSQL
        (lp:submit-task channel
                        ;; this function update :rows stats
                        #'pgloader.pgsql:copy-from-queue
                        (target-db copy) (target copy) queue
                        ;; we only are interested into the column names here
                        :columns (mapcar (lambda (col)
                                           ;; always double quote column names
                                           (format nil "~s" (car col)))
                                         (columns copy))
                        :truncate truncate)
        ;; now wait until both the tasks are over
        (loop for tasks below 2 do (lp:receive-result channel)
           finally (lp:end-kernel))))))

View File

@ -6,6 +6,10 @@
;; A fixed-width data source connection; inherits its slots from
;; csv-connection since both address files or streams.
(defclass fixed-connection (csv-connection) ())
;; Specialize on FIXED-CONNECTION, not CSV-CONNECTION: the original
;; specializer redefined csv-connection's own :after method (and copy.lisp
;; repeats the same mistake), so file load order decided the type of every
;; CSV connection.  The docstring also wrongly said "sqlite" (copy-paste).
(defmethod initialize-instance :after ((conn fixed-connection) &key)
  "Assign the type slot to \"fixed\"."
  (setf (slot-value conn 'type) "fixed"))
(defclass copy-fixed (copy)
((source-type :accessor source-type ; one of :inline, :stdin, :regex
:initarg :source-type) ; or :filename

28
test/copy.load Normal file
View File

@ -0,0 +1,28 @@
LOAD COPY
FROM copy://./data/track.copy
(
trackid, track, album, media, genre, composer,
milliseconds, bytes, unitprice
)
INTO postgresql:///pgloader?track_full
WITH truncate
SET client_encoding to 'latin1',
work_mem to '14MB',
standard_conforming_strings to 'on'
BEFORE LOAD DO
$$ drop table if exists track_full; $$,
$$ create table track_full (
trackid bigserial,
track text,
album text,
media text,
genre text,
composer text,
milliseconds bigint,
bytes bigint,
unitprice numeric
);
$$;

3503
test/data/track.copy Normal file

File diff suppressed because it is too large Load Diff