Clean out the code by splitting away a bunch of PostgreSQL related facilities.

This commit is contained in:
Dimitri Fontaine 2013-10-21 22:35:22 +02:00
parent 73de5b7186
commit ffebcf3bc7
4 changed files with 310 additions and 288 deletions

View File

@ -42,10 +42,17 @@
(:file "queue" :depends-on ("package")) ; pgloader.queue
;; package pgloader.pgsql
(:file "pgsql-copy-format" :depends-on ("package"
"params"
"queue"
"utils"))
(:file "pgsql-queries" :depends-on ("package" "params"))
(:file "pgsql" :depends-on ("package"
"queue"
"utils"
"transforms"))
"transforms"
"pgsql-copy-format"
"pgsql-queries"))
;; Source format specific implementations
(:file "csv" :depends-on ("package" "pgsql"))

128
src/pgsql-copy-format.lisp Normal file
View File

@ -0,0 +1,128 @@
;;;
;;; Tools to handle PostgreSQL data format
;;;
(in-package :pgloader.pgsql)
;;;
;;; Format row to PostgreSQL COPY format, the TEXT variant.
;;;
(defun format-row (stream row &key transforms)
  "Write ROW to STREAM as one line of PostgreSQL COPY TEXT data.

ROW is a list of column values. TRANSFORMS is an optional list matched
positionally against the columns of ROW: each column that has an entry in
TRANSFORMS is passed through `apply-transform-function' with that entry,
columns beyond the end of TRANSFORMS are written as they are.

After that step a column must be a string, NIL or :NULL. NIL and :NULL are
written as \\N, columns are separated by a Tab and the row ends with a
newline. See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609
for details about the format."
  (let (*print-circle* *print-pretty*)
    (loop
       for (col . more?) on row
       ;; Only ROW drives the iteration: a missing or too short TRANSFORMS
       ;; list must not cut the row short (the previous `for fn in
       ;; transforms' clause silently dropped the remaining columns).
       for preprocessed-col = (if transforms
                                  (apply-transform-function (pop transforms) col)
                                  col)
       do (if (or (null preprocessed-col)
                  ;; still accept postmodern :NULL in "preprocessed" data
                  (eq :NULL preprocessed-col))
              (format stream "\\N")
              ;; From PostgreSQL docs:
              ;;
              ;;  In particular, the following characters must be preceded
              ;;  by a backslash if they appear as part of a column value:
              ;;  backslash itself, newline, carriage return, and the
              ;;  current delimiter character.
              (loop
                 for char across preprocessed-col
                 do (case char
                      (#\\         (format stream "\\\\")) ; 2 chars here
                      (#\Newline   (format stream "\\n"))  ; 2 chars here
                      (#\Return    (format stream "\\r"))  ; 2 chars here
                      (#\Tab       (format stream "\\t"))  ; 2 chars here
                      (#\Backspace (format stream "\\b"))  ; 2 chars here
                      (#\Page      (format stream "\\f"))  ; 2 chars here
                      (t           (format stream "~c" char)))))
          ;; column separator, except after the last column
          (when more? (format stream "~c" #\Tab)))
    (format stream "~%")))
;;;
;;; Read a file in PostgreSQL COPY TEXT format, and call the given
;;; function on each line.
;;;
(defun map-rows (filename &key process-row-fn)
  "Read FILENAME, a text file in PostgreSQL COPY TEXT format, row by row.

Each line is split on Tab into a list of columns, the \\N columns are
replaced with :null, and PROCESS-ROW-FN is called with that list as its only
argument.

Returns how many rows were read and processed, or NIL when FILENAME does not
exist."
  (flet ((parse-line (text)
           ;; splitting is easy, it's always on #\Tab; see format-row for
           ;; details. We want Postmodern compliant NULLs.
           (mapcar (lambda (field)
                     (if (string= "\\N" field) :null field))
                   (sq:split-sequence #\Tab text))))
    (with-open-file (input filename
                           :direction :input
                           ;; we just ignore files that don't exist
                           :if-does-not-exist nil)
      (when input
        (let ((nb-rows 0))
          (loop
            (let ((line (read-line input nil)))
              ;; READ-LINE returns NIL at end of file
              (unless line
                (return nb-rows))
              (incf nb-rows)
              (funcall process-row-fn (parse-line line)))))))))
;;;
;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL
;;; table using the COPY protocol. We expect PostgreSQL compatible data in
;;; that data format, so we don't handle any reformatting here.
;;;
(defun copy-to-queue (table-name filename dataq &optional (*state* *state*))
  "Push the rows found in FILENAME to the lparallel.queue DATAQ.

The file is read with `map-rows' through pgloader.queue:map-push-queue, and
the value that call returns is then recorded as the :read counter of
TABLE-NAME in *STATE*. The optional fourth argument rebinds the special
variable *STATE* for the duration of the call."
  (let ((rows-read (pgloader.queue:map-push-queue dataq #'map-rows filename)))
    (pgstate-incf *state* table-name :read rows-read)))
(defun copy-from-file (dbname table-name filename
                       &key
                         (truncate t)
                         (report nil))
  "Load data from the clean COPY TEXT file FILENAME into TABLE-NAME of DBNAME.

Two tasks are run on a private lparallel kernel of two workers: one reads
FILENAME into a queue (`copy-to-queue'), the other one pushes the queue
content to PostgreSQL (`copy-from-queue'), which receives TRUNCATE.

When REPORT is true a fresh pgstate is used, the elapsed time is recorded in
it and the table statistics are reported; the value of that last reporting
call is then returned. When REPORT is false, returns NIL."
  (let* ((*state* (if report (pgloader.utils:make-pgstate) *state*))
         (lp:*kernel*
          (lp:make-kernel 2 :bindings
                          `((*pgconn-host* . ,*pgconn-host*)
                            (*pgconn-port* . ,*pgconn-port*)
                            (*pgconn-user* . ,*pgconn-user*)
                            (*pgconn-pass* . ,*pgconn-pass*)
                            (*pg-settings* . ',*pg-settings*)
                            (*state*       . ,*state*))))
         (channel (lp:make-channel))
         (dataq   (lq:make-queue :fixed-capacity 4096)))
    (unwind-protect
         (progn
           (log-message :debug "pgsql:copy-from-file: ~a ~a ~a" dbname table-name filename)

           (when report
             (pgstate-add-table *state* dbname table-name))

           ;; first task: read the file and fill the queue
           (lp:submit-task channel #'copy-to-queue table-name filename dataq *state*)

           ;; and start another task to push that data from the queue to PostgreSQL
           (lp:submit-task channel
                           #'pgloader.pgsql:copy-from-queue
                           dbname table-name dataq
                           :state *state*
                           :truncate truncate)

           ;; now wait until both the tasks are over, and measure the time
           ;; it took them
           (multiple-value-bind (res secs)
               (timing
                (loop repeat 2 do (lp:receive-result channel)))
             (declare (ignore res))
             (when report (pgstate-incf *state* table-name :secs secs)))

           (when report
             (report-table-name table-name)
             (report-pgtable-stats *state* table-name)))
      ;; The kernel is private to this call: release its worker threads
      ;; whatever happens, otherwise each call leaks two threads.
      (lp:end-kernel))))

173
src/pgsql-queries.lisp Normal file
View File

@ -0,0 +1,173 @@
;;;
;;; Tools to handle PostgreSQL queries
;;;
(in-package :pgloader.pgsql)
;;;
;;; PostgreSQL Tools connecting to a database
;;;
(defmacro with-pgsql-transaction ((dbname &key database) &body forms)
  "Run FORMS within a PostgreSQL transaction.

When a DATABASE form is given (this is decided at macroexpansion time), its
value is bound to pomo:*database*, a transaction is opened on it and
*pg-settings* are applied with `set-session-gucs' :transaction t before
running FORMS.

Otherwise a new connection is opened from (get-connection-spec DBNAME),
*pg-settings* are applied to that session, and FORMS are run within a
transaction on it."
  (if database
      ;; DATABASE must be unquoted: the expansion has to evaluate the form
      ;; given by the caller, not whatever variable happens to be named
      ;; `database' where the macro is used.
      `(let ((pomo:*database* ,database))
         (pomo:with-transaction ()
           (set-session-gucs *pg-settings* :transaction t)
           (progn ,@forms)))
      ;; no database given, create a new database connection
      `(pomo:with-connection (get-connection-spec ,dbname)
         (set-session-gucs *pg-settings*)
         (pomo:with-transaction ()
           (progn ,@forms)))))
(defun get-connection-spec (dbname &key (with-port t))
  "Build a connection spec list for DBNAME from the *pgconn-...* variables.

pomo:with-connection and cl-postgres:open-database and open-db-writer are
not using the same connection spec format: when WITH-PORT is true (the
default) the list ends with :port *pgconn-port*, otherwise it ends with the
bare *pgconn-port* value."
  (let ((port-part (if with-port
                       (list :port *pgconn-port*)
                       (list *pgconn-port*))))
    (list* dbname *pgconn-user* *pgconn-pass* *pgconn-host* port-part)))
(defun set-session-gucs (alist &key transaction database)
  "Set given GUCs to given values for the current session.

ALIST is a list of (name . value) pairs, each one is turned into a SET
statement and executed. When TRANSACTION is true SET LOCAL is used instead.
The statements are run on DATABASE when given, on the current
pomo:*database* otherwise."
  (let ((pomo:*database* (or database pomo:*database*)))
    (loop
       for (name . value) in alist
       ;; ~:[~; LOCAL~] always consumes TRANSACTION. The former ~@[ LOCAL~]
       ;; left a true TRANSACTION in the argument list, so that it was then
       ;; printed in place of the GUC name: SET LOCAL T TO '<name>'.
       for sql = (format nil "SET~:[~; LOCAL~] ~a TO '~a'" transaction name value)
       do
         ;; the statement is data here, not a format control string
         (log-message :debug "~a" sql)
         (pomo:execute sql))))
;;;
;;; PostgreSQL queries
;;;
(defun pgsql-execute (sql &key ((:client-min-messages level)))
  "Execute given SQL in current transaction.

When the :client-min-messages keyword argument is given, it must be a symbol:
its name is installed with SET LOCAL client_min_messages before running SQL,
and the setting is RESET afterwards. Returns the value of that RESET
statement in that case, NIL otherwise."
  (cond (level
         (pomo:execute
          (format nil "SET LOCAL client_min_messages TO ~a;" (symbol-name level)))
         (pomo:execute sql)
         (pomo:execute "RESET client_min_messages;"))
        (t
         (pomo:execute sql)
         nil)))
;;;
;;; PostgreSQL Utility Queries
;;;
(defun truncate-table (dbname table-name)
  "Connect to the database DBNAME and truncate TABLE-NAME there."
  (let ((sql (format nil "truncate ~a;" table-name)))
    (pomo:with-connection (get-connection-spec dbname)
      (pomo:execute sql))))
(defun list-databases (&optional (username "postgres"))
  "Return the list of database names found in pg_database.

Connects as USERNAME to the database named postgres, with the other
connection parameters taken from `get-connection-spec'. Databases whose name
matches postgres or template are left out of the result."
  (let ((conspec (let ((*pgconn-user* username))
                   (get-connection-spec "postgres"))))
    (pomo:with-connection conspec
      ;; each row is a list of a single column, the database name
      (mapcar #'first
              (pomo:query
               "select datname
from pg_database
where datname !~ 'postgres|template'")))))
(defun list-tables (dbname)
  "Return an alist of tables names and list of columns to pay attention to.

Each entry is (relname . attnums) for an ordinary table of the public schema
of DBNAME, where attnums are the attribute numbers of its columns of type
date or timestamptz."
  (flet ((table-entry (row)
           ;; ROW is (relname colarray); the array has a :NULL entry for
           ;; each column that is not of one of the selected types
           (cons (first row)
                 (coerce (remove :NULL (second row) :test #'eq) 'list))))
    (pomo:with-connection (get-connection-spec dbname)
      (mapcar #'table-entry
              (pomo:query "
select relname, array_agg(case when typname in ('date', 'timestamptz')
then attnum end
order by attnum)
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.relkind = 'r'
and attnum > 0
and n.nspname = 'public'
group by relname
")))))
(defun list-tables-cols (dbname &key (schema "public") table-name-list)
  "Return an alist of tables names and number of columns.

Only the ordinary tables of SCHEMA in DBNAME are considered. When
TABLE-NAME-LIST is non-NIL the result is restricted to the tables whose name
is a member of that list."
  (pomo:with-connection
      (get-connection-spec dbname)

    ;; The table name filter is an IN list: chaining one `and relname = ...'
    ;; per name, as was done before, can never match as soon as two names
    ;; are given.
    (loop for (relname cols)
       in (pomo:query (format nil "
select relname, count(attnum)
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.relkind = 'r'
and attnum > 0
and n.nspname = '~a'
~@[and relname in (~{'~a'~^, ~})~]
group by relname
" schema table-name-list))
       collect (cons relname cols))))
(defun list-columns (dbname table-name &key schema)
  "Return the list of the column names of TABLE-NAME, ordered by attnum.

The table is looked up in DBNAME through a regclass cast of TABLE-NAME,
qualified with SCHEMA when SCHEMA is given."
  (let ((sql (format nil "
select attname
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.oid = '~:[~*~a~;~a.~a~]'::regclass and attnum > 0
order by attnum" schema schema table-name)))
    (pomo:with-connection (get-connection-spec dbname)
      (pomo:query sql :column))))
(defun list-reserved-keywords (dbname)
  "Return the words that pg_get_keywords() reports with catcode R in DBNAME.

The query is run through `with-pgsql-transaction' on a new connection."
  (let ((sql "select word from pg_get_keywords() where catcode = 'R'"))
    (with-pgsql-transaction (dbname)
      (pomo:query sql :column))))
(defun reset-all-sequences (dbname &key only-tables)
"Reset all sequences to the max value of the column they are attached to.

Connects to DBNAME, listens on the `seqs' channel, then runs a DO block that
builds and executes one setval() statement for each column of an ordinary
table whose default expression matches ^nextval. The sequence is set to the
greatest of the max value of the column and 1. The DO block finally notifies
the `seqs' channel with the number of statements it executed.

ONLY-TABLES, when non-NIL, is a list of table names, each cast to regclass,
restricting the tables that are considered.

When the notification is signalled as a cl-postgres:postgresql-notification
condition, returns its payload parsed as an integer; otherwise returns the
value of the pomo:execute call."
(pomo:with-connection (get-connection-spec dbname)
(pomo:execute "set client_min_messages to warning;")
;; subscribe to the channel the DO block notifies at its very end
(pomo:execute "listen seqs")
;; in the format control string below, ~~ outputs a single ~ (the regexp
;; match operator) and the ~@[...~] clause is only output when ONLY-TABLES
;; is non-NIL
(handler-case
(pomo:execute (format nil "
DO $$
DECLARE
n integer := 0;
r record;
BEGIN
FOR r in
SELECT 'select '
|| trim(trailing ')'
from replace(pg_get_expr(d.adbin, d.adrelid),
'nextval', 'setval'))
|| ', (select greatest(max(' || a.attname || '), 1) from only '
|| nspname || '.' || relname || '));' as sql
FROM pg_class c
JOIN pg_namespace n on n.oid = c.relnamespace
JOIN pg_attribute a on a.attrelid = c.oid
JOIN pg_attrdef d on d.adrelid = a.attrelid
and d.adnum = a.attnum
and a.atthasdef
WHERE relkind = 'r' and a.attnum > 0
and pg_get_expr(d.adbin, d.adrelid) ~~ '^nextval'
~@[and c.oid in (~{'~a'::regclass~^, ~})~]
LOOP
n := n + 1;
EXECUTE r.sql;
END LOOP;
PERFORM pg_notify('seqs', n::text);
END;
$$; " only-tables))
;; now get the notification signal: its payload is the counter n of the
;; DO block above
(cl-postgres:postgresql-notification (c)
(parse-integer (cl-postgres:postgresql-notification-payload c))))))

View File

@ -1,171 +1,8 @@
;;;
;;; Tools to handle PostgreSQL data format
;;; The PostgreSQL COPY TO implementation, with batches and retries.
;;;
(in-package :pgloader.pgsql)
;;;
;;; PostgreSQL Tools connecting to a database
;;;
(defun get-connection-spec (dbname &key (with-port t))
  "Build a connection spec list for DBNAME from the *pgconn-...* variables.

pomo:with-connection and cl-postgres:open-database and open-db-writer are
not using the same connection spec format: when WITH-PORT is true (the
default) the list ends with :port *pgconn-port*, otherwise it ends with the
bare *pgconn-port* value."
  (let ((port-part (if with-port
                       (list :port *pgconn-port*)
                       (list *pgconn-port*))))
    (list* dbname *pgconn-user* *pgconn-pass* *pgconn-host* port-part)))
(defun set-session-gucs (alist &key transaction database)
  "Set given GUCs to given values for the current session.

ALIST is a list of (name . value) pairs, each one is turned into a SET
statement and executed. When TRANSACTION is true SET LOCAL is used instead.
The statements are run on DATABASE when given, on the current
pomo:*database* otherwise."
  (let ((pomo:*database* (or database pomo:*database*)))
    (loop
       for (name . value) in alist
       ;; ~:[~; LOCAL~] always consumes TRANSACTION. The former ~@[ LOCAL~]
       ;; left a true TRANSACTION in the argument list, so that it was then
       ;; printed in place of the GUC name: SET LOCAL T TO '<name>'.
       for sql = (format nil "SET~:[~; LOCAL~] ~a TO '~a'" transaction name value)
       do
         ;; the statement is data here, not a format control string
         (log-message :debug "~a" sql)
         (pomo:execute sql))))
(defmacro with-pgsql-transaction ((dbname &key database) &body forms)
  "Run FORMS within a PostgreSQL transaction.

When a DATABASE form is given (this is decided at macroexpansion time), its
value is bound to pomo:*database*, a transaction is opened on it and
*pg-settings* are applied with `set-session-gucs' :transaction t before
running FORMS.

Otherwise a new connection is opened from (get-connection-spec DBNAME),
*pg-settings* are applied to that session, and FORMS are run within a
transaction on it."
  (if database
      ;; DATABASE must be unquoted: the expansion has to evaluate the form
      ;; given by the caller, not whatever variable happens to be named
      ;; `database' where the macro is used.
      `(let ((pomo:*database* ,database))
         (pomo:with-transaction ()
           (set-session-gucs *pg-settings* :transaction t)
           (progn ,@forms)))
      ;; no database given, create a new database connection
      `(pomo:with-connection (get-connection-spec ,dbname)
         (set-session-gucs *pg-settings*)
         (pomo:with-transaction ()
           (progn ,@forms)))))
(defun pgsql-execute (sql &key ((:client-min-messages level)))
  "Execute given SQL in current transaction.

When the :client-min-messages keyword argument is given, it must be a symbol:
its name is installed with SET LOCAL client_min_messages before running SQL,
and the setting is RESET afterwards. Returns the value of that RESET
statement in that case, NIL otherwise."
  (cond (level
         (pomo:execute
          (format nil "SET LOCAL client_min_messages TO ~a;" (symbol-name level)))
         (pomo:execute sql)
         (pomo:execute "RESET client_min_messages;"))
        (t
         (pomo:execute sql)
         nil)))
(defun truncate-table (dbname table-name)
  "Connect to the database DBNAME and truncate TABLE-NAME there."
  (let ((sql (format nil "truncate ~a;" table-name)))
    (pomo:with-connection (get-connection-spec dbname)
      (pomo:execute sql))))
(defun list-databases (&optional (username "postgres"))
  "Return the list of database names found in pg_database.

Connects as USERNAME to the database named postgres, with the other
connection parameters taken from `get-connection-spec'. Databases whose name
matches postgres or template are left out of the result."
  (let ((conspec (let ((*pgconn-user* username))
                   (get-connection-spec "postgres"))))
    (pomo:with-connection conspec
      ;; each row is a list of a single column, the database name
      (mapcar #'first
              (pomo:query
               "select datname
from pg_database
where datname !~ 'postgres|template'")))))
(defun list-tables (dbname)
  "Return an alist of tables names and list of columns to pay attention to.

Each entry is (relname . attnums) for an ordinary table of the public schema
of DBNAME, where attnums are the attribute numbers of its columns of type
date or timestamptz."
  (flet ((table-entry (row)
           ;; ROW is (relname colarray); the array has a :NULL entry for
           ;; each column that is not of one of the selected types
           (cons (first row)
                 (coerce (remove :NULL (second row) :test #'eq) 'list))))
    (pomo:with-connection (get-connection-spec dbname)
      (mapcar #'table-entry
              (pomo:query "
select relname, array_agg(case when typname in ('date', 'timestamptz')
then attnum end
order by attnum)
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.relkind = 'r'
and attnum > 0
and n.nspname = 'public'
group by relname
")))))
(defun list-tables-cols (dbname &key (schema "public") table-name-list)
  "Return an alist of tables names and number of columns.

Only the ordinary tables of SCHEMA in DBNAME are considered. When
TABLE-NAME-LIST is non-NIL the result is restricted to the tables whose name
is a member of that list."
  (pomo:with-connection
      (get-connection-spec dbname)

    ;; The table name filter is an IN list: chaining one `and relname = ...'
    ;; per name, as was done before, can never match as soon as two names
    ;; are given.
    (loop for (relname cols)
       in (pomo:query (format nil "
select relname, count(attnum)
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.relkind = 'r'
and attnum > 0
and n.nspname = '~a'
~@[and relname in (~{'~a'~^, ~})~]
group by relname
" schema table-name-list))
       collect (cons relname cols))))
(defun list-columns (dbname table-name &key schema)
  "Return the list of the column names of TABLE-NAME, ordered by attnum.

The table is looked up in DBNAME through a regclass cast of TABLE-NAME,
qualified with SCHEMA when SCHEMA is given."
  (let ((sql (format nil "
select attname
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.oid = '~:[~*~a~;~a.~a~]'::regclass and attnum > 0
order by attnum" schema schema table-name)))
    (pomo:with-connection (get-connection-spec dbname)
      (pomo:query sql :column))))
(defun list-reserved-keywords (dbname)
  "Return the words that pg_get_keywords() reports with catcode R in DBNAME.

The query is run through `with-pgsql-transaction' on a new connection."
  (let ((sql "select word from pg_get_keywords() where catcode = 'R'"))
    (with-pgsql-transaction (dbname)
      (pomo:query sql :column))))
(defun reset-all-sequences (dbname &key only-tables)
"Reset all sequences to the max value of the column they are attached to.

Connects to DBNAME, listens on the `seqs' channel, then runs a DO block that
builds and executes one setval() statement for each column of an ordinary
table whose default expression matches ^nextval. The sequence is set to the
greatest of the max value of the column and 1. The DO block finally notifies
the `seqs' channel with the number of statements it executed.

ONLY-TABLES, when non-NIL, is a list of table names, each cast to regclass,
restricting the tables that are considered.

When the notification is signalled as a cl-postgres:postgresql-notification
condition, returns its payload parsed as an integer; otherwise returns the
value of the pomo:execute call."
(pomo:with-connection (get-connection-spec dbname)
(pomo:execute "set client_min_messages to warning;")
;; subscribe to the channel the DO block notifies at its very end
(pomo:execute "listen seqs")
;; in the format control string below, ~~ outputs a single ~ (the regexp
;; match operator) and the ~@[...~] clause is only output when ONLY-TABLES
;; is non-NIL
(handler-case
(pomo:execute (format nil "
DO $$
DECLARE
n integer := 0;
r record;
BEGIN
FOR r in
SELECT 'select '
|| trim(trailing ')'
from replace(pg_get_expr(d.adbin, d.adrelid),
'nextval', 'setval'))
|| ', (select greatest(max(' || a.attname || '), 1) from only '
|| nspname || '.' || relname || '));' as sql
FROM pg_class c
JOIN pg_namespace n on n.oid = c.relnamespace
JOIN pg_attribute a on a.attrelid = c.oid
JOIN pg_attrdef d on d.adrelid = a.attrelid
and d.adnum = a.attnum
and a.atthasdef
WHERE relkind = 'r' and a.attnum > 0
and pg_get_expr(d.adbin, d.adrelid) ~~ '^nextval'
~@[and c.oid in (~{'~a'::regclass~^, ~})~]
LOOP
n := n + 1;
EXECUTE r.sql;
END LOOP;
PERFORM pg_notify('seqs', n::text);
END;
$$; " only-tables))
;; now get the notification signal: its payload is the counter n of the
;; DO block above
(cl-postgres:postgresql-notification (c)
(parse-integer (cl-postgres:postgresql-notification-payload c))))))
;;;
;;; PostgreSQL formatting tools
;;;
@ -184,76 +21,6 @@ $$; " only-tables))
;; force nil values to being cl-postgres :null special value
collect (if (null transformed-col) :null transformed-col)))
;;;
;;; Format row to PostgreSQL COPY format, the TEXT variant.
;;;
(defun format-row (stream row &key transforms)
  "Write ROW to STREAM as one line of PostgreSQL COPY TEXT data.

ROW is a list of column values. TRANSFORMS is an optional list matched
positionally against the columns of ROW: each column that has an entry in
TRANSFORMS is passed through `apply-transform-function' with that entry,
columns beyond the end of TRANSFORMS are written as they are.

After that step a column must be a string, NIL or :NULL. NIL and :NULL are
written as \\N, columns are separated by a Tab and the row ends with a
newline. See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609
for details about the format."
  (let (*print-circle* *print-pretty*)
    (loop
       for (col . more?) on row
       ;; Only ROW drives the iteration: a missing or too short TRANSFORMS
       ;; list must not cut the row short (the previous `for fn in
       ;; transforms' clause silently dropped the remaining columns).
       for preprocessed-col = (if transforms
                                  (apply-transform-function (pop transforms) col)
                                  col)
       do (if (or (null preprocessed-col)
                  ;; still accept postmodern :NULL in "preprocessed" data
                  (eq :NULL preprocessed-col))
              (format stream "\\N")
              ;; From PostgreSQL docs:
              ;;
              ;;  In particular, the following characters must be preceded
              ;;  by a backslash if they appear as part of a column value:
              ;;  backslash itself, newline, carriage return, and the
              ;;  current delimiter character.
              (loop
                 for char across preprocessed-col
                 do (case char
                      (#\\         (format stream "\\\\")) ; 2 chars here
                      (#\Newline   (format stream "\\n"))  ; 2 chars here
                      (#\Return    (format stream "\\r"))  ; 2 chars here
                      (#\Tab       (format stream "\\t"))  ; 2 chars here
                      (#\Backspace (format stream "\\b"))  ; 2 chars here
                      (#\Page      (format stream "\\f"))  ; 2 chars here
                      (t           (format stream "~c" char)))))
          ;; column separator, except after the last column
          (when more? (format stream "~c" #\Tab)))
    (format stream "~%")))
;;;
;;; Read a file in PostgreSQL COPY TEXT format, and call the given
;;; function on each line.
;;;
(defun map-rows (filename &key process-row-fn)
  "Read FILENAME, a text file in PostgreSQL COPY TEXT format, row by row.

Each line is split on Tab into a list of columns, the \\N columns are
replaced with :null, and PROCESS-ROW-FN is called with that list as its only
argument.

Returns how many rows were read and processed, or NIL when FILENAME does not
exist."
  (flet ((parse-line (text)
           ;; splitting is easy, it's always on #\Tab; see format-row for
           ;; details. We want Postmodern compliant NULLs.
           (mapcar (lambda (field)
                     (if (string= "\\N" field) :null field))
                   (sq:split-sequence #\Tab text))))
    (with-open-file (input filename
                           :direction :input
                           ;; we just ignore files that don't exist
                           :if-does-not-exist nil)
      (when input
        (let ((nb-rows 0))
          (loop
            (let ((line (read-line input nil)))
              ;; READ-LINE returns NIL at end of file
              (unless line
                (return nb-rows))
              (incf nb-rows)
              (funcall process-row-fn (parse-line line)))))))))
;;;
;;; Pop data from a lparallel.queue queue instance, reformat it assuming
;;; data in there are from cl-mysql, and copy it to a PostgreSQL table.
@ -340,59 +107,6 @@ Finally returns how many rows where read and processed."
(pgstate-incf *state* table-name :rows rows)
while cont)))
;;;
;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL
;;; table using the COPY protocol. We expect PostgreSQL compatible data in
;;; that data format, so we don't handle any reformatting here.
;;;
(defun copy-to-queue (table-name filename dataq &optional (*state* *state*))
  "Push the rows found in FILENAME to the lparallel.queue DATAQ.

The file is read with `map-rows' through pgloader.queue:map-push-queue, and
the value that call returns is then recorded as the :read counter of
TABLE-NAME in *STATE*. The optional fourth argument rebinds the special
variable *STATE* for the duration of the call."
  (let ((rows-read (pgloader.queue:map-push-queue dataq #'map-rows filename)))
    (pgstate-incf *state* table-name :read rows-read)))
(defun copy-from-file (dbname table-name filename
                       &key
                         (truncate t)
                         (report nil))
  "Load data from the clean COPY TEXT file FILENAME into TABLE-NAME of DBNAME.

Two tasks are run on a private lparallel kernel of two workers: one reads
FILENAME into a queue (`copy-to-queue'), the other one pushes the queue
content to PostgreSQL (`copy-from-queue'), which receives TRUNCATE.

When REPORT is true a fresh pgstate is used, the elapsed time is recorded in
it and the table statistics are reported; the value of that last reporting
call is then returned. When REPORT is false, returns NIL."
  (let* ((*state* (if report (pgloader.utils:make-pgstate) *state*))
         (lp:*kernel*
          (lp:make-kernel 2 :bindings
                          `((*pgconn-host* . ,*pgconn-host*)
                            (*pgconn-port* . ,*pgconn-port*)
                            (*pgconn-user* . ,*pgconn-user*)
                            (*pgconn-pass* . ,*pgconn-pass*)
                            (*pg-settings* . ',*pg-settings*)
                            (*state*       . ,*state*))))
         (channel (lp:make-channel))
         (dataq   (lq:make-queue :fixed-capacity 4096)))
    (unwind-protect
         (progn
           (log-message :debug "pgsql:copy-from-file: ~a ~a ~a" dbname table-name filename)

           (when report
             (pgstate-add-table *state* dbname table-name))

           ;; first task: read the file and fill the queue
           (lp:submit-task channel #'copy-to-queue table-name filename dataq *state*)

           ;; and start another task to push that data from the queue to PostgreSQL
           (lp:submit-task channel
                           #'pgloader.pgsql:copy-from-queue
                           dbname table-name dataq
                           :state *state*
                           :truncate truncate)

           ;; now wait until both the tasks are over, and measure the time
           ;; it took them
           (multiple-value-bind (res secs)
               (timing
                (loop repeat 2 do (lp:receive-result channel)))
             (declare (ignore res))
             (when report (pgstate-incf *state* table-name :secs secs)))

           (when report
             (report-table-name table-name)
             (report-pgtable-stats *state* table-name)))
      ;; The kernel is private to this call: release its worker threads
      ;; whatever happens, otherwise each call leaks two threads.
      (lp:end-kernel))))
;;;
;;; When a batch has been refused by PostgreSQL with a data-exception, that
;;; means it contains non-conforming data. It could be only one row in the