From ffebcf3bc7a133a3eb2ef3c8eeaae002bbc70a4f Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Mon, 21 Oct 2013 22:35:22 +0200 Subject: [PATCH] Clean out the code by splitting away a bunch of PostgreSQL related facilities. --- pgloader.asd | 9 +- src/pgsql-copy-format.lisp | 128 +++++++++++++++++ src/pgsql-queries.lisp | 173 ++++++++++++++++++++++ src/pgsql.lisp | 288 +------------------------------------ 4 files changed, 310 insertions(+), 288 deletions(-) create mode 100644 src/pgsql-copy-format.lisp create mode 100644 src/pgsql-queries.lisp diff --git a/pgloader.asd b/pgloader.asd index 07b0790..5ec02cf 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -42,10 +42,17 @@ (:file "queue" :depends-on ("package")) ; pgloader.queue ;; package pgloader.pgsql + (:file "pgsql-copy-format" :depends-on ("package" + "params" + "queue" + "utils")) + (:file "pgsql-queries" :depends-on ("package" "params")) (:file "pgsql" :depends-on ("package" "queue" "utils" - "transforms")) + "transforms" + "pgsql-copy-format" + "pgsql-queries")) ;; Source format specific implementations (:file "csv" :depends-on ("package" "pgsql")) diff --git a/src/pgsql-copy-format.lisp b/src/pgsql-copy-format.lisp new file mode 100644 index 0000000..c4fcddf --- /dev/null +++ b/src/pgsql-copy-format.lisp @@ -0,0 +1,128 @@ +;;; +;;; Tools to handle PostgreSQL data format +;;; +(in-package :pgloader.pgsql) + +;;; +;;; Format row to PostgreSQL COPY format, the TEXT variant. +;;; +(defun format-row (stream row &key transforms) + "Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format. + +See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for +details about the format, and format specs." + (let* (*print-circle* *print-pretty*) + (loop + for (col . more?) on row + for fn in transforms + for preprocessed-col = (apply-transform-function fn col) + ;; still accept postmodern :NULL in "preprocessed" data + do (if (or (null preprocessed-col) + (eq :NULL preprocessed-col)) + (format stream "~a~:[~;~c~]" "\\N" more? #\Tab) + (progn + ;; From PostgreSQL docs: + ;; + ;; In particular, the following characters must be preceded + ;; by a backslash if they appear as part of a column value: + ;; backslash itself, newline, carriage return, and the + ;; current delimiter character. + (loop + for char across preprocessed-col + do (case char + (#\\ (format stream "\\\\")) ; 2 chars here + (#\Space (princ #\Space stream)) + (#\Newline (format stream "\\n")) ; 2 chars here + (#\Return (format stream "\\r")) ; 2 chars here + (#\Tab (format stream "\\t")) ; 2 chars here + (#\Backspace (format stream "\\b")) ; 2 chars here + (#\Page (format stream "\\f")) ; 2 chars here + (t (format stream "~c" char)))) + (format stream "~:[~;~c~]" more? #\Tab)))) + (format stream "~%"))) + +;;; +;;; Read a file format in PostgreSQL COPY TEXT format, and call given +;;; function on each line. +;;; +(defun map-rows (filename &key process-row-fn) + "Load data from a text file in PostgreSQL COPY TEXT format. + +Each row is pre-processed then PROCESS-ROW-FN is called with the row as a +list as its only parameter. + +Finally returns how many rows where read and processed." + (with-open-file + ;; we just ignore files that don't exist + (input filename + :direction :input + :if-does-not-exist nil) + (when input + ;; read in the text file, split it into columns, process NULL columns + ;; the way postmodern expects them, and call PROCESS-ROW-FN on them + (loop + for line = (read-line input nil) + for row = (mapcar (lambda (x) + ;; we want Postmodern compliant NULLs + (if (string= "\\N" x) :null x)) + ;; splitting is easy, it's always on #\Tab + ;; see format-row-for-copy for details + (sq:split-sequence #\Tab line)) + while line + counting line into count + do (funcall process-row-fn row) + finally (return count))))) + +;;; +;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL +;;; table using the COPY protocol. We expect PostgreSQL compatible data in +;;; that data format, so we don't handle any reformating here. +;;; +(defun copy-to-queue (table-name filename dataq &optional (*state* *state*)) + "Copy data from file FILENAME into lparallel.queue DATAQ" + (let ((read + (pgloader.queue:map-push-queue dataq #'map-rows filename))) + (pgstate-incf *state* table-name :read read))) + +(defun copy-from-file (dbname table-name filename + &key + (truncate t) + (report nil)) + "Load data from clean COPY TEXT file to PostgreSQL, return how many rows." + (let* ((*state* (if report (pgloader.utils:make-pgstate) *state*)) + (lp:*kernel* + (lp:make-kernel 2 :bindings + `((*pgconn-host* . ,*pgconn-host*) + (*pgconn-port* . ,*pgconn-port*) + (*pgconn-user* . ,*pgconn-user*) + (*pgconn-pass* . ,*pgconn-pass*) + (*pg-settings* . ',*pg-settings*) + (*state* . ,*state*)))) + (channel (lp:make-channel)) + (dataq (lq:make-queue :fixed-capacity 4096))) + + (log-message :debug "pgsql:copy-from-file: ~a ~a ~a" dbname table-name filename) + + (when report + (pgstate-add-table *state* dbname table-name)) + + (lp:submit-task channel #'copy-to-queue table-name filename dataq *state*) + + ;; and start another task to push that data from the queue to PostgreSQL + (lp:submit-task channel + #'pgloader.pgsql:copy-from-queue + dbname table-name dataq + :state *state* + :truncate truncate) + + ;; now wait until both the tasks are over, and measure time it took'em + (multiple-value-bind (res secs) + (timing + (loop for tasks below 2 do (lp:receive-result channel))) + (declare (ignore res)) + (when report (pgstate-incf *state* table-name :secs secs))) + + (when report + (report-table-name table-name) + (report-pgtable-stats *state* table-name)))) + diff --git a/src/pgsql-queries.lisp b/src/pgsql-queries.lisp new file mode 100644 index 0000000..eae85b4 --- /dev/null +++ b/src/pgsql-queries.lisp @@ -0,0 +1,173 @@ +;;; +;;; Tools to handle PostgreSQL queries +;;; +(in-package :pgloader.pgsql) + +;;; +;;; PostgreSQL Tools connecting to a database +;;; +(defmacro with-pgsql-transaction ((dbname &key database) &body forms) + "Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if + given. To get the connection spec from the DBNAME, use `get-connection-spec'." + (if database + `(let ((pomo:*database* database)) + (pomo:with-transaction () + (set-session-gucs *pg-settings* :transaction t) + (progn ,@forms))) + ;; no database given, create a new database connection + `(pomo:with-connection (get-connection-spec ,dbname) + (set-session-gucs *pg-settings*) + (pomo:with-transaction () + (progn ,@forms))))) + +(defun get-connection-spec (dbname &key (with-port t)) + "pomo:with-connection and cl-postgres:open-database and open-db-writer are + not using the same connection spec format..." + (let ((conspec (list dbname *pgconn-user* *pgconn-pass* *pgconn-host*))) + (if with-port + (append conspec (list :port *pgconn-port*)) + (append conspec (list *pgconn-port*))))) + +(defun set-session-gucs (alist &key transaction database) + "Set given GUCs to given values for the current session." + (let ((pomo:*database* (or database pomo:*database*))) + (loop + for (name . value) in alist + for set = (format nil "SET~@[ LOCAL~] ~a TO '~a'" transaction name value) + do + (log-message :debug set) + (pomo:execute set)))) + +;;; +;;; PostgreSQL queries +;;; +(defun pgsql-execute (sql &key ((:client-min-messages level))) + "Execute given SQL in current transaction" + (when level + (pomo:execute + (format nil "SET LOCAL client_min_messages TO ~a;" (symbol-name level)))) + + (pomo:execute sql) + + (when level (pomo:execute (format nil "RESET client_min_messages;")))) + +;;; +;;; PostgreSQL Utility Queries +;;; +(defun truncate-table (dbname table-name) + "Truncate given TABLE-NAME in database DBNAME" + (pomo:with-connection (get-connection-spec dbname) + (pomo:execute (format nil "truncate ~a;" table-name)))) + +(defun list-databases (&optional (username "postgres")) + "Connect to a local database and get the database list" + (pomo:with-connection (let ((*pgconn-user* username)) + (get-connection-spec "postgres")) + (loop for (dbname) in (pomo:query + "select datname + from pg_database + where datname !~ 'postgres|template'") + collect dbname))) + +(defun list-tables (dbname) + "Return an alist of tables names and list of columns to pay attention to." + (pomo:with-connection + (get-connection-spec dbname) + + (loop for (relname colarray) in (pomo:query " +select relname, array_agg(case when typname in ('date', 'timestamptz') + then attnum end + order by attnum) + from pg_class c + join pg_namespace n on n.oid = c.relnamespace + left join pg_attribute a on c.oid = a.attrelid + join pg_type t on t.oid = a.atttypid + where c.relkind = 'r' + and attnum > 0 + and n.nspname = 'public' + group by relname +") + collect (cons relname (loop + for attnum across colarray + unless (eq attnum :NULL) + collect attnum))))) + +(defun list-tables-cols (dbname &key (schema "public") table-name-list) + "Return an alist of tables names and number of columns." + (pomo:with-connection + (get-connection-spec dbname) + + (loop for (relname cols) + in (pomo:query (format nil " + select relname, count(attnum) + from pg_class c + join pg_namespace n on n.oid = c.relnamespace + left join pg_attribute a on c.oid = a.attrelid + join pg_type t on t.oid = a.atttypid + where c.relkind = 'r' + and attnum > 0 + and n.nspname = '~a' + ~@[~{and relname = '~a'~^ ~}~] + group by relname +" schema table-name-list)) + collect (cons relname cols)))) + +(defun list-columns (dbname table-name &key schema) + "Return a list of column names for given TABLE-NAME." + (pomo:with-connection + (get-connection-spec dbname) + + (pomo:query (format nil " + select attname + from pg_class c + join pg_namespace n on n.oid = c.relnamespace + left join pg_attribute a on c.oid = a.attrelid + join pg_type t on t.oid = a.atttypid + where c.oid = '~:[~*~a~;~a.~a~]'::regclass and attnum > 0 + order by attnum" schema schema table-name) :column))) + +(defun list-reserved-keywords (dbname) + "Connect to PostgreSQL DBNAME and fetch reserved keywords." + (with-pgsql-transaction (dbname) + (pomo:query "select word from pg_get_keywords() where catcode = 'R'" :column))) + +(defun reset-all-sequences (dbname &key only-tables) + "Reset all sequences to the max value of the column they are attached to." + (pomo:with-connection (get-connection-spec dbname) + (pomo:execute "set client_min_messages to warning;") + (pomo:execute "listen seqs") + + (handler-case + (pomo:execute (format nil " +DO $$ +DECLARE + n integer := 0; + r record; +BEGIN + FOR r in + SELECT 'select ' + || trim(trailing ')' + from replace(pg_get_expr(d.adbin, d.adrelid), + 'nextval', 'setval')) + || ', (select greatest(max(' || a.attname || '), 1) from only ' + || nspname || '.' || relname || '));' as sql + FROM pg_class c + JOIN pg_namespace n on n.oid = c.relnamespace + JOIN pg_attribute a on a.attrelid = c.oid + JOIN pg_attrdef d on d.adrelid = a.attrelid + and d.adnum = a.attnum + and a.atthasdef + WHERE relkind = 'r' and a.attnum > 0 + and pg_get_expr(d.adbin, d.adrelid) ~~ '^nextval' + ~@[and c.oid in (~{'~a'::regclass~^, ~})~] + LOOP + n := n + 1; + EXECUTE r.sql; + END LOOP; + + PERFORM pg_notify('seqs', n::text); +END; +$$; " only-tables)) + ;; now get the notification signal + (cl-postgres:postgresql-notification (c) + (parse-integer (cl-postgres:postgresql-notification-payload c)))))) diff --git a/src/pgsql.lisp b/src/pgsql.lisp index 03f861d..a7f931a 100644 --- a/src/pgsql.lisp +++ b/src/pgsql.lisp @@ -1,171 +1,8 @@ ;;; -;;; Tools to handle PostgreSQL data format +;;; The PostgreSQL COPY TO implementation, with batches and retries. ;;; (in-package :pgloader.pgsql) -;;; -;;; PostgreSQL Tools connecting to a database -;;; -(defun get-connection-spec (dbname &key (with-port t)) - "pomo:with-connection and cl-postgres:open-database and open-db-writer are - not using the same connection spec format..." - (let ((conspec (list dbname *pgconn-user* *pgconn-pass* *pgconn-host*))) - (if with-port - (append conspec (list :port *pgconn-port*)) - (append conspec (list *pgconn-port*))))) - -(defun set-session-gucs (alist &key transaction database) - "Set given GUCs to given values for the current session." - (let ((pomo:*database* (or database pomo:*database*))) - (loop - for (name . value) in alist - for set = (format nil "SET~@[ LOCAL~] ~a TO '~a'" transaction name value) - do - (log-message :debug set) - (pomo:execute set)))) - -(defmacro with-pgsql-transaction ((dbname &key database) &body forms) - "Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if - given. To get the connection spec from the DBNAME, use `get-connection-spec'." - (if database - `(let ((pomo:*database* database)) - (pomo:with-transaction () - (set-session-gucs *pg-settings* :transaction t) - (progn ,@forms))) - ;; no database given, create a new database connection - `(pomo:with-connection (get-connection-spec ,dbname) - (set-session-gucs *pg-settings*) - (pomo:with-transaction () - (progn ,@forms))))) - -(defun pgsql-execute (sql &key ((:client-min-messages level))) - "Execute given SQL in current transaction" - (when level - (pomo:execute - (format nil "SET LOCAL client_min_messages TO ~a;" (symbol-name level)))) - - (pomo:execute sql) - - (when level (pomo:execute (format nil "RESET client_min_messages;")))) - -(defun truncate-table (dbname table-name) - "Truncate given TABLE-NAME in database DBNAME" - (pomo:with-connection (get-connection-spec dbname) - (pomo:execute (format nil "truncate ~a;" table-name)))) - -(defun list-databases (&optional (username "postgres")) - "Connect to a local database and get the database list" - (pomo:with-connection (let ((*pgconn-user* username)) - (get-connection-spec "postgres")) - (loop for (dbname) in (pomo:query - "select datname - from pg_database - where datname !~ 'postgres|template'") - collect dbname))) - -(defun list-tables (dbname) - "Return an alist of tables names and list of columns to pay attention to." - (pomo:with-connection - (get-connection-spec dbname) - - (loop for (relname colarray) in (pomo:query " -select relname, array_agg(case when typname in ('date', 'timestamptz') - then attnum end - order by attnum) - from pg_class c - join pg_namespace n on n.oid = c.relnamespace - left join pg_attribute a on c.oid = a.attrelid - join pg_type t on t.oid = a.atttypid - where c.relkind = 'r' - and attnum > 0 - and n.nspname = 'public' - group by relname -") - collect (cons relname (loop - for attnum across colarray - unless (eq attnum :NULL) - collect attnum))))) - -(defun list-tables-cols (dbname &key (schema "public") table-name-list) - "Return an alist of tables names and number of columns." - (pomo:with-connection - (get-connection-spec dbname) - - (loop for (relname cols) - in (pomo:query (format nil " - select relname, count(attnum) - from pg_class c - join pg_namespace n on n.oid = c.relnamespace - left join pg_attribute a on c.oid = a.attrelid - join pg_type t on t.oid = a.atttypid - where c.relkind = 'r' - and attnum > 0 - and n.nspname = '~a' - ~@[~{and relname = '~a'~^ ~}~] - group by relname -" schema table-name-list)) - collect (cons relname cols)))) - -(defun list-columns (dbname table-name &key schema) - "Return a list of column names for given TABLE-NAME." - (pomo:with-connection - (get-connection-spec dbname) - - (pomo:query (format nil " - select attname - from pg_class c - join pg_namespace n on n.oid = c.relnamespace - left join pg_attribute a on c.oid = a.attrelid - join pg_type t on t.oid = a.atttypid - where c.oid = '~:[~*~a~;~a.~a~]'::regclass and attnum > 0 - order by attnum" schema schema table-name) :column))) - -(defun list-reserved-keywords (dbname) - "Connect to PostgreSQL DBNAME and fetch reserved keywords." - (with-pgsql-transaction (dbname) - (pomo:query "select word from pg_get_keywords() where catcode = 'R'" :column))) - -(defun reset-all-sequences (dbname &key only-tables) - "Reset all sequences to the max value of the column they are attached to." - (pomo:with-connection (get-connection-spec dbname) - (pomo:execute "set client_min_messages to warning;") - (pomo:execute "listen seqs") - - (handler-case - (pomo:execute (format nil " -DO $$ -DECLARE - n integer := 0; - r record; -BEGIN - FOR r in - SELECT 'select ' - || trim(trailing ')' - from replace(pg_get_expr(d.adbin, d.adrelid), - 'nextval', 'setval')) - || ', (select greatest(max(' || a.attname || '), 1) from only ' - || nspname || '.' || relname || '));' as sql - FROM pg_class c - JOIN pg_namespace n on n.oid = c.relnamespace - JOIN pg_attribute a on a.attrelid = c.oid - JOIN pg_attrdef d on d.adrelid = a.attrelid - and d.adnum = a.attnum - and a.atthasdef - WHERE relkind = 'r' and a.attnum > 0 - and pg_get_expr(d.adbin, d.adrelid) ~~ '^nextval' - ~@[and c.oid in (~{'~a'::regclass~^, ~})~] - LOOP - n := n + 1; - EXECUTE r.sql; - END LOOP; - - PERFORM pg_notify('seqs', n::text); -END; -$$; " only-tables)) - ;; now get the notification signal - (cl-postgres:postgresql-notification (c) - (parse-integer (cl-postgres:postgresql-notification-payload c)))))) - ;;; ;;; PostgreSQL formating tools ;;; @@ -184,76 +21,6 @@ $$; " only-tables)) ;; force nil values to being cl-postgres :null special value collect (if (null transformed-col) :null transformed-col))) -;;; -;;; Format row to PostgreSQL COPY format, the TEXT variant. -;;; -(defun format-row (stream row &key transforms) - "Add a ROW in the STREAM, formating ROW in PostgreSQL COPY TEXT format. - -See http://www.postgresql.org/docs/9.2/static/sql-copy.html#AEN66609 for -details about the format, and format specs." - (let* (*print-circle* *print-pretty*) - (loop - for (col . more?) on row - for fn in transforms - for preprocessed-col = (apply-transform-function fn col) - ;; still accept postmodern :NULL in "preprocessed" data - do (if (or (null preprocessed-col) - (eq :NULL preprocessed-col)) - (format stream "~a~:[~;~c~]" "\\N" more? #\Tab) - (progn - ;; From PostgreSQL docs: - ;; - ;; In particular, the following characters must be preceded - ;; by a backslash if they appear as part of a column value: - ;; backslash itself, newline, carriage return, and the - ;; current delimiter character. - (loop - for char across preprocessed-col - do (case char - (#\\ (format stream "\\\\")) ; 2 chars here - (#\Space (princ #\Space stream)) - (#\Newline (format stream "\\n")) ; 2 chars here - (#\Return (format stream "\\r")) ; 2 chars here - (#\Tab (format stream "\\t")) ; 2 chars here - (#\Backspace (format stream "\\b")) ; 2 chars here - (#\Page (format stream "\\f")) ; 2 chars here - (t (format stream "~c" char)))) - (format stream "~:[~;~c~]" more? #\Tab)))) - (format stream "~%"))) - -;;; -;;; Read a file format in PostgreSQL COPY TEXT format, and call given -;;; function on each line. -;;; -(defun map-rows (filename &key process-row-fn) - "Load data from a text file in PostgreSQL COPY TEXT format. - -Each row is pre-processed then PROCESS-ROW-FN is called with the row as a -list as its only parameter. - -Finally returns how many rows where read and processed." - (with-open-file - ;; we just ignore files that don't exist - (input filename - :direction :input - :if-does-not-exist nil) - (when input - ;; read in the text file, split it into columns, process NULL columns - ;; the way postmodern expects them, and call PROCESS-ROW-FN on them - (loop - for line = (read-line input nil) - for row = (mapcar (lambda (x) - ;; we want Postmodern compliant NULLs - (if (string= "\\N" x) :null x)) - ;; splitting is easy, it's always on #\Tab - ;; see format-row-for-copy for details - (sq:split-sequence #\Tab line)) - while line - counting line into count - do (funcall process-row-fn row) - finally (return count))))) - ;;; ;;; Pop data from a lparallel.queue queue instance, reformat it assuming ;;; data in there are from cl-mysql, and copy it to a PostgreSQL table. @@ -340,59 +107,6 @@ Finally returns how many rows where read and processed." (pgstate-incf *state* table-name :rows rows) while cont))) -;;; -;;; Read a file in PostgreSQL COPY TEXT format and load it into a PostgreSQL -;;; table using the COPY protocol. We expect PostgreSQL compatible data in -;;; that data format, so we don't handle any reformating here. -;;; -(defun copy-to-queue (table-name filename dataq &optional (*state* *state*)) - "Copy data from file FILENAME into lparallel.queue DATAQ" - (let ((read - (pgloader.queue:map-push-queue dataq #'map-rows filename))) - (pgstate-incf *state* table-name :read read))) - -(defun copy-from-file (dbname table-name filename - &key - (truncate t) - (report nil)) - "Load data from clean COPY TEXT file to PostgreSQL, return how many rows." - (let* ((*state* (if report (pgloader.utils:make-pgstate) *state*)) - (lp:*kernel* - (lp:make-kernel 2 :bindings - `((*pgconn-host* . ,*pgconn-host*) - (*pgconn-port* . ,*pgconn-port*) - (*pgconn-user* . ,*pgconn-user*) - (*pgconn-pass* . ,*pgconn-pass*) - (*pg-settings* . ',*pg-settings*) - (*state* . ,*state*)))) - (channel (lp:make-channel)) - (dataq (lq:make-queue :fixed-capacity 4096))) - - (log-message :debug "pgsql:copy-from-file: ~a ~a ~a" dbname table-name filename) - - (when report - (pgstate-add-table *state* dbname table-name)) - - (lp:submit-task channel #'copy-to-queue table-name filename dataq *state*) - - ;; and start another task to push that data from the queue to PostgreSQL - (lp:submit-task channel - #'pgloader.pgsql:copy-from-queue - dbname table-name dataq - :state *state* - :truncate truncate) - - ;; now wait until both the tasks are over, and measure time it took'em - (multiple-value-bind (res secs) - (timing - (loop for tasks below 2 do (lp:receive-result channel))) - (declare (ignore res)) - (when report (pgstate-incf *state* table-name :secs secs))) - - (when report - (report-table-name table-name) - (report-pgtable-stats *state* table-name)))) - ;;; ;;; When a batch has been refused by PostgreSQL with a data-exception, that ;;; means it contains non-conforming data. It could be only one row in the