Implement support for FIXED COLS input files, reaching release candidate status.

Dimitri Fontaine 2013-11-07 15:31:26 +01:00
parent 914a901230
commit 9d5dad7e3e
9 changed files with 603 additions and 115 deletions


@@ -344,6 +344,172 @@ The `csv` format command accepts the following clauses and options:
section are executed once the load is done. That's the right time to
create indexes and constraints, or re-enable triggers.
## LOAD FIXED COLS
This command instructs pgloader to load data from a text file containing
columns arranged in a *fixed size* manner. Here's an example:
```
LOAD FIXED
     FROM inline (a 0 10, b 10 8, c 18 8, d 26 17)
     INTO postgresql:///pgloader?fixed
     (
        a, b,
        c time using (time-with-no-separator c),
        d
     )
     WITH truncate
      SET client_encoding to 'latin1',
          work_mem to '14MB',
          standard_conforming_strings to 'on'
   BEFORE LOAD DO
     $$ drop table if exists fixed; $$,
     $$ create table fixed (
          a integer,
          b date,
          c time,
          d text
        );
     $$;

01234567892008052011431250firstline
   01234562008052115182300left blank-padded
12345678902008052208231560another line
```
The `fixed` format command accepts the following clauses and options:
- *FROM*
The filename to load the data from. Accepts an *ENCODING* option. Use
the `--list-encodings` option to see which encoding names are
supported.
The filename may be enclosed by single quotes, and could be one of the
following special values:
- *inline*
The data is found after the end of the parsed commands. Any number
of empty lines between the end of the commands and the beginning of
the data is accepted.
- *stdin*
Reads the data from the standard input stream.
The *FROM* option also supports an optional comma separated list of
*field* names describing what is expected in the `FIXED` data file.
Each entry is composed of the field name followed by specific reader
options for that field. The supported per-field reader options are
the following; only *start* and *length* are required.
- *start*
Position in the line at which to start reading that field's value. Can
be entered with decimal digits or `0x` then hexadecimal digits.
- *length*
How many bytes to read from the *start* position for that field's
value. Same format as *start*.
- *terminated by*
See the description of *field terminated by* below.
The processing of this option is not currently implemented.
- *date format*
When the field is expected to be of the date type, this option
allows specifying the date format used in the file.
The processing of this option is not currently implemented.
- *null if*
This option takes an argument which is either the keyword *blanks*
or a double-quoted string.
When *blanks* is used and the field value that is read contains only
space characters, then it's automatically converted to an SQL `NULL`
value.
When a double-quoted string is used and that string is read as the
field value, then the field value is automatically converted to an
SQL `NULL` value.
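As a rough illustration, the per-field *start*, *length*, and *null if* rules described above could be sketched like this in Python (the field specs mirror the documentation example; the `null if blanks` setting on `d` and everything else here are illustrative, not pgloader's actual code):

```python
# Illustrative fixed-width field extraction with "null if" handling.
# Field specs mirror the doc example: (a 0 10, b 10 8, c 18 8, d 26 17).
FIELDS = [
    ("a", 0, 10, None),       # (name, start, length, null_if)
    ("b", 10, 8, None),
    ("c", 18, 8, None),
    ("d", 26, 17, "blanks"),  # blank-only values become NULL (None)
]

def parse_fixed_line(line, fields=FIELDS):
    row = {}
    for name, start, length, null_if in fields:
        value = line[start:start + length]
        if null_if == "blanks" and value.strip() == "":
            value = None               # "null if blanks"
        elif null_if is not None and value == null_if:
            value = None               # "null if '<string>'"
        row[name] = value
    return row

print(parse_fixed_line("01234567892008052011431250firstline"))
```

Each field is just a slice of the line; pgloader's *null if* processing then decides whether the sliced value maps to SQL `NULL`.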
- *INTO*
The PostgreSQL connection URI must contain the name of the target table
to load the data into. That table must have already been created in
PostgreSQL, and its name might be schema qualified.
The *INTO* option also supports an optional comma separated list of
target columns, each of which is either the name of an input *field* or
a whitespace separated list of the target column name, its PostgreSQL
data type and a *USING* expression.
The *USING* expression can be any valid Common Lisp form and will be
read with the current package set to `pgloader.transforms`, so that you
can use functions defined in that package, such as functions loaded
dynamically with the `--load` command line parameter.
Each *USING* expression is compiled at runtime to native code, and will
be called in a context such as:
(destructuring-bind (field-name-1 field-name-2 ...)
row
(list column-name-1
column-name-2
(expression column-name-1 column-name-2)))
This feature allows pgloader to load any number of fields from the
input file into a possibly different number of columns in the database,
using custom code for that projection.
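The projection context above can be sketched in Python (the names, the transform lambda, and the `project` helper are illustrative, not pgloader's actual API):

```python
# Sketch of the field -> column projection described above: N input
# fields map to a possibly different number of output columns, with an
# optional transform per column.
def project(row, columns):
    """row: dict of field name -> value; columns: list of
    (column_name, transform) pairs, transform may be None."""
    out = []
    for name, transform in columns:
        value = row[name]
        out.append(transform(value) if transform else value)
    return out

row = {"a": "0123456789", "c": "11431250"}
# Column c uses a transform, like `c time using (time-with-no-separator c)`.
cols = [("a", None),
        ("c", lambda s: f"{s[0:2]}:{s[2:4]}:{s[4:6]}.{s[6:]}")]
print(project(row, cols))  # prints ['0123456789', '11:43:12.50']
```

In pgloader the transform is a compiled *USING* expression; here a plain lambda plays that role.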
- *WITH*
When loading from a `FIXED` file, the following options are supported:
- *truncate*
When this option is listed, pgloader issues a `TRUNCATE` command
against the PostgreSQL target table before reading the data file.
- *skip header*
Takes a numeric value as argument. Instructs pgloader to skip that
many lines at the beginning of the input file.
- *SET*
This clause allows specifying session parameters to be set for all the
sessions opened by pgloader. It expects a comma separated list of
entries, each made of a parameter name, the equal sign, then the
single-quoted value.
The names and values of the parameters are not validated by pgloader,
they are given as-is to PostgreSQL.
- *BEFORE LOAD DO*
You can run SQL queries against the database before loading the data
from the `FIXED` file. The most common such queries are `CREATE TABLE
IF NOT EXISTS` statements, so that the data can be loaded.
Each command must be *dollar-quoted*: it must begin and end with a
double dollar sign, `$$`. Dollar-quoted queries are then comma
separated. No extra punctuation is expected after the last SQL query.
- *AFTER LOAD DO*
Same format as *BEFORE LOAD DO*, the dollar-quoted queries found in that
section are executed once the load is done. That's the right time to
create indexes and constraints, or re-enable triggers.
## LOAD DBF
This command instructs pgloader to load data from a `DBF` file. Here's an


@@ -111,7 +111,10 @@
#:copy-to-queue
#:copy-to
#:copy-database
#:filter-column-list))
#:filter-column-list
#:get-pathname
#:get-absolute-pathname
#:project-fields))
(defpackage #:pgloader.csv
(:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.sources)
@@ -124,6 +127,12 @@
#:guess-csv-params
#:guess-all-csv-params))
(defpackage #:pgloader.fixed
(:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.sources)
(:export #:copy-fixed
#:copy-to-queue
#:copy-from))
(defpackage #:pgloader.db3
(:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.sources)
(:import-from #:pgloader.pgsql


@@ -27,7 +27,7 @@
(in-package :pgloader.params)
(defparameter *version-string* "3.0.50.2"
(defparameter *version-string* "3.0.90"
"pgloader version strings, following Emacs versioning model.")
;; we can't use pgloader.utils:make-pgstate yet because params is compiled


@@ -61,6 +61,7 @@
(def-keyword-rule "from")
(def-keyword-rule "csv")
(def-keyword-rule "dbf")
(def-keyword-rule "fixed")
(def-keyword-rule "into")
(def-keyword-rule "with")
(def-keyword-rule "when")
@@ -1473,6 +1474,138 @@ load database
:before state-before
:finally state-after)))))))))
;;;
;;; LOAD FIXED COLUMNS FILE
;;;
;;; That has lots in common with CSV, so we share a fair amount of parsing
;;; rules with the CSV case.
;;;
(defrule hex-number (and "0x" (+ (hexdigit-char-p character)))
(:lambda (hex)
(destructuring-bind (prefix digits) hex
(declare (ignore prefix))
(parse-integer (text digits) :radix 16))))
(defrule dec-number (+ (digit-char-p character))
(:lambda (digits)
(parse-integer (text digits))))
(defrule number (or hex-number dec-number))
(defrule field-start-position (and ignore-whitespace number)
(:destructure (ws pos) (declare (ignore ws)) pos))
(defrule fixed-field-length (and ignore-whitespace number)
(:destructure (ws len) (declare (ignore ws)) len))
(defrule fixed-source-field (and csv-field-name
field-start-position fixed-field-length
csv-field-options)
(:destructure (name start len opts)
`(,name :start ,start :length ,len ,@opts)))
(defrule another-fixed-source-field (and #\, ignore-whitespace fixed-source-field)
(:lambda (source)
(destructuring-bind (comma ws field) source
(declare (ignore comma ws))
field)))
(defrule fixed-source-fields (and fixed-source-field (* another-fixed-source-field))
(:lambda (source)
(destructuring-bind (field1 fields) source
(list* field1 fields))))
(defrule fixed-source-field-list (and open-paren fixed-source-fields close-paren)
(:lambda (source)
(destructuring-bind (open field-defs close) source
(declare (ignore open close))
field-defs)))
(defrule fixed-option (or option-truncate
option-skip-header))
(defrule another-fixed-option (and #\, ignore-whitespace fixed-option)
(:lambda (source)
(destructuring-bind (comma ws option) source
(declare (ignore comma ws))
option)))
(defrule fixed-option-list (and fixed-option (* another-fixed-option))
(:lambda (source)
(destructuring-bind (opt1 opts) source
(alexandria:alist-plist `(,opt1 ,@opts)))))
(defrule fixed-options (and kw-with fixed-option-list)
(:lambda (source)
(destructuring-bind (w opts) source
(declare (ignore w))
opts)))
(defrule fixed-file-source (or stdin
inline
maybe-quoted-filename))
(defrule fixed-source (and kw-load kw-fixed kw-from fixed-file-source)
(:lambda (src)
(destructuring-bind (load fixed from source) src
(declare (ignore load fixed from))
;; source is (:filename #P"pathname/here")
(destructuring-bind (type uri) source
(declare (ignore uri))
(ecase type
(:stdin source)
(:inline source)
(:filename source))))))
(defrule load-fixed-cols-file (and fixed-source (? file-encoding)
fixed-source-field-list
target
(? csv-target-column-list)
(? fixed-options)
(? gucs)
(? before-load-do)
(? after-load-do))
(:lambda (command)
(destructuring-bind (source encoding fields pg-db-uri
columns options gucs before after) command
(destructuring-bind (&key host port user password dbname table-name
&allow-other-keys)
pg-db-uri
`(lambda ()
(let* ((state-before ,(when before `(pgloader.utils:make-pgstate)))
(summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-after ,(when after `(pgloader.utils:make-pgstate)))
(*pgconn-host* ,host)
(*pgconn-port* ,port)
(*pgconn-user* ,user)
(*pgconn-pass* ,password)
(*pg-settings* ',gucs)
(source
(make-instance 'pgloader.fixed:copy-fixed
:target-db ,dbname
:source ',source
:target ,table-name
:encoding ,encoding
:fields ',fields
:columns ',columns
:skip-lines ,(or (getf options :skip-line) 0))))
(progn
,(sql-code-block dbname 'state-before before "before load")
(pgloader.sources:copy-from source
:truncate ,(getf options :truncate))
,(sql-code-block dbname 'state-after after "after load")
;; reporting
(when summary
(report-full-summary "Total import time" *state*
:before state-before
:finally state-after)))))))))
;;;
;;; LOAD ARCHIVE ...
@@ -1555,6 +1688,7 @@ load database
(defrule command (and (or load-archive
load-csv-file
load-fixed-cols-file
load-dbf-file
load-mysql-database
load-sqlite-database


@@ -4,119 +4,6 @@
(in-package :pgloader.csv)
;;
;; Some basic tools
;;
(defun get-pathname (dbname table-name &key (csv-path-root *csv-path-root*))
"Return a pathname where to read or write the file data"
(make-pathname
:directory (pathname-directory
(merge-pathnames (format nil "~a/" dbname) csv-path-root))
:name table-name
:type "csv"))
(defun get-absolute-pathname (pathname-or-regex &key (root *csv-path-root*))
"PATHNAME-OR-REGEX is expected to be either (:regex expression)
or (:filename pathname). In the first case, this function checks whether the
pathname is absolute or relative and returns an absolute pathname given
current working directory of ROOT.
In the second case, walk the ROOT directory and return the first pathname
that matches the regex. TODO: consider signaling a condition when we have
more than one match."
(destructuring-bind (type part) pathname-or-regex
(ecase type
(:inline part)
(:stdin *standard-input*)
(:regex (first (pgloader.archive:get-matching-filenames root part)))
(:filename (if (fad:pathname-absolute-p part) part
(merge-pathnames part root))))))
;;;
;;; Project fields into columns
;;;
(defun project-fields (&key fields columns (compile t))
"The simplest projection happens when both FIELDS and COLS are nil: in
this case the projection is an identity, we simply return what we got.
Other forms of projections consist of forming columns with the result of
applying a transformation function. In that case a cols entry is a list
of '(colname type expression), the expression being the (already
compiled) function to use here."
(labels ((null-as-processing-fn (null-as)
"return a lambda form that will process a value given NULL-AS."
(if (eq null-as :blanks)
(lambda (col)
(declare (optimize speed))
(if (every (lambda (char) (char= char #\Space)) col)
nil
col))
(lambda (col)
(declare (optimize speed))
(if (string= null-as col) nil col))))
(field-name-as-symbol (field-name-or-list)
"we need to deal with symbols as we generate code"
(typecase field-name-or-list
(list (pgloader.transforms:intern-symbol (car field-name-or-list)))
(t (pgloader.transforms:intern-symbol field-name-or-list))))
(field-process-null-fn (field-name-or-list)
"Given a field entry, return a function dealing with nulls for it"
(destructuring-bind (&key null-as date-format)
(typecase field-name-or-list
(list (cdr field-name-or-list))
(t (cdr (assoc field-name-or-list fields :test #'string=))))
(declare (ignore date-format)) ; TODO
(if (null null-as)
#'identity
(null-as-processing-fn null-as)))))
(let* ((projection
(cond
;; when no specific information has been given on FIELDS and
;; COLUMNS, just apply generic NULL-AS processing
((and (null fields) (null columns))
(lambda (row) row))
((null columns)
;; when no specific information has been given on COLUMNS,
;; use the information given for FIELDS and apply per-field
;; null-as, or the generic one if none has been given for
;; that field.
(let ((process-nulls
(mapcar (function field-process-null-fn) fields)))
`(lambda (row)
(loop
:for col :in row
:for fn :in ',process-nulls
:collect (funcall fn col)))))
(t
;; project some number of FIELDS into a possibly different
;; number of COLUMNS, using given transformation functions,
;; processing NULL-AS represented values.
(let* ((args
(if fields
(mapcar (function field-name-as-symbol) fields)
(mapcar (function field-name-as-symbol) columns)))
(newrow
(loop for (name type fn) in columns
collect
;; we expect the name of a COLUMN to be the same
;; as the name of its derived FIELD when we
;; don't have any transformation function
(or fn `(funcall ,(field-process-null-fn name)
,(field-name-as-symbol name))))))
`(lambda (row)
(declare (optimize speed) (type list row))
(destructuring-bind (,@args) row
(declare (ignorable ,@args))
(list ,@newrow))))))))
;; allow for some debugging
(if compile (compile nil projection) projection))))
;;;
;;; Implementing the pgloader source API
;;;

src/sources/fixed.lisp (new file, 118 lines)

@@ -0,0 +1,118 @@
;;;
;;; Tools to handle Fixed Columns data fetching
;;;
(in-package :pgloader.fixed)
(defclass copy-fixed (copy)
((encoding :accessor encoding ; file encoding
:initarg :encoding) ;
(skip-lines :accessor skip-lines ; header lines to skip
:initarg :skip-lines ;
:initform 0))
(:documentation "pgloader Fixed Columns Data Source"))
(defmethod initialize-instance :after ((fixed copy-fixed) &key)
"Compute the real source definition from the given source parameter, and
set the transforms function list as needed too."
(let ((source (slot-value fixed 'source)))
(setf (slot-value fixed 'source) (get-absolute-pathname source)))
(let ((transforms (when (slot-boundp fixed 'transforms)
(slot-value fixed 'transforms)))
(columns
(or (slot-value fixed 'columns)
(pgloader.pgsql:list-columns (slot-value fixed 'target-db)
(slot-value fixed 'target)))))
(unless transforms
(setf (slot-value fixed 'transforms)
(loop for c in columns collect nil)))))
(declaim (inline parse-row))
(defmethod parse-row ((fixed copy-fixed) line)
"Parse a single line of FIXED input file and return a row of columns."
(loop for (name . opts) in (fields fixed)
collect (destructuring-bind (&key start length &allow-other-keys) opts
(subseq line start (+ start length)))))
(defmethod map-rows ((fixed copy-fixed) &key process-row-fn)
"Load data from a text file in Fixed Columns format.
Each row is pre-processed then PROCESS-ROW-FN is called with the row as a
list as its only parameter.
Returns how many rows were read and processed."
(let* ((filespec (source fixed))
(filename (if (consp filespec) (car filespec) filespec)))
(with-open-file
;; we just ignore files that don't exist
(input filename
:direction :input
:external-format (encoding fixed)
:if-does-not-exist nil)
(when input
;; first go to given inline position when filename is a consp
(when (consp filespec)
(loop repeat (cdr filespec) do (read-char input)))
;; skip as many as skip-lines lines at the beginning of the file
(loop repeat (skip-lines fixed) do (read-line input nil nil))
;; read in the text file, split it into columns, process NULL columns
;; the way postmodern expects them, and call PROCESS-ROW-FN on them
(let* ((read 0)
(projection (project-fields :fields (fields fixed)
:columns (columns fixed)))
(reformat-then-process
(lambda (row)
(let ((projected-row
(handler-case
(funcall projection row)
(condition (e)
(pgstate-incf *state* (target fixed) :errs 1)
(log-message :error
"Could not read line ~d: ~a" read e)))))
(when projected-row
(funcall process-row-fn projected-row))))))
(loop
with fun = (compile nil reformat-then-process)
for line = (read-line input nil nil)
counting line into read
while line
do (funcall fun (parse-row fixed line))
finally (return read)))))))
(defmethod copy-to-queue ((fixed copy-fixed) dataq)
"Copy data from given FIXED definition into lparallel.queue DATAQ"
(let ((read (pgloader.queue:map-push-queue dataq #'map-rows fixed)))
(pgstate-incf *state* (target fixed) :read read)))
(defmethod copy-from ((fixed copy-fixed) &key truncate)
"Copy data from given FIXED file definition into its PostgreSQL target table."
(let* ((summary (null *state*))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(lp:*kernel* (make-kernel 2))
(channel (lp:make-channel))
(dataq (lq:make-queue :fixed-capacity 4096))
(dbname (target-db fixed))
(table-name (target fixed)))
(with-stats-collection (dbname table-name :state *state* :summary summary)
(log-message :notice "COPY ~a.~a" dbname table-name)
(lp:submit-task channel #'copy-to-queue fixed dataq)
;; and start another task to push that data from the queue to PostgreSQL
(lp:submit-task channel
;; this function updates the :rows stats
#'pgloader.pgsql:copy-from-queue dbname table-name dataq
;; we only are interested into the column names here
:columns (let ((cols (columns fixed)))
(when cols (mapcar #'car cols)))
:truncate truncate
:transforms (transforms fixed))
;; now wait until both the tasks are over
(loop for tasks below 2 do (lp:receive-result channel)
finally (lp:end-kernel)))))
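The `copy-from` method above wires two lparallel tasks around a bounded queue: one task reads and parses rows, the other feeds them to PostgreSQL's COPY. A minimal Python sketch of that producer/consumer shape (no real database; `write_batch` and the rest are illustrative, not pgloader's API):

```python
import queue
import threading

def copy_from(rows, write_batch, capacity=4096):
    """Producer pushes parsed rows onto a bounded queue; consumer
    drains it, mimicking the two lparallel tasks in copy-from."""
    q = queue.Queue(maxsize=capacity)   # cf. lq:make-queue :fixed-capacity 4096
    done = object()                     # sentinel marking end of input

    def producer():
        for row in rows:
            q.put(row)                  # cf. copy-to-queue
        q.put(done)

    def consumer():
        while (row := q.get()) is not done:
            write_batch(row)            # cf. pgloader.pgsql:copy-from-queue

    threads = [threading.Thread(target=producer),
               threading.Thread(target=consumer)]
    for t in threads: t.start()
    for t in threads: t.join()          # cf. waiting on lp:receive-result

written = []
copy_from([["a", "b"], ["c", "d"]], written.append)
print(written)  # prints [['a', 'b'], ['c', 'd']]
```

The bounded queue gives back-pressure: the reader stalls when PostgreSQL falls behind, just as the fixed-capacity lparallel queue does here.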


@@ -128,3 +128,117 @@
(remove-if-not #'include
(remove-if-not #'exclude
(remove-if-not #'only all-columns)))))
;;;
;;; Some common tools for file based sources, such as CSV and FIXED
;;;
(defun get-pathname (dbname table-name &key (csv-path-root *csv-path-root*))
"Return a pathname where to read or write the file data"
(make-pathname
:directory (pathname-directory
(merge-pathnames (format nil "~a/" dbname) csv-path-root))
:name table-name
:type "csv"))
(defun get-absolute-pathname (pathname-or-regex &key (root *csv-path-root*))
"PATHNAME-OR-REGEX is expected to be either (:regex expression)
or (:filename pathname). In the first case, this function checks whether the
pathname is absolute or relative and returns an absolute pathname given
current working directory of ROOT.
In the second case, walk the ROOT directory and return the first pathname
that matches the regex. TODO: consider signaling a condition when we have
more than one match."
(destructuring-bind (type part) pathname-or-regex
(ecase type
(:inline part)
(:stdin *standard-input*)
(:regex (first (pgloader.archive:get-matching-filenames root part)))
(:filename (if (fad:pathname-absolute-p part) part
(merge-pathnames part root))))))
;;;
;;; Project fields into columns
;;;
(defun project-fields (&key fields columns (compile t))
"The simplest projection happens when both FIELDS and COLS are nil: in
this case the projection is an identity, we simply return what we got.
Other forms of projections consist of forming columns with the result of
applying a transformation function. In that case a cols entry is a list
of '(colname type expression), the expression being the (already
compiled) function to use here."
(labels ((null-as-processing-fn (null-as)
"return a lambda form that will process a value given NULL-AS."
(if (eq null-as :blanks)
(lambda (col)
(declare (optimize speed))
(if (every (lambda (char) (char= char #\Space)) col)
nil
col))
(lambda (col)
(declare (optimize speed))
(if (string= null-as col) nil col))))
(field-name-as-symbol (field-name-or-list)
"we need to deal with symbols as we generate code"
(typecase field-name-or-list
(list (pgloader.transforms:intern-symbol (car field-name-or-list)))
(t (pgloader.transforms:intern-symbol field-name-or-list))))
(field-process-null-fn (field-name-or-list)
"Given a field entry, return a function dealing with nulls for it"
(destructuring-bind (&key null-as date-format &allow-other-keys)
(typecase field-name-or-list
(list (cdr field-name-or-list))
(t (cdr (assoc field-name-or-list fields :test #'string=))))
(declare (ignore date-format)) ; TODO
(if (null null-as)
#'identity
(null-as-processing-fn null-as)))))
(let* ((projection
(cond
;; when no specific information has been given on FIELDS and
;; COLUMNS, just apply generic NULL-AS processing
((and (null fields) (null columns))
(lambda (row) row))
((null columns)
;; when no specific information has been given on COLUMNS,
;; use the information given for FIELDS and apply per-field
;; null-as, or the generic one if none has been given for
;; that field.
(let ((process-nulls
(mapcar (function field-process-null-fn) fields)))
`(lambda (row)
(loop
:for col :in row
:for fn :in ',process-nulls
:collect (funcall fn col)))))
(t
;; project some number of FIELDS into a possibly different
;; number of COLUMNS, using given transformation functions,
;; processing NULL-AS represented values.
(let* ((args
(if fields
(mapcar (function field-name-as-symbol) fields)
(mapcar (function field-name-as-symbol) columns)))
(newrow
(loop for (name type fn) in columns
collect
;; we expect the name of a COLUMN to be the same
;; as the name of its derived FIELD when we
;; don't have any transformation function
(or fn `(funcall ,(field-process-null-fn name)
,(field-name-as-symbol name))))))
`(lambda (row)
(declare (optimize speed) (type list row))
(destructuring-bind (,@args) row
(declare (ignorable ,@args))
(list ,@newrow))))))))
;; allow for some debugging
(if compile (compile nil projection) projection))))


@@ -14,6 +14,7 @@
(declaim (inline intern-symbol
zero-dates-to-null
date-with-no-separator
time-with-no-separator
tinyint-to-boolean
int-to-ip
ip-range
@@ -53,6 +54,21 @@
append (list name (subseq date-string start end)))
(format nil "~a-~a-~a ~a:~a:~a" year month day hour minute seconds))))
(defun time-with-no-separator
(time-string
&optional (format '((:hour 0 2)
(:minute 2 4)
(:seconds 4 6)
(:msecs 6 nil))))
"Apply this function when the input time is like '08231560'"
(declare (type string time-string))
(destructuring-bind (&key hour minute seconds msecs
&allow-other-keys)
(loop
for (name start end) in format
append (list name (subseq time-string start end)))
(format nil "~a:~a:~a.~a" hour minute seconds msecs)))
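For reference, the `time-with-no-separator` transform introduced here can be sketched in Python under the same default (hour, minute, seconds, msecs) layout:

```python
# Sketch of the time-with-no-separator transform: slice a separator-less
# time string by the (name, start, end) layout, then join with ':' and '.'.
def time_with_no_separator(time_string,
                           fmt=(("hour", 0, 2), ("minute", 2, 4),
                                ("seconds", 4, 6), ("msecs", 6, None))):
    parts = {name: time_string[start:end] for name, start, end in fmt}
    return "{hour}:{minute}:{seconds}.{msecs}".format(**parts)

print(time_with_no_separator("08231560"))  # prints 08:23:15.60
```

As in the Lisp version, the trailing slice runs to the end of the string, so any extra digits land in the fractional part.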
(defun tinyint-to-boolean (integer-string)
"When using MySQL, strange things will happen, like encoding booleans into
tinyint columns that are either 0 (false) or 1 (true). Of course PostgreSQL wants

test/fixed.load (new file, 44 lines)

@@ -0,0 +1,44 @@
/*
* This test is ported from pgloader 2.x where it was defined as:
*
* [fixed]
* table = fixed
* format = fixed
* filename = fixed/fixed.data
* columns = *
* fixed_specs = a:0:10, b:10:8, c:18:8, d:26:17
* reformat = c:pgtime:time
*
*/
LOAD FIXED
FROM inline (a 0 10, b 10 8, c 18 8, d 26 17)
INTO postgresql:///pgloader?fixed
(
a, b,
c time using (time-with-no-separator c),
d
)
WITH truncate
SET client_encoding to 'latin1',
work_mem to '14MB',
standard_conforming_strings to 'on'
BEFORE LOAD DO
$$ drop table if exists fixed; $$,
$$ create table fixed (
a integer,
b date,
c time,
d text
);
$$;
01234567892008052011431250firstline
   01234562008052115182300left blank-padded
12345678902008052208231560another line