diff --git a/src/sources/common/db-methods.lisp b/src/sources/common/db-methods.lisp index ab6fc3a..189c2c5 100644 --- a/src/sources/common/db-methods.lisp +++ b/src/sources/common/db-methods.lisp @@ -200,6 +200,29 @@ (when alter-table (alter-table catalog alter-table))) +(defun create-indexes (copy table kernel channel + &key + data-only + create-tables + create-indexes + (index-names :uniquify)) + "Launch the CREATE INDEX in parralel for given table. Returns a list of + the UNIQUE indexes that are created here and need to then be upgraded as + PRIMARY KEYS." + (when (and create-indexes (not data-only)) + (let* ((*preserve-index-names* + (or (eq :preserve index-names) + ;; if we didn't create the tables, we + ;; are re-installing the pre-existing + ;; indexes + (not create-tables)))) + ;; that returns Primary Keys to be upgraded from the UNIQUE indexes + ;; that we just prepared in this step. + (create-indexes-in-kernel (target-db copy) + table + kernel + channel)))) + ;;; ;;; Generic enough implementation of the copy-database method. @@ -300,16 +323,26 @@ :do (let ((table-source (instanciate-table-copy-object copy table))) ;; first COPY the data from source to PostgreSQL, using copy-kernel - (unless schema-only - ;; prepare the writers-count hash-table, as we start - ;; copy-from, we have concurrency tasks writing. - (setf (gethash table writers-count) concurrency) - (copy-from table-source - :concurrency concurrency - :kernel copy-kernel - :channel copy-channel - :on-error-stop on-error-stop - :disable-triggers disable-triggers)))) + (if schema-only + ;; start indexing straight away then + (alexandria:appendf + pkeys + (create-indexes copy table idx-kernel idx-channel + :data-only data-only + :create-tables create-tables + :create-indexes create-indexes + :index-names index-names)) + + ;; prepare the writers-count hash-table, as we start + ;; copy-from, we have concurrency tasks writing. + (progn + (setf (gethash table writers-count) concurrency) + (copy-from table-source + :concurrency concurrency + :kernel copy-kernel + :channel copy-channel + :on-error-stop on-error-stop + :disable-triggers disable-triggers))))) ;; now end the kernels ;; and each time a table is done, launch its indexing @@ -318,46 +351,39 @@ (with-stats-collection ("COPY Threads Completion" :section :post :use-result-as-read t :use-result-as-rows t) - (let ((worker-count (* (hash-table-count writers-count) - (task-count concurrency)))) - (loop :for tasks :below worker-count - :do (destructuring-bind (task table seconds) - (lp:receive-result copy-channel) - (log-message :debug - "Finished processing ~a for ~s ~50T~6$s" - task (format-table-name table) seconds) - (when (eq :writer task) - (update-stats :data table :secs seconds) + (let ((worker-count (* (hash-table-count writers-count) + (task-count concurrency)))) + (loop :for tasks :below worker-count + :do (destructuring-bind (task table seconds) + (lp:receive-result copy-channel) + (log-message :debug + "Finished processing ~a for ~s ~50T~6$s" + task (format-table-name table) seconds) + (when (eq :writer task) + (update-stats :data table :secs seconds) - ;; - ;; Start the CREATE INDEX parallel tasks only when - ;; the data has been fully copied over to the - ;; corresponding table, that's when the writers - ;; count is down to zero. - ;; - (decf (gethash table writers-count)) - (log-message :debug "writers-counts[~a] = ~a" - (format-table-name table) - (gethash table writers-count)) + ;; + ;; Start the CREATE INDEX parallel tasks only when + ;; the data has been fully copied over to the + ;; corresponding table, that's when the writers + ;; count is down to zero. + ;; + (decf (gethash table writers-count)) + (log-message :debug "writers-counts[~a] = ~a" + (format-table-name table) + (gethash table writers-count)) - (when (and create-indexes - (not data-only) - (zerop (gethash table writers-count))) - (let* ((*preserve-index-names* - (or (eq :preserve index-names) - ;; if we didn't create the tables, we - ;; are re-installing the pre-existing - ;; indexes - (not create-tables)))) - (alexandria:appendf - pkeys - (create-indexes-in-kernel (target-db copy) - table - idx-kernel - idx-channel))))))) - (prog1 - worker-count - (lp:end-kernel :wait nil)))))) + (when (zerop (gethash table writers-count)) + (alexandria:appendf + pkeys + (create-indexes copy table idx-kernel idx-channel + :data-only data-only + :create-tables create-tables + :create-indexes create-indexes + :index-names index-names)))))) + (prog1 + worker-count + (lp:end-kernel :wait nil)))))) (when create-indexes (let ((lp:*kernel* idx-kernel)) @@ -366,8 +392,8 @@ (with-stats-collection ("Index Build Completion" :section :post :use-result-as-read t :use-result-as-rows t) - (loop :for count :below (count-indexes catalog) - :do (lp:receive-result idx-channel)) + (loop :for count :below (count-indexes catalog) + :do (lp:receive-result idx-channel)) (lp:end-kernel :wait t) (count-indexes catalog))))