mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-06 03:16:10 +02:00
Implement automagic guessing of CSV parameters.
As we know how many columns we expect from the input file, it's possible to read a sample (10 lines as of this patch) and try many different combinations of CSV reader parameters until we find one that works, that is, one that yields the right number of fields for each sampled line. It is of course still possible to specify the parameters on the command line or in a load file if necessary, but this makes the simple case even simpler. As simple as: pgloader file.csv pgsql:///pgloader?tablename=target
This commit is contained in:
parent
14e1830b77
commit
471f2b6d88
15
pgloader.1
15
pgloader.1
@ -247,7 +247,20 @@ pgloader \-\-verbose \./test/csv\-districts\.load
|
||||
.IP "" 0
|
||||
.
|
||||
.SS "CSV"
|
||||
Load data from a CSV file into a pre\-existing table in your database:
|
||||
Load data from a CSV file into a pre\-existing table in your database, having pgloader guess the CSV properties (separator, quote and escape character):
|
||||
.
|
||||
.IP "" 4
|
||||
.
|
||||
.nf
|
||||
|
||||
pgloader \./test/data/matching\-1\.csv pgsql:///pgloader?tablename=matching
|
||||
.
|
||||
.fi
|
||||
.
|
||||
.IP "" 0
|
||||
.
|
||||
.P
|
||||
Load data from a CSV file into a pre\-existing table in your database, with expanded options:
|
||||
.
|
||||
.IP "" 4
|
||||
.
|
||||
|
||||
@ -219,7 +219,13 @@ that file and execute the commands found in it:
|
||||
|
||||
### CSV
|
||||
|
||||
Load data from a CSV file into a pre-existing table in your database:
|
||||
Load data from a CSV file into a pre-existing table in your database, having
|
||||
pgloader guess the CSV properties (separator, quote and escape character):
|
||||
|
||||
pgloader ./test/data/matching-1.csv pgsql:///pgloader?tablename=matching
|
||||
|
||||
Load data from a CSV file into a pre-existing table in your database, with
|
||||
expanded options:
|
||||
|
||||
pgloader --type csv \
|
||||
--field id --field field \
|
||||
|
||||
135
src/main.lisp
135
src/main.lisp
@ -541,14 +541,21 @@ Parameters here are meant to be already parsed, see parse-cli-optargs."
|
||||
(declare (type connection source)
|
||||
(type pgsql-connection target))
|
||||
|
||||
(when (and (typep source 'csv-connection) (null (pgconn-table-name target)))
|
||||
(when (and (typep source 'csv-connection)
|
||||
(null (pgconn-table-name target)))
|
||||
(error 'source-definition-error
|
||||
:mesg "This data source require a table name target."))
|
||||
:mesg "CSV data source require a table name target."))
|
||||
|
||||
(when (and (typep source 'fixed-connection) (null (pgconn-table-name target)))
|
||||
(when (and (typep source 'fixed-connection)
|
||||
(null (pgconn-table-name target)))
|
||||
(error 'source-definition-error
|
||||
:mesg "Fixed-width data source require a table name target."))
|
||||
|
||||
(when (and (typep source 'fixed-connection)
|
||||
(null fields))
|
||||
(error 'source-definition-error
|
||||
:mesg "Fixed-width data source require fields specs."))
|
||||
|
||||
(with-monitor (:start-logger start-logger)
|
||||
(when (and casts (not (member (type-of source)
|
||||
'(sqlite-connection
|
||||
@ -558,70 +565,72 @@ Parameters here are meant to be already parsed, see parse-cli-optargs."
|
||||
|
||||
;; now generates the code for the command
|
||||
(log-message :debug "LOAD DATA FROM ~s" source)
|
||||
(run-commands
|
||||
(process-relative-pathnames
|
||||
(uiop:getcwd)
|
||||
(typecase source
|
||||
(copy-connection
|
||||
(lisp-code-for-loading-from-copy source fields target
|
||||
:encoding (or encoding :default)
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
(let ((code
|
||||
(etypecase source
|
||||
(copy-connection
|
||||
(lisp-code-for-loading-from-copy source target
|
||||
:fields fields
|
||||
:encoding (or encoding :default)
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
|
||||
(fixed-connection
|
||||
(lisp-code-for-loading-from-fixed source fields target
|
||||
:encoding encoding
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
(fixed-connection
|
||||
(lisp-code-for-loading-from-fixed source target
|
||||
:fields fields
|
||||
:encoding encoding
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
|
||||
(csv-connection
|
||||
(lisp-code-for-loading-from-csv source fields target
|
||||
:encoding encoding
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
(csv-connection
|
||||
(lisp-code-for-loading-from-csv source target
|
||||
:fields fields
|
||||
:encoding encoding
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
|
||||
(dbf-connection
|
||||
(lisp-code-for-loading-from-dbf source target
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
(dbf-connection
|
||||
(lisp-code-for-loading-from-dbf source target
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
|
||||
(ixf-connection
|
||||
(lisp-code-for-loading-from-ixf source target
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
(ixf-connection
|
||||
(lisp-code-for-loading-from-ixf source target
|
||||
:gucs gucs
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
|
||||
(sqlite-connection
|
||||
(lisp-code-for-loading-from-sqlite source target
|
||||
:gucs gucs
|
||||
:casts casts
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
(sqlite-connection
|
||||
(lisp-code-for-loading-from-sqlite source target
|
||||
:gucs gucs
|
||||
:casts casts
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
|
||||
(mysql-connection
|
||||
(lisp-code-for-loading-from-mysql source target
|
||||
:gucs gucs
|
||||
:casts casts
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
(mysql-connection
|
||||
(lisp-code-for-loading-from-mysql source target
|
||||
:gucs gucs
|
||||
:casts casts
|
||||
:options options
|
||||
:before before
|
||||
:after after))
|
||||
|
||||
(mssql-connection
|
||||
(lisp-code-for-loading-from-mssql source target
|
||||
:gucs gucs
|
||||
:casts casts
|
||||
:options options
|
||||
:before before
|
||||
:after after))))
|
||||
:start-logger nil
|
||||
:flush-summary flush-summary)))
|
||||
(mssql-connection
|
||||
(lisp-code-for-loading-from-mssql source target
|
||||
:gucs gucs
|
||||
:casts casts
|
||||
:options options
|
||||
:before before
|
||||
:after after)))))
|
||||
(run-commands (process-relative-pathnames (uiop:getcwd) code)
|
||||
:start-logger nil
|
||||
:flush-summary flush-summary))))
|
||||
|
||||
@ -103,9 +103,10 @@
|
||||
(destructuring-bind (source encoding fields target columns clauses) command
|
||||
`(,source ,encoding ,fields ,target ,columns ,@clauses))))
|
||||
|
||||
(defun lisp-code-for-loading-from-copy (copy-conn fields pg-db-conn
|
||||
(defun lisp-code-for-loading-from-copy (copy-conn pg-db-conn
|
||||
&key
|
||||
(encoding :utf-8)
|
||||
fields
|
||||
columns
|
||||
gucs before after options
|
||||
&aux
|
||||
@ -163,8 +164,9 @@
|
||||
(cond (*dry-run*
|
||||
(lisp-code-for-csv-dry-run pg-db-uri))
|
||||
(t
|
||||
(lisp-code-for-loading-from-copy source fields pg-db-uri
|
||||
(lisp-code-for-loading-from-copy source pg-db-uri
|
||||
:encoding encoding
|
||||
:fields fields
|
||||
:columns columns
|
||||
:gucs gucs
|
||||
:before before
|
||||
|
||||
@ -397,9 +397,10 @@
|
||||
(log-message :log "DRY RUN, only checking PostgreSQL connection.")
|
||||
(check-connection ,pg-db-conn)))
|
||||
|
||||
(defun lisp-code-for-loading-from-csv (csv-conn fields pg-db-conn
|
||||
(defun lisp-code-for-loading-from-csv (csv-conn pg-db-conn
|
||||
&key
|
||||
(encoding :utf-8)
|
||||
fields
|
||||
columns
|
||||
gucs before after options
|
||||
&aux
|
||||
@ -457,8 +458,9 @@
|
||||
(cond (*dry-run*
|
||||
(lisp-code-for-csv-dry-run pg-db-uri))
|
||||
(t
|
||||
(lisp-code-for-loading-from-csv source fields pg-db-uri
|
||||
(lisp-code-for-loading-from-csv source pg-db-uri
|
||||
:encoding encoding
|
||||
:fields fields
|
||||
:columns columns
|
||||
:gucs gucs
|
||||
:before before
|
||||
|
||||
@ -111,9 +111,10 @@
|
||||
(destructuring-bind (source encoding fields target columns clauses) command
|
||||
`(,source ,encoding ,fields ,target ,columns ,@clauses))))
|
||||
|
||||
(defun lisp-code-for-loading-from-fixed (fixed-conn fields pg-db-conn
|
||||
(defun lisp-code-for-loading-from-fixed (fixed-conn pg-db-conn
|
||||
&key
|
||||
(encoding :utf-8)
|
||||
fields
|
||||
columns
|
||||
gucs before after options
|
||||
&aux
|
||||
@ -165,8 +166,9 @@
|
||||
(cond (*dry-run*
|
||||
(lisp-code-for-csv-dry-run pg-db-uri))
|
||||
(t
|
||||
(lisp-code-for-loading-from-fixed source fields pg-db-uri
|
||||
(lisp-code-for-loading-from-fixed source pg-db-uri
|
||||
:encoding encoding
|
||||
:fields fields
|
||||
:columns columns
|
||||
:gucs gucs
|
||||
:before before
|
||||
|
||||
@ -20,41 +20,59 @@
|
||||
:if-does-not-exist nil)
|
||||
(when input
|
||||
(loop
|
||||
for line = (read-line input nil)
|
||||
while line
|
||||
repeat sample-size
|
||||
collect line))))
|
||||
:for line := (read-line input nil)
|
||||
:while line
|
||||
:repeat sample-size
|
||||
:collect line))))
|
||||
|
||||
(defun get-stream-sample (stream &key (sample-size 10))
|
||||
"Return the first SAMPLE-SIZE lines in FILENAME (or less), or nil if the
|
||||
file does not exists."
|
||||
(let ((start-position (file-position stream)))
|
||||
(unwind-protect
|
||||
(loop
|
||||
:for line := (read-line stream nil)
|
||||
:while line
|
||||
:repeat sample-size
|
||||
:collect line)
|
||||
(file-position stream start-position))))
|
||||
|
||||
(defun try-csv-params (lines cols &key separator quote escape)
|
||||
"Read LINES as CSV with SEPARATOR and ESCAPE params, and return T when
|
||||
each line in LINES then contains exactly COLS columns"
|
||||
(let ((rows (loop
|
||||
for line in lines
|
||||
append
|
||||
(handler-case
|
||||
(cl-csv:read-csv line
|
||||
:quote quote
|
||||
:separator separator
|
||||
:escape escape)
|
||||
((or cl-csv:csv-parse-error type-error) ()
|
||||
nil)))))
|
||||
(let ((rows
|
||||
(loop
|
||||
:for line :in lines
|
||||
:append (handler-case
|
||||
(cl-csv:read-csv line
|
||||
:quote quote
|
||||
:separator separator
|
||||
:escape escape)
|
||||
((or cl-csv:csv-parse-error type-error) ()
|
||||
nil)))))
|
||||
(and rows
|
||||
(every (lambda (row) (= cols (length row))) rows))))
|
||||
|
||||
(defun guess-csv-params (filename cols &key (sample-size 10))
|
||||
(defun guess-csv-params (filename-or-stream nb-cols &key (sample-size 10))
|
||||
"Try a bunch of field separators with LINES and return the first one that
|
||||
returns COLS number of columns"
|
||||
|
||||
(let ((sample (get-file-sample filename :sample-size sample-size)))
|
||||
(let ((sample
|
||||
(etypecase filename-or-stream
|
||||
(pathname
|
||||
(get-file-sample filename-or-stream :sample-size sample-size))
|
||||
(string
|
||||
(get-file-sample filename-or-stream :sample-size sample-size))
|
||||
(stream
|
||||
(get-stream-sample filename-or-stream :sample-size sample-size)))))
|
||||
(loop
|
||||
for sep in *separators*
|
||||
for esc = (loop
|
||||
for escape in *escape-quotes*
|
||||
when (try-csv-params sample cols
|
||||
:quote #\"
|
||||
:separator sep
|
||||
:escape escape)
|
||||
do (return escape))
|
||||
when esc
|
||||
do (return (list :separator sep :quote #\" :escape esc)))))
|
||||
:for sep :in *separators*
|
||||
:for esc := (loop
|
||||
:for escape :in *escape-quotes*
|
||||
:when (try-csv-params sample nb-cols
|
||||
:quote #\"
|
||||
:separator sep
|
||||
:escape escape)
|
||||
:do (return escape))
|
||||
:when esc
|
||||
:do (return (list :separator sep :quote #\" :escape esc)))))
|
||||
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
:initarg :source-type) ; or :filename
|
||||
(separator :accessor csv-separator ; CSV separator
|
||||
:initarg :separator ;
|
||||
:initform #\Tab) ;
|
||||
:initform nil) ;
|
||||
(newline :accessor csv-newline ; CSV line ending
|
||||
:initarg :newline ;
|
||||
:initform #\Newline)
|
||||
@ -73,24 +73,39 @@
|
||||
|
||||
(defmethod process-rows ((csv copy-csv) stream process-fn)
|
||||
"Process rows from STREAM according to COPY specifications and PROCESS-FN."
|
||||
(handler-case
|
||||
(handler-bind ((cl-csv:csv-parse-error
|
||||
#'(lambda (c)
|
||||
(log-message :error "~a" c)
|
||||
(update-stats :data (target csv) :errs 1)
|
||||
(cl-csv::continue))))
|
||||
(cl-csv:read-csv stream
|
||||
:row-fn process-fn
|
||||
:separator (csv-separator csv)
|
||||
:quote (csv-quote csv)
|
||||
:escape (csv-escape csv)
|
||||
:escape-mode (csv-escape-mode csv)
|
||||
:unquoted-empty-string-is-nil t
|
||||
:quoted-empty-string-is-nil nil
|
||||
:trim-outer-whitespace (csv-trim-blanks csv)
|
||||
:newline (csv-newline csv)))
|
||||
(condition (e)
|
||||
(progn
|
||||
(log-message :fatal "~a" e)
|
||||
(update-stats :data (target csv) :errs 1)))))
|
||||
(let ((separator (csv-separator csv))
|
||||
(quote (csv-quote csv))
|
||||
(escape (csv-escape csv)))
|
||||
(unless separator
|
||||
;; try to guess the CSV format
|
||||
(let ((nb-columns (length (columns csv))))
|
||||
(destructuring-bind (&key
|
||||
((:separator sep) #\Tab)
|
||||
((:quote q) cl-csv:*quote*)
|
||||
((:escape esc) cl-csv:*quote-escape*))
|
||||
(guess-csv-params stream nb-columns)
|
||||
(setf separator sep
|
||||
quote q
|
||||
escape esc))))
|
||||
|
||||
(handler-case
|
||||
(handler-bind ((cl-csv:csv-parse-error
|
||||
#'(lambda (c)
|
||||
(log-message :error "~a" c)
|
||||
(update-stats :data (target csv) :errs 1)
|
||||
(cl-csv::continue))))
|
||||
(cl-csv:read-csv stream
|
||||
:row-fn process-fn
|
||||
:separator separator
|
||||
:quote quote
|
||||
:escape escape
|
||||
:escape-mode (csv-escape-mode csv)
|
||||
:unquoted-empty-string-is-nil t
|
||||
:quoted-empty-string-is-nil nil
|
||||
:trim-outer-whitespace (csv-trim-blanks csv)
|
||||
:newline (csv-newline csv)))
|
||||
(condition (e)
|
||||
(progn
|
||||
(log-message :fatal "~a" e)
|
||||
(update-stats :data (target csv) :errs 1))))))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user