diff --git a/src/connection.lisp b/src/connection.lisp index 3ee2c21..ecc199e 100644 --- a/src/connection.lisp +++ b/src/connection.lisp @@ -3,6 +3,9 @@ ;; (in-package :pgloader.connection) +;;; +;;; Generic API +;;; (defclass connection () ((type :initarg :type :accessor conn-type) (handle :initarg :conn :accessor conn-handle :initform nil)) @@ -21,12 +24,26 @@ (defgeneric check-connection (connection) (:documentation "Check that we can actually connect.")) +(defgeneric clone-connection (connection) + (:documentation "Instanciate a new connection object with similar properties.")) + + +;;; +;;; File based objects +;;; (defclass fd-connection (connection) ((uri :initarg :uri :accessor fd-uri) (arch :initarg :arch :accessor fd-arch) (path :initarg :path :accessor fd-path)) (:documentation "pgloader connection parameters for a file based data source.")) +(defmethod clone-connection ((fd fd-connection)) + (let ((clone (make-instance 'fd-connection :type (conn-type fd)))) + (loop :for slot :in '(uri arch path) + :do (when (slot-boundp fd slot) + (setf (slot-value clone slot) (slot-value fd slot)))) + clone)) + (define-condition fd-connection-error (connection-error) ((path :initarg :path :reader connection-error-path)) (:report (lambda (err stream) @@ -72,6 +89,9 @@ (setf (fd-path fd) local-filename)))) fd) +;;; +;;; database connections +;;; (defclass db-connection (connection) ((name :initarg :name :accessor db-name) (host :initarg :host :accessor db-host) @@ -80,6 +100,15 @@ (pass :initarg :pass :accessor db-pass)) (:documentation "pgloader connection parameters for a database service.")) +(defmethod clone-connection ((c db-connection)) + (make-instance 'db-connection + :type (conn-type c) + :name (db-name c) + :host (db-host c) + :port (db-port c) + :user (db-user c) + :pass (db-pass c))) + (defmethod print-object ((c db-connection) stream) (print-unreadable-object (c stream :type t :identity t) (with-slots (type name host port user) c @@ -100,6 +129,10 @@ (defgeneric query (db-connection sql &key) (:documentation "Query DB-CONNECTION with SQL query")) + +;;; +;;; Tools for every connection classes +;;; (defmacro with-connection ((var connection) &body forms) "Connect to DB-CONNECTION and handle any condition when doing so, and when connected execute FORMS in a protected way so that we always disconnect diff --git a/src/package.lisp b/src/package.lisp index e1837a1..d4a15af 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -137,6 +137,7 @@ (:export #:connection #:open-connection #:close-connection + #:clone-connection #:fd-connection #:db-connection #:connection-error @@ -180,7 +181,6 @@ (:export #:pgsql-connection #:pgconn-use-ssl #:pgconn-table-name - #:new-pgsql-connection #:with-pgsql-transaction #:with-pgsql-connection #:pgsql-execute @@ -409,7 +409,6 @@ #:pgloader.sources #:pgloader.queue) (:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.pgsql - #:new-pgsql-connection #:with-pgsql-connection #:with-pgsql-transaction #:pgsql-execute @@ -479,7 +478,6 @@ #:pgloader.sources #:pgloader.queue) (:import-from #:pgloader.transforms #:precision #:scale) (:import-from #:pgloader.pgsql - #:new-pgsql-connection #:with-pgsql-connection #:with-pgsql-transaction #:pgsql-execute diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index 10484cc..f1475e8 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -72,7 +72,9 @@ unqualified-table-name rows oversized?) (update-stats :data table-name :rows rows :ws ws))) - (when disable-triggers (enable-triggers unqualified-table-name))))) + (when disable-triggers (enable-triggers unqualified-table-name)))) + + (cons :target table-name)) ;;; ;;; Compute how many rows we're going to try loading next, depending on diff --git a/src/pgsql/queries.lisp b/src/pgsql/queries.lisp index 3758a36..a5e7407 100644 --- a/src/pgsql/queries.lisp +++ b/src/pgsql/queries.lisp @@ -15,6 +15,13 @@ "Assign the type slot to pgsql." (setf (slot-value pgconn 'type) "pgsql")) +(defmethod clone-connection ((c pgsql-connection)) + (let ((clone + (change-class (call-next-method c) 'pgsql-connection))) + (setf (pgconn-use-ssl clone) (pgconn-use-ssl c) + (pgconn-table-name clone) (pgconn-table-name c)) + clone)) + (defun new-pgsql-connection (pgconn) "Prepare a new connection object with all the same properties as pgconn, so as to avoid stepping on it's handle" @@ -41,6 +48,7 @@ (defmethod close-connection ((pgconn pgsql-connection)) "Close a PostgreSQL connection." + (assert (not (null (conn-handle pgconn)))) (pomo:disconnect (conn-handle pgconn)) (setf (conn-handle pgconn) nil) pgconn) @@ -81,13 +89,14 @@ (defmacro with-pgsql-connection ((pgconn) &body forms) "Run FROMS within a PostgreSQL connection to DBNAME. To get the connection spec from the DBNAME, use `get-connection-spec'." - `(let (#+unix (cl-postgres::*unix-socket-dir* (get-unix-socket-dir ,pgconn))) - (with-connection (conn ,pgconn) - (let ((pomo:*database* (conn-handle conn))) - (log-message :debug "CONNECTED TO ~s" conn) - (set-session-gucs *pg-settings*) - (handling-pgsql-notices - ,@forms))))) + (let ((conn (gensym "pgsql-conn"))) + `(let (#+unix (cl-postgres::*unix-socket-dir* (get-unix-socket-dir ,pgconn))) + (with-connection (,conn ,pgconn) + (let ((pomo:*database* (conn-handle ,conn))) + (log-message :debug "CONNECTED TO ~s" ,conn) + (set-session-gucs *pg-settings*) + (handling-pgsql-notices + ,@forms)))))) (defun get-unix-socket-dir (pgconn) "When *pgconn* host is a (cons :unix path) value, return the right value @@ -344,7 +353,7 @@ select i.relname, (defun reset-all-sequences (pgconn &key tables) "Reset all sequences to the max value of the column they are attached to." - (let ((newconn (new-pgsql-connection pgconn))) + (let ((newconn (clone-connection pgconn))) (with-pgsql-connection (newconn) (set-session-gucs *pg-settings*) (pomo:execute "set client_min_messages to warning;") diff --git a/src/pgsql/schema.lisp b/src/pgsql/schema.lisp index dd20849..155d312 100644 --- a/src/pgsql/schema.lisp +++ b/src/pgsql/schema.lisp @@ -356,7 +356,7 @@ (lp:submit-task channel #'pgsql-connect-and-execute-with-timing ;; each thread must have its own connection - (new-pgsql-connection pgconn) + (clone-connection pgconn) :post label sql) ;; return the pkey "upgrade" statement diff --git a/src/sources/common/methods.lisp b/src/sources/common/methods.lisp index ed0929e..5e4c2b4 100644 --- a/src/sources/common/methods.lisp +++ b/src/sources/common/methods.lisp @@ -8,8 +8,8 @@ ;;; (defmethod copy-to-queue ((copy copy) queue) "Copy data from given COPY definition into lparallel.queue QUEUE" - (pgloader.queue:map-push-queue copy queue)) - + (pgloader.queue:map-push-queue copy queue) + (cons :source (target copy))) (defmethod copy-column-list ((copy copy)) "Default column list is an empty list." @@ -28,17 +28,18 @@ (defmethod copy-from ((copy copy) &key (kernel nil k-s-p) + (channel nil c-s-p) truncate disable-triggers) "Copy data from COPY source into PostgreSQL." (let* ((lp:*kernel* (or kernel (make-kernel 2))) - (channel (lp:make-channel)) + (channel (or channel (lp:make-channel))) (queue (lq:make-queue :fixed-capacity *concurrent-batches*)) (table-name (format-table-name (target copy)))) (with-stats-collection ((target copy) :dbname (db-name (target-db copy))) - (lp:task-handler-bind ((error #'lp:invoke-transfer-error)) - (log-message :notice "COPY ~s" table-name) + (lp:task-handler-bind () ;; ((error #'lp:invoke-transfer-error)) + (log-message :info "COPY ~s" table-name) ;; start a tast to read data from the source into the queue (lp:submit-task channel #'copy-to-queue copy queue) @@ -55,7 +56,8 @@ :disable-triggers disable-triggers) ;; now wait until both the tasks are over, and kill the kernel - (loop :for tasks :below 2 :do (lp:receive-result channel) - :finally - (log-message :info "COPY ~s done." table-name) - (unless k-s-p (lp:end-kernel))))))) + (unless c-s-p + (loop :for tasks :below 2 :do (lp:receive-result channel) + :finally + (log-message :info "COPY ~s done." table-name) + (unless k-s-p (lp:end-kernel)))))))) diff --git a/src/sources/mssql/mssql-schema.lisp b/src/sources/mssql/mssql-schema.lisp index 3959a92..8059b84 100644 --- a/src/sources/mssql/mssql-schema.lisp +++ b/src/sources/mssql/mssql-schema.lisp @@ -29,6 +29,9 @@ (setf (conn-handle msconn) nil) msconn) +(defmethod clone-connection ((c mssql-connection)) + (change-class (call-next-method c) 'mssql-connection)) + (defmethod query ((msconn mssql-connection) sql &key) "Send SQL query to MSCONN connection." (mssql:query sql :connection (conn-handle msconn))) diff --git a/src/sources/mssql/mssql.lisp b/src/sources/mssql/mssql.lisp index b22afda..85587b4 100644 --- a/src/sources/mssql/mssql.lisp +++ b/src/sources/mssql/mssql.lisp @@ -140,7 +140,9 @@ excluding) "Stream the given MS SQL database down to PostgreSQL." (let* ((cffi:*default-foreign-encoding* encoding) - (copy-kernel (make-kernel 2)) + (copy-kernel (make-kernel 2)) + (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) + (table-count 0) idx-kernel idx-channel) (destructuring-bind (&key all-columns all-indexes all-fkeys pkeys) @@ -215,8 +217,8 @@ :do (let ((table-source (make-instance 'copy-mssql - :source-db (source-db mssql) - :target-db (target-db mssql) + :source-db (clone-connection (source-db mssql)) + :target-db (clone-connection (target-db mssql)) :source (cons schema table-name) :target (qualify-name schema table-name) :fields columns))) @@ -225,8 +227,10 @@ ;; COPY the data to PostgreSQL, using copy-kernel (unless schema-only + (incf table-count) (copy-from table-source :kernel copy-kernel + :channel copy-channel :disable-triggers disable-triggers)) ;; Create the indexes for that table in parallel with the next @@ -250,7 +254,15 @@ idx-channel))))))) ;; now end the kernels - (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* copy-kernel)) + (with-stats-collection ("COPY Threads Completion" :section :post) + (loop :for tasks :below (* 2 table-count) + :do (destructuring-bind (task . table-name) + (lp:receive-result copy-channel) + (log-message :debug "Finished processing ~a for ~s" + task table-name))) + (lp:end-kernel))) + (let ((lp:*kernel* idx-kernel)) ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. @@ -262,7 +274,7 @@ ;; ;; Complete the PostgreSQL database before handing over. ;; - (complete-pgsql-database (new-pgsql-connection (target-db mssql)) + (complete-pgsql-database (clone-connection (target-db mssql)) all-columns all-fkeys pkeys :data-only data-only :foreign-keys foreign-keys diff --git a/src/sources/mysql/mysql-schema.lisp b/src/sources/mysql/mysql-schema.lisp index 3e0264a..e009ec3 100644 --- a/src/sources/mysql/mysql-schema.lisp +++ b/src/sources/mysql/mysql-schema.lisp @@ -73,6 +73,9 @@ (setf (conn-handle myconn) nil) myconn) +(defmethod clone-connection ((c mysql-connection)) + (change-class (call-next-method c) 'mysql-connection)) + (defmethod query ((myconn mysql-connection) sql &key diff --git a/src/sources/mysql/mysql.lisp b/src/sources/mysql/mysql.lisp index 7a33d73..16747ee 100644 --- a/src/sources/mysql/mysql.lisp +++ b/src/sources/mysql/mysql.lisp @@ -295,7 +295,9 @@ decoding-as materialize-views) "Export MySQL data and Import it into PostgreSQL" - (let* ((copy-kernel (make-kernel 2)) + (let* ((copy-kernel (make-kernel 2)) + (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) + (table-count 0) idx-kernel idx-channel) (destructuring-bind (&key view-columns all-columns @@ -352,13 +354,13 @@ (return-from copy-database))) (loop - for (table-name . columns) in (append all-columns view-columns) + :for (table-name . columns) :in (append all-columns view-columns) - unless columns - do (log-message :error "Table ~s not found, skipping." table-name) + :unless columns + :do (log-message :error "Table ~s not found, skipping." table-name) - when columns - do + :when columns + :do (let* ((encoding ;; force the data encoding when asked to (when decoding-as @@ -368,8 +370,8 @@ (table-source (make-instance 'copy-mysql - :source-db (source-db mysql) - :target-db (target-db mysql) + :source-db (clone-connection (source-db mysql)) + :target-db (clone-connection (target-db mysql)) :source table-name :target (apply-identifier-case table-name) :fields columns @@ -379,8 +381,10 @@ ;; first COPY the data from MySQL to PostgreSQL, using copy-kernel (unless schema-only + (incf table-count) (copy-from table-source :kernel copy-kernel + :channel copy-channel :disable-triggers disable-triggers)) ;; Create the indexes for that table in parallel with the next @@ -401,7 +405,15 @@ indexes idx-kernel idx-channel)))))) ;; now end the kernels - (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* copy-kernel)) + (with-stats-collection ("COPY Threads Completion" :section :post) + (loop :for tasks :below (* 2 table-count) + :do (destructuring-bind (task . table-name) + (lp:receive-result copy-channel) + (log-message :debug "Finished processing ~a for ~s" + task table-name))) + (lp:end-kernel))) + (let ((lp:*kernel* idx-kernel)) ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. @@ -420,7 +432,7 @@ ;; ;; Complete the PostgreSQL database before handing over. ;; - (complete-pgsql-database (new-pgsql-connection (target-db mysql)) + (complete-pgsql-database (clone-connection (target-db mysql)) all-columns all-fkeys pkeys table-comments column-comments :data-only data-only diff --git a/src/sources/sqlite/sqlite.lisp b/src/sources/sqlite/sqlite.lisp index bded369..216abd8 100644 --- a/src/sources/sqlite/sqlite.lisp +++ b/src/sources/sqlite/sqlite.lisp @@ -24,6 +24,9 @@ (setf (conn-handle slconn) nil) slconn) +(defmethod clone-connection ((slconn sqlite-connection)) + (change-class (call-next-method slconn) 'sqlite-connection)) + (defmethod query ((slconn sqlite-connection) sql &key) (sqlite:execute-to-list (conn-handle slconn) sql)) @@ -155,7 +158,9 @@ "Stream the given SQLite database down to PostgreSQL." (declare (ignore only-tables)) (let* ((cffi:*default-foreign-encoding* encoding) - (copy-kernel (make-kernel 2)) + (copy-kernel (make-kernel 2)) + (copy-channel (let ((lp:*kernel* copy-kernel)) (lp:make-channel))) + (table-count 0) idx-kernel idx-channel) (destructuring-bind (&key all-columns all-indexes pkeys) @@ -193,15 +198,17 @@ do (let ((table-source (make-instance 'copy-sqlite - :source-db (source-db sqlite) - :target-db (target-db sqlite) + :source-db (clone-connection (source-db sqlite)) + :target-db (clone-connection (target-db sqlite)) :source table-name :target (apply-identifier-case table-name) :fields columns))) ;; first COPY the data from SQLite to PostgreSQL, using copy-kernel (unless schema-only + (incf table-count) (copy-from table-source :kernel copy-kernel + :channel copy-channel :disable-triggers disable-triggers)) ;; Create the indexes for that table in parallel with the next @@ -221,12 +228,20 @@ idx-kernel idx-channel)))))) ;; now end the kernels - (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* copy-kernel)) + (with-stats-collection ("COPY Threads Completion" :section :post) + (loop :for tasks :below (* 2 table-count) + :do (destructuring-bind (task . table-name) + (lp:receive-result copy-channel) + (log-message :debug "Finished processing ~a for ~s" + task table-name))) + (lp:end-kernel))) + (let ((lp:*kernel* idx-kernel)) ;; wait until the indexes are done being built... ;; don't forget accounting for that waiting time. (when (and create-indexes (not data-only)) - (with-stats-collection ("index build completion" :section :post) + (with-stats-collection ("Index Build Completion" :section :post) (loop for idx in all-indexes do (lp:receive-result idx-channel)))) (lp:end-kernel)) diff --git a/src/utils/monitor.lisp b/src/utils/monitor.lisp index aa342cc..5564d3f 100644 --- a/src/utils/monitor.lisp +++ b/src/utils/monitor.lisp @@ -234,10 +234,10 @@ ;;; ;;; Internal utils ;;; -(defun elapsed-time-since (start) +(defun elapsed-time-since (start &optional (end (get-internal-real-time))) "Return how many seconds ticked between START and now" - (let ((now (get-internal-real-time))) - (coerce (/ (- now start) internal-time-units-per-second) 'double-float))) + (let ((end (or end (get-internal-real-time)))) + (coerce (/ (- end start) internal-time-units-per-second) 'double-float))) ;;;