Refactor transaction handling, depend on a patch to postmodern.

This commit is contained in:
Dimitri Fontaine 2013-09-23 11:30:20 +02:00
parent fd2809277f
commit 7151a2ea62
5 changed files with 108 additions and 65 deletions

View File

@ -121,16 +121,18 @@
create-table
truncate)
"Open the DB3 and stream its content to a PostgreSQL database."
(when create-table
(let ((create-table-sql (db3-create-table filename)))
(log-message :notice "Create table \"~a\"" table-name)
(log-message :info "~a" create-table-sql)
(pgloader.pgsql:execute dbname create-table-sql)))
(with-pgsql-transaction (dbname)
(when create-table
(let ((create-table-sql (db3-create-table filename)))
(log-message :notice "Create table \"~a\"" table-name)
(log-message :info "~a" create-table-sql)
(pgsql-execute create-table-sql)))
(when truncate
(let ((truncate-sql (format nil "TRUNCATE ~a;" table-name)))
(log-message :notice "~a" truncate-sql)
(pgloader.pgsql:execute dbname truncate-sql)))
(when (and truncate (not create-table))
;; we don't TRUNCATE a table we just CREATEd
(let ((truncate-sql (format nil "TRUNCATE ~a;" table-name)))
(log-message :notice "~a" truncate-sql)
(pgsql-execute truncate-sql))))
(let* ((*state* (make-pgstate))
(lp:*kernel* (make-kernel 2))

View File

@ -120,10 +120,9 @@ order by table_name, ordinal_position" dbname)))
collect (get-create-table table-name cols
:identifier-case identifier-case)))
(defun pgsql-create-tables (dbname all-columns
(defun pgsql-create-tables (all-columns
&key
(identifier-case :downcase)
(pg-dbname dbname)
include-drop)
"Create all MySQL tables in database dbname in PostgreSQL"
(loop
@ -131,7 +130,7 @@ order by table_name, ordinal_position" dbname)))
for sql in (get-pgsql-create-tables all-columns
:identifier-case identifier-case
:include-drop include-drop)
do (pgloader.pgsql:execute pg-dbname sql :client-min-messages :warning)
do (pgsql-execute sql :client-min-messages :warning)
finally (return nb-tables)))
@ -454,7 +453,7 @@ order by ordinal_position" dbname table-name)))
(defun execute-with-timing (dbname label sql state &key (count 1))
"Execute given SQL and resgister its timing into STATE."
(multiple-value-bind (res secs)
(timing (pgloader.pgsql:execute dbname sql))
(timing (with-pgsql-transaction (dbname) (pgsql-execute sql)))
(declare (ignore res))
(pgstate-incf state label :rows count :secs secs)))
@ -477,7 +476,8 @@ order by ordinal_position" dbname table-name)))
for sql in drop-indexes
do
(log-message :notice "~a" sql)
(lp:submit-task drop-channel #'pgloader.pgsql:execute dbname sql))
(lp:submit-task drop-channel
#'execute-with-timing dbname label sql state))
;; wait for the DROP INDEX to be done before issuing CREATE INDEX
(loop for idx in drop-indexes do (lp:receive-result drop-channel))))
@ -517,10 +517,10 @@ order by ordinal_position" dbname table-name)))
;; if asked, first drop/create the tables on the PostgreSQL side
(when create-tables
(log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop)
(pgsql-create-tables dbname all-columns
:identifier-case identifier-case
:pg-dbname pg-dbname
:include-drop include-drop))
(with-pgsql-transaction (dbname)
(pgsql-create-tables all-columns
:identifier-case identifier-case
:include-drop include-drop)))
(loop
for (table-name . columns) in all-columns

View File

@ -50,6 +50,25 @@
(:export #:map-pop-queue
#:map-push-queue))
(defpackage #:pgloader.pgsql
(:use #:cl #:pgloader.params #:pgloader.utils)
(:export #:with-pgsql-transaction
#:pgsql-execute
#:truncate-table
#:copy-from-file
#:copy-from-queue
#:list-databases
#:list-tables
#:list-tables-cols
#:reset-all-sequences
#:execute
#:get-date-columns
#:format-row))
;;
;; Specific source handling
;;
(defpackage #:pgloader.csv
(:use #:cl #:pgloader.params #:pgloader.utils)
(:export #:*csv-path-root*
@ -62,6 +81,9 @@
(defpackage #:pgloader.db3
(:use #:cl #:pgloader.params #:pgloader.utils)
(:import-from #:pgloader.pgsql
#:with-pgsql-transaction
#:pgsql-execute)
(:export #:map-rows
#:copy-to
#:copy-to-queue
@ -69,16 +91,25 @@
(defpackage #:pgloader.archive
(:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.csv)
(:import-from #:pgloader.pgsql
#:with-pgsql-transaction
#:pgsql-execute)
(:export #:import-csv-from-zip))
(defpackage #:pgloader.syslog
(:use #:cl #:pgloader.params #:pgloader.utils)
(:import-from #:pgloader.pgsql
#:with-pgsql-transaction
#:pgsql-execute)
(:export #:stream-messages
#:start-syslog-server
#:send-message))
(defpackage #:pgloader.mysql
(:use #:cl #:pgloader.params #:pgloader.utils)
(:import-from #:pgloader.pgsql
#:with-pgsql-transaction
#:pgsql-execute)
(:export #:*cast-rules*
#:*default-cast-rules*
#:map-rows
@ -90,19 +121,10 @@
#:stream-table
#:stream-database))
(defpackage #:pgloader.pgsql
(:use #:cl #:pgloader.params #:pgloader.utils)
(:export #:truncate-table
#:copy-from-file
#:copy-from-queue
#:list-databases
#:list-tables
#:list-tables-cols
#:reset-all-sequences
#:execute
#:get-date-columns
#:format-row))
;;
;; Main package
;;
(defpackage #:pgloader
(:use #:cl #:pgloader.params #:pgloader.utils)
(:import-from #:pgloader.pgsql

View File

@ -32,10 +32,6 @@
(:file "parser" :depends-on ("package" "params"))
(:file "transforms")
(:file "queue" :depends-on ("package")) ; pgloader.queue
(:file "csv" :depends-on ("package")) ; pgloader.csv
(:file "db3" :depends-on ("package")) ; pgloader.db3
(:file "archive" :depends-on ("package")) ; pgloader.archive
(:file "syslog" :depends-on ("package")) ; pgloader.syslog
;; package pgloader.pgsql
(:file "pgsql" :depends-on ("package"
@ -43,6 +39,12 @@
"utils"
"transforms"))
;; Source format specific implementations
(:file "csv" :depends-on ("package" "pgsql"))
(:file "db3" :depends-on ("package" "pgsql"))
(:file "archive" :depends-on ("package" "pgsql"))
(:file "syslog" :depends-on ("package" "pgsql"))
;; mysql.lisp depends on pgsql.lisp to be able to export data
;; from MySQL in the PostgreSQL format.
;;

View File

@ -14,6 +14,38 @@
(append conspec (list :port *pgconn-port*))
(append conspec (list *pgconn-port*)))))
(defun set-session-gucs (alist &key transaction database)
"Set given GUCs to given values for the current session."
(let ((pomo:*database* (or database pomo:*database*)))
(loop
for (name . value) in alist
do (pomo:execute
(format nil "SET~@[ LOCAL~] ~a TO '~a'" transaction name value)))))
(defmacro with-pgsql-transaction ((dbname &key database) &body forms)
"Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if
given. To get the connection spec from the DBNAME, use `get-connection-spec'."
(if database
`(let ((pomo:*database* database))
(pomo:with-transaction ()
(set-session-gucs *pg-settings* :transaction t)
(progn ,@forms)))
;; no database given, create a new database connection
`(pomo:with-connection (get-connection-spec ,dbname)
(set-session-gucs *pg-settings*)
(pomo:with-transaction ()
(progn ,@forms)))))
(defun pgsql-execute (sql &key ((:client-min-messages level)))
"Execute given SQL in current transaction"
(when level
(pomo:execute
(format nil "SET LOCAL client_min_messages TO ~a;" (symbol-name level))))
(pomo:execute sql)
(when level (pomo:execute (format nil "RESET client_min_messages;"))))
(defun truncate-table (dbname table-name)
"Truncate given TABLE-NAME in database DBNAME"
(pomo:with-connection (get-connection-spec dbname)
@ -72,16 +104,12 @@ select relname, array_agg(case when typname in ('date', 'timestamptz')
(defun reset-all-sequences (dbname)
"Reset all sequences to the max value of the column they are attached to."
(let ((connection
(apply #'cl-postgres:open-database
(remove :port (get-connection-spec dbname)))))
(pomo:with-connection (get-connection-spec dbname)
(pomo:execute "set client_min_messages to warning;")
(pomo:execute "listen seqs")
(cl-postgres:exec-query connection "set client_min_messages to warning;")
(cl-postgres:exec-query connection "listen seqs")
(prog1
(handler-case
(cl-postgres:exec-query connection "
(handler-case
(pomo:execute "
DO $$
DECLARE
n integer := 0;
@ -110,24 +138,9 @@ BEGIN
PERFORM pg_notify('seqs', n::text);
END;
$$; ")
;; now get the notification signal
(cl-postgres:postgresql-notification (c)
(parse-integer (cl-postgres:postgresql-notification-payload c))))
(cl-postgres:close-database connection))))
(defun execute (dbname sql &key ((:client-min-messages level)))
"Execute given SQL in DBNAME"
(pomo:with-connection (get-connection-spec dbname)
(when level
(pomo:execute
(format nil "SET client_min_messages TO ~a;" (symbol-name level))))
(pomo:execute sql)))
(defun set-session-gucs (alist)
"Set given GUCs to given values for the current session."
(loop
for (name . value) in alist
do (pomo:execute (format nil "SET ~a TO ~a" name value))))
;; now get the notification signal
(cl-postgres:postgresql-notification (c)
(parse-integer (cl-postgres:postgresql-notification-payload c))))))
;;;
;;; PostgreSQL formating tools
@ -267,20 +280,24 @@ Finally returns how many rows where read and processed."
(let* ((conspec (get-connection-spec dbname :with-port nil)))
(loop
for retval =
(let* ((stream (cl-postgres:open-db-writer conspec table-name nil))
(let* ((copier (cl-postgres:open-db-writer conspec table-name nil
:initialize nil))
(*batch* nil)
(*batch-size* 0))
(log-message :debug "pgsql:copy-from-queue: starting new batch")
(set-session-gucs *pg-settings*
:database (cl-postgres::copier-database copier))
(cl-postgres::initialize-copier copier)
(unwind-protect
(let ((process-row-fn
(make-copy-and-batch-fn stream :transforms transforms)))
(make-copy-and-batch-fn copier :transforms transforms)))
(catch 'next-batch
(pgloader.queue:map-pop-queue dataq process-row-fn)))
;; in case of data-exception, split the batch and try again
(handler-case
(progn
(log-message :debug "pgsql:copy-from-queue: commit batch")
(cl-postgres:close-db-writer stream))
(cl-postgres:close-db-writer copier))
((or
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
CL-POSTGRES-ERROR:DATA-EXCEPTION) (e)