diff --git a/db3.lisp b/db3.lisp index 18e3082..8f3d4bf 100644 --- a/db3.lisp +++ b/db3.lisp @@ -121,16 +121,18 @@ create-table truncate) "Open the DB3 and stream its content to a PostgreSQL database." - (when create-table - (let ((create-table-sql (db3-create-table filename))) - (log-message :notice "Create table \"~a\"" table-name) - (log-message :info "~a" create-table-sql) - (pgloader.pgsql:execute dbname create-table-sql))) + (with-pgsql-transaction (dbname) + (when create-table + (let ((create-table-sql (db3-create-table filename))) + (log-message :notice "Create table \"~a\"" table-name) + (log-message :info "~a" create-table-sql) + (pgsql-execute create-table-sql))) - (when truncate - (let ((truncate-sql (format nil "TRUNCATE ~a;" table-name))) - (log-message :notice "~a" truncate-sql) - (pgloader.pgsql:execute dbname truncate-sql))) + (when (and truncate (not create-table)) + ;; we don't TRUNCATE a table we just CREATEd + (let ((truncate-sql (format nil "TRUNCATE ~a;" table-name))) + (log-message :notice "~a" truncate-sql) + (pgsql-execute truncate-sql)))) (let* ((*state* (make-pgstate)) (lp:*kernel* (make-kernel 2)) diff --git a/mysql.lisp b/mysql.lisp index c48b5be..c0bd242 100644 --- a/mysql.lisp +++ b/mysql.lisp @@ -120,10 +120,9 @@ order by table_name, ordinal_position" dbname))) collect (get-create-table table-name cols :identifier-case identifier-case))) -(defun pgsql-create-tables (dbname all-columns +(defun pgsql-create-tables (all-columns &key (identifier-case :downcase) - (pg-dbname dbname) include-drop) "Create all MySQL tables in database dbname in PostgreSQL" (loop @@ -131,7 +130,7 @@ order by table_name, ordinal_position" dbname))) for sql in (get-pgsql-create-tables all-columns :identifier-case identifier-case :include-drop include-drop) - do (pgloader.pgsql:execute pg-dbname sql :client-min-messages :warning) + do (pgsql-execute sql :client-min-messages :warning) finally (return nb-tables))) @@ -454,7 +453,7 @@ order by ordinal_position" 
dbname table-name))) (defun execute-with-timing (dbname label sql state &key (count 1)) "Execute given SQL and resgister its timing into STATE." (multiple-value-bind (res secs) - (timing (pgloader.pgsql:execute dbname sql)) + (timing (with-pgsql-transaction (dbname) (pgsql-execute sql))) (declare (ignore res)) (pgstate-incf state label :rows count :secs secs))) @@ -477,7 +476,8 @@ order by ordinal_position" dbname table-name))) for sql in drop-indexes do (log-message :notice "~a" sql) - (lp:submit-task drop-channel #'pgloader.pgsql:execute dbname sql)) + (lp:submit-task drop-channel + #'execute-with-timing dbname label sql state)) ;; wait for the DROP INDEX to be done before issuing CREATE INDEX (loop for idx in drop-indexes do (lp:receive-result drop-channel)))) @@ -517,10 +517,10 @@ order by ordinal_position" dbname table-name))) ;; if asked, first drop/create the tables on the PostgreSQL side (when create-tables (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop) - (pgsql-create-tables dbname all-columns - :identifier-case identifier-case - :pg-dbname pg-dbname - :include-drop include-drop)) + (with-pgsql-transaction (dbname) + (pgsql-create-tables all-columns + :identifier-case identifier-case + :include-drop include-drop))) (loop for (table-name . 
columns) in all-columns diff --git a/package.lisp b/package.lisp index b8f1e62..c7acd09 100644 --- a/package.lisp +++ b/package.lisp @@ -50,6 +50,25 @@ (:export #:map-pop-queue #:map-push-queue)) +(defpackage #:pgloader.pgsql + (:use #:cl #:pgloader.params #:pgloader.utils) + (:export #:with-pgsql-transaction + #:pgsql-execute + #:truncate-table + #:copy-from-file + #:copy-from-queue + #:list-databases + #:list-tables + #:list-tables-cols + #:reset-all-sequences + #:execute + #:get-date-columns + #:format-row)) + + +;; +;; Specific source handling +;; (defpackage #:pgloader.csv (:use #:cl #:pgloader.params #:pgloader.utils) (:export #:*csv-path-root* @@ -62,6 +81,9 @@ (defpackage #:pgloader.db3 (:use #:cl #:pgloader.params #:pgloader.utils) + (:import-from #:pgloader.pgsql + #:with-pgsql-transaction + #:pgsql-execute) (:export #:map-rows #:copy-to #:copy-to-queue @@ -69,16 +91,25 @@ (defpackage #:pgloader.archive (:use #:cl #:pgloader.params #:pgloader.utils #:pgloader.csv) + (:import-from #:pgloader.pgsql + #:with-pgsql-transaction + #:pgsql-execute) (:export #:import-csv-from-zip)) (defpackage #:pgloader.syslog (:use #:cl #:pgloader.params #:pgloader.utils) + (:import-from #:pgloader.pgsql + #:with-pgsql-transaction + #:pgsql-execute) (:export #:stream-messages #:start-syslog-server #:send-message)) (defpackage #:pgloader.mysql (:use #:cl #:pgloader.params #:pgloader.utils) + (:import-from #:pgloader.pgsql + #:with-pgsql-transaction + #:pgsql-execute) (:export #:*cast-rules* #:*default-cast-rules* #:map-rows @@ -90,19 +121,10 @@ #:stream-table #:stream-database)) -(defpackage #:pgloader.pgsql - (:use #:cl #:pgloader.params #:pgloader.utils) - (:export #:truncate-table - #:copy-from-file - #:copy-from-queue - #:list-databases - #:list-tables - #:list-tables-cols - #:reset-all-sequences - #:execute - #:get-date-columns - #:format-row)) - + +;; +;; Main package +;; (defpackage #:pgloader (:use #:cl #:pgloader.params #:pgloader.utils) (:import-from #:pgloader.pgsql 
diff --git a/pgloader.asd b/pgloader.asd index 473f0b1..9e8b6bb 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -32,10 +32,6 @@ (:file "parser" :depends-on ("package" "params")) (:file "transforms") (:file "queue" :depends-on ("package")) ; pgloader.queue - (:file "csv" :depends-on ("package")) ; pgloader.csv - (:file "db3" :depends-on ("package")) ; pgloader.db3 - (:file "archive" :depends-on ("package")) ; pgloader.archive - (:file "syslog" :depends-on ("package")) ; pgloader.syslog ;; package pgloader.pgsql (:file "pgsql" :depends-on ("package" @@ -43,6 +39,12 @@ "utils" "transforms")) + ;; Source format specific implementations + (:file "csv" :depends-on ("package" "pgsql")) + (:file "db3" :depends-on ("package" "pgsql")) + (:file "archive" :depends-on ("package" "pgsql")) + (:file "syslog" :depends-on ("package" "pgsql")) + ;; mysql.lisp depends on pgsql.lisp to be able to export data ;; from MySQL in the PostgreSQL format. ;; diff --git a/pgsql.lisp b/pgsql.lisp index e3ace33..c6f2f21 100644 --- a/pgsql.lisp +++ b/pgsql.lisp @@ -14,6 +14,38 @@ (append conspec (list :port *pgconn-port*)) (append conspec (list *pgconn-port*))))) +(defun set-session-gucs (alist &key transaction database) + "Set given GUCs to given values for the current session." + (let ((pomo:*database* (or database pomo:*database*))) + (loop + for (name . value) in alist + do (pomo:execute + (format nil "SET~@[ LOCAL~] ~a TO '~a'" transaction name value))))) + +(defmacro with-pgsql-transaction ((dbname &key database) &body forms) + "Run FORMS within a PostgreSQL transaction to DBNAME, reusing DATABASE if + given. To get the connection spec from the DBNAME, use `get-connection-spec'." 
+ (if database + `(let ((pomo:*database* ,database)) + (pomo:with-transaction () + (set-session-gucs *pg-settings* :transaction t) + (progn ,@forms))) + ;; no database given, create a new database connection + `(pomo:with-connection (get-connection-spec ,dbname) + (set-session-gucs *pg-settings*) + (pomo:with-transaction () + (progn ,@forms))))) + +(defun pgsql-execute (sql &key ((:client-min-messages level))) + "Execute given SQL in current transaction" + (when level + (pomo:execute + (format nil "SET LOCAL client_min_messages TO ~a;" (symbol-name level)))) + + (pomo:execute sql) + + (when level (pomo:execute (format nil "RESET client_min_messages;")))) + (defun truncate-table (dbname table-name) "Truncate given TABLE-NAME in database DBNAME" (pomo:with-connection (get-connection-spec dbname) @@ -72,16 +104,12 @@ select relname, array_agg(case when typname in ('date', 'timestamptz') (defun reset-all-sequences (dbname) "Reset all sequences to the max value of the column they are attached to." 
- (let ((connection - (apply #'cl-postgres:open-database - (remove :port (get-connection-spec dbname))))) + (pomo:with-connection (get-connection-spec dbname) + (pomo:execute "set client_min_messages to warning;") + (pomo:execute "listen seqs") - (cl-postgres:exec-query connection "set client_min_messages to warning;") - (cl-postgres:exec-query connection "listen seqs") - - (prog1 - (handler-case - (cl-postgres:exec-query connection " + (handler-case + (pomo:execute " DO $$ DECLARE n integer := 0; @@ -110,24 +138,9 @@ BEGIN PERFORM pg_notify('seqs', n::text); END; $$; ") - ;; now get the notification signal - (cl-postgres:postgresql-notification (c) - (parse-integer (cl-postgres:postgresql-notification-payload c)))) - (cl-postgres:close-database connection)))) - -(defun execute (dbname sql &key ((:client-min-messages level))) - "Execute given SQL in DBNAME" - (pomo:with-connection (get-connection-spec dbname) - (when level - (pomo:execute - (format nil "SET client_min_messages TO ~a;" (symbol-name level)))) - (pomo:execute sql))) - -(defun set-session-gucs (alist) - "Set given GUCs to given values for the current session." - (loop - for (name . value) in alist - do (pomo:execute (format nil "SET ~a TO ~a" name value)))) + ;; now get the notification signal + (cl-postgres:postgresql-notification (c) + (parse-integer (cl-postgres:postgresql-notification-payload c)))))) ;;; ;;; PostgreSQL formating tools @@ -267,20 +280,24 @@ Finally returns how many rows where read and processed." 
(let* ((conspec (get-connection-spec dbname :with-port nil))) (loop for retval = - (let* ((stream (cl-postgres:open-db-writer conspec table-name nil)) + (let* ((copier (cl-postgres:open-db-writer conspec table-name nil + :initialize nil)) (*batch* nil) (*batch-size* 0)) (log-message :debug "pgsql:copy-from-queue: starting new batch") + (set-session-gucs *pg-settings* + :database (cl-postgres::copier-database copier)) + (cl-postgres::initialize-copier copier) (unwind-protect (let ((process-row-fn - (make-copy-and-batch-fn stream :transforms transforms))) + (make-copy-and-batch-fn copier :transforms transforms))) (catch 'next-batch (pgloader.queue:map-pop-queue dataq process-row-fn))) ;; in case of data-exception, split the batch and try again (handler-case (progn (log-message :debug "pgsql:copy-from-queue: commit batch") - (cl-postgres:close-db-writer stream)) + (cl-postgres:close-db-writer copier)) ((or CL-POSTGRES-ERROR:UNIQUE-VIOLATION CL-POSTGRES-ERROR:DATA-EXCEPTION) (e)