diff --git a/pgloader.asd b/pgloader.asd index edc35c1..0ab7010 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -55,8 +55,21 @@ (:file "logs") (:file "utils") (:file "state") + + ;; user defined transforms package and pgloader + ;; provided ones + (:file "transforms") + + ;; PostgreSQL related utils + (:file "read-sql-files") + (:file "quoting") + (:file "schema-structs" :depends-on ("quoting")) + + ;; State, monitoring, reporting (:file "reject" :depends-on ("state")) - (:file "report" :depends-on ("state" "utils")) + (:file "report" :depends-on ("state" + "utils" + "schema-structs")) (:file "monitor" :depends-on ("logs" "state" "reject" @@ -64,13 +77,7 @@ (:file "archive" :depends-on ("logs")) ;; generic connection api - (:file "connection" :depends-on ("archive")) - - ;; those are one-package-per-file - (:file "transforms") - (:file "read-sql-files") - (:file "quoting") - (:file "schema-structs" :depends-on ("quoting")))) + (:file "connection" :depends-on ("archive")))) ;; package pgloader.pgsql (:module pgsql diff --git a/src/package.lisp b/src/package.lisp index 5af1e14..5620500 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -23,8 +23,81 @@ #:start-logger #:stop-logger)) -(defpackage #:pgloader.state +(defpackage #:pgloader.schema (:use #:cl #:pgloader.params) + (:export #:catalog + #:schema + #:table + #:column + #:index + #:index + #:fkey + + #:cast ; generic function for sources + + #:make-catalog + #:make-schema + #:make-table + #:create-table + #:make-view + #:make-column + #:make-index + #:make-index + #:make-fkey + + #:catalog-name + #:catalog-schema-list + #:schema-name + #:schema-table-list + #:schema-view-list + #:table-name + #:table-source-name + #:table-schema + #:table-oid + #:table-comment + #:table-field-list + #:table-column-list + #:table-index-list + #:table-fkey-list + #:column-name + #:column-type-name + #:column-type-mod + #:column-type-nullable + #:column-default + #:column-comment + #:column-transform + + #:table-list + #:view-list + #:add-schema + #:find-schema + #:maybe-add-schema + #:add-table + #:find-table + #:maybe-add-table + #:add-view + #:find-view + #:maybe-add-view + #:add-field + #:add-column + #:add-index + #:add-fkey + #:find-fkey + #:maybe-add-fkey + #:count-tables + #:count-views + #:count-indexes + #:count-fkeys + #:max-indexes-per-table + + #:push-to-end + #:with-schema + + #:format-default-value + #:format-column)) + +(defpackage #:pgloader.state + (:use #:cl #:pgloader.params #:pgloader.schema) (:export #:format-table-name #:make-pgstate #:pgstate-tabnames @@ -70,7 +143,9 @@ #:timing)) (defpackage #:pgloader.utils - (:use #:cl #:pgloader.params #:pgloader.monitor #:pgloader.state) + (:use #:cl + #:pgloader.params #:pgloader.schema + #:pgloader.monitor #:pgloader.state) (:import-from #:alexandria #:appendf #:read-file-into-string) @@ -113,7 +188,7 @@ ;; quoting #:apply-identifier-case - ;; Schema structure bits + ;; schema #:catalog #:schema #:table diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index 82f63c8..f72d57b 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -109,7 +109,7 @@ :source-db (clone-connection (source-db copy)) :target-db (clone-connection (target-db copy)) :source table - :target (table-name table) + :target table :fields fields :columns columns :transforms transforms))) @@ -155,7 +155,7 @@ :including including :excluding excluding)) pkeys - (table-count 0) + (writers-count (make-hash-table :size (count-tables catalog))) (max-indexes (when create-indexes (max-indexes-per-table catalog))) (idx-kernel (when (and max-indexes (< 0 max-indexes)) @@ -208,33 +208,70 @@ ;; first COPY the data from source to PostgreSQL, using copy-kernel (unless schema-only - (incf table-count) + ;; prepare the writers-count hash-table, as we start + ;; copy-from, we have concurrency tasks writing. + (setf (gethash table writers-count) concurrency) (copy-from table-source :concurrency concurrency :kernel copy-kernel :channel copy-channel - :disable-triggers disable-triggers)) - - ;; Create the indexes for that table in parallel with the next - ;; COPY, and all at once in concurrent threads to benefit from - ;; PostgreSQL synchronous scan ability - ;; - ;; We just push new index build as they come along, if one - ;; index build requires much more time than the others our - ;; index build might get unsync: indexes for different tables - ;; will get built in parallel --- not a big problem. - (when (and create-indexes (not data-only)) - (let* ((*preserve-index-names* (eq :preserve index-names))) - (alexandria:appendf - pkeys - (create-indexes-in-kernel (target-db copy) - table - idx-kernel idx-channel)))))) + :disable-triggers disable-triggers)))) ;; now end the kernels - (end-kernels copy-kernel copy-channel idx-kernel idx-channel - table-count (count-indexes catalog) - :concurrency concurrency) + ;; and each time a table is done, launch its indexing + (unless schema-only + (let ((lp:*kernel* copy-kernel)) + (with-stats-collection ("COPY Threads Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (let ((worker-count (* (hash-table-count writers-count) + (task-count concurrency)))) + (loop :for tasks :below worker-count + :do (destructuring-bind (task table seconds) + (lp:receive-result copy-channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + (update-stats :data table :secs seconds) + + ;; + ;; Start the CREATE INDEX parallel tasks only when + ;; the data has been fully copied over to the + ;; corresponding table, that's when the writers + ;; count is down to zero. + ;; + (decf (gethash table writers-count)) + (log-message :debug "writers-counts[~a] = ~a" + (format-table-name table) + (gethash table writers-count)) + + (when (and create-indexes + (not data-only) + (zerop (gethash table writers-count))) + (let* ((*preserve-index-names* + (eq :preserve index-names))) + (alexandria:appendf + pkeys + (create-indexes-in-kernel (target-db copy) + table + idx-kernel + idx-channel))))))) + (prog1 + worker-count + (lp:end-kernel :wait nil)))))) + + (when create-indexes + (let ((lp:*kernel* idx-kernel)) + ;; wait until the indexes are done being built... + ;; don't forget accounting for that waiting time. + (with-stats-collection ("Index Build Completion" :section :post + :use-result-as-read t + :use-result-as-rows t) + (loop :for count :below (count-indexes catalog) + :do (lp:receive-result idx-channel)) + (lp:end-kernel :wait t) + (count-indexes catalog)))) ;; ;; Complete the PostgreSQL database before handing over. @@ -250,39 +287,3 @@ ;; Time to cleanup! ;; (cleanup copy catalog :materialize-views materialize-views))) - - -;;; -;;; Lower level tools -;;; -(defun end-kernels (copy-kernel copy-channel idx-kernel idx-channel - table-count index-count - &key (concurrency 2)) - "Terminate the lparallel kernels, waiting for all threads." - (when copy-kernel - (let ((lp:*kernel* copy-kernel)) - (with-stats-collection ("COPY Threads Completion" :section :post - :use-result-as-read t - :use-result-as-rows t) - (let ((worker-count (* table-count (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (destructuring-bind (task table-name seconds) - (lp:receive-result copy-channel) - (log-message :debug "Finished processing ~a for ~s ~50T~6$s" - task table-name seconds) - (when (eq :writer task) - (update-stats :data table-name :secs seconds)))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))))) - - (when idx-kernel - (let ((lp:*kernel* idx-kernel)) - ;; wait until the indexes are done being built... - ;; don't forget accounting for that waiting time. - (with-stats-collection ("Index Build Completion" :section :post - :use-result-as-read t - :use-result-as-rows t) - (loop :for count :below index-count - :count (lp:receive-result idx-channel))) - (lp:end-kernel)))) diff --git a/src/utils/report.lisp b/src/utils/report.lisp index 570c641..0997ee2 100644 --- a/src/utils/report.lisp +++ b/src/utils/report.lisp @@ -143,23 +143,24 @@ "Report a whole summary." (when header (report-header)) (loop - for table-name in (reverse (pgstate-tabnames pgstate)) - for pgtable = (gethash table-name (pgstate-tables pgstate)) - do - (with-slots (read rows errs secs rs ws) pgtable - (format *report-stream* - *header-tname-format* - *max-length-table-name* - (format-table-name table-name)) - (report-results read rows errs - (cond ((> 0 secs) (format-interval secs nil)) - ((and rs ws (= 0 secs)) - (format-interval (max rs ws) nil)) - (t (format-interval secs nil))) - (when (and rs (not (= rs 0.0))) (format-interval rs nil)) - (when (and ws (not (= ws 0.0))) (format-interval ws nil)))) - finally (when footer - (report-pgstate-stats pgstate footer)))) + :for label :in (reverse (pgstate-tabnames pgstate)) + :for pgtable := (gethash label (pgstate-tables pgstate)) + :do (with-slots (read rows errs secs rs ws) pgtable + (format *report-stream* + *header-tname-format* + *max-length-table-name* + (etypecase label + (string label) + (table (format-table-name label)))) + (report-results read rows errs + (cond ((> 0 secs) (format-interval secs nil)) + ((and rs ws (= 0 secs)) + (format-interval (max rs ws) nil)) + (t (format-interval secs nil))) + (when (and rs (not (= rs 0.0))) (format-interval rs nil)) + (when (and ws (not (= ws 0.0))) (format-interval ws nil)))) + :finally (when footer + (report-pgstate-stats pgstate footer)))) (defun parse-summary-type (&optional (pathname *summary-pathname*)) "Return the summary type we want: human-readable, csv, json." @@ -173,7 +174,10 @@ "Compute the max length of a table-name in the legend." (reduce #'max (mapcar #'length - (mapcar #'format-table-name + (mapcar (lambda (entry) + (etypecase entry + (string entry) + (table (format-table-name entry)))) (append (pgstate-tabnames data) (pgstate-tabnames pre) (pgstate-tabnames post) diff --git a/src/utils/schema-structs.lisp b/src/utils/schema-structs.lisp index b0ab147..a960f4e 100644 --- a/src/utils/schema-structs.lisp +++ b/src/utils/schema-structs.lisp @@ -8,7 +8,7 @@ ;;; Utility function using those definitions are found in schema.lisp in the ;;; same directory. ;;; -(in-package :pgloader.utils) +(in-package :pgloader.schema) (defmacro push-to-end (item place) `(setf ,place (nconc ,place (list ,item)))) @@ -314,14 +314,14 @@ ;;; Not a generic/method because only used for the table object, and we want ;;; to use the usual structure print-method in stack traces. ;;; -(defun format-table-name (table) +(defgeneric format-table-name (object) + (:documentation "Format the OBJECT name for PostgreSQL.")) + +(defmethod format-table-name ((table table)) "TABLE should be a table instance, but for hysterical raisins might be a CONS of a schema name and a table name, or just the table name as a string." - (etypecase table - (table (format nil "~@[~a.~]~a" (table-schema table) (table-name table))) - (cons (format nil "~a.~a" (car table) (cdr table))) - (string table))) + (format nil "~@[~a.~]~a" (table-schema table) (table-name table))) ;;;