mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-08 15:27:00 +02:00
Implement tables row count ordering for MySQL. (#1120)
This should help optimise the duration of migrating databases with very big tables and lots of smaller ones. It might be a little too naive as far as the optimisation goes, while still being an improvement on the default alphabetical one. Fixes #1099.
This commit is contained in:
parent
14fb15bfbd
commit
11970bbca8
@ -9,6 +9,11 @@
|
||||
(defmethod queue-raw-data ((copy copy) rawq concurrency)
|
||||
"Stream data as read by the map-queue method on the COPY argument into QUEUE,
|
||||
as given."
|
||||
(log-message :notice "COPY ~a ~@[with ~d rows estimated~] [~a/~a]"
|
||||
(format-table-name (target copy))
|
||||
(table-row-count-estimate (target copy))
|
||||
(lp:kernel-worker-index)
|
||||
(lp:kernel-worker-count))
|
||||
(log-message :debug "Reader started for ~a" (format-table-name (target copy)))
|
||||
(let* ((start-time (get-internal-real-time))
|
||||
(row-count 0)
|
||||
@ -93,7 +98,6 @@
|
||||
(trivial-backtrace:print-backtrace condition
|
||||
:output nil))
|
||||
(lp::invoke-transfer-error condition))))
|
||||
(log-message :notice "COPY ~a" table-name)
|
||||
|
||||
;; Check for Read Concurrency Support from our source
|
||||
(when (and multiple-readers (< 1 concurrency))
|
||||
|
@ -255,6 +255,24 @@
|
||||
(setf (catalog-distribution-rules catalog)
|
||||
(citus-distribute-schema catalog distribute))))
|
||||
|
||||
(defun optimize-table-copy-ordering (catalog)
|
||||
"Return a list of tables to copy over in optimized order"
|
||||
(let ((table-list (copy-list (table-list catalog)))
|
||||
(view-list (copy-list (view-list catalog))))
|
||||
;; when materialized views are not supported, view-list is empty here
|
||||
(cond
|
||||
((notevery #'zerop (mapcar #'table-row-count-estimate table-list))
|
||||
(let ((sorted-table-list
|
||||
(sort table-list #'> :key #'table-row-count-estimate)))
|
||||
(log-message :notice
|
||||
"Processing tables in this order: ~{~a: ~d rows~^, ~}"
|
||||
(loop :for table :in (append table-list view-list)
|
||||
:collect (format-table-name table)
|
||||
:collect (table-row-count-estimate table)))
|
||||
(nconc sorted-table-list view-list)))
|
||||
(t
|
||||
(nconc table-list view-list)))))
|
||||
|
||||
|
||||
;;;
|
||||
;;; Generic enough implementation of the copy-database method.
|
||||
@ -414,10 +432,7 @@
|
||||
(return-from copy-database)))
|
||||
|
||||
(loop
|
||||
:for table :in (append (table-list catalog)
|
||||
;; when materialized views are not supported,
|
||||
;; view-list is empty here
|
||||
(view-list catalog))
|
||||
:for table :in (optimize-table-copy-ordering catalog)
|
||||
|
||||
:do (let ((table-source (instanciate-table-copy-object copy table)))
|
||||
;; first COPY the data from source to PostgreSQL, using copy-kernel
|
||||
@ -472,8 +487,21 @@
|
||||
|
||||
(when (and create-indexes
|
||||
(zerop (gethash table writers-count)))
|
||||
(log-message :notice "DONE copying ~a"
|
||||
(format-table-name table))
|
||||
|
||||
(let* ((stats pgloader.monitor::*sections*)
|
||||
(section (get-state-section stats :data))
|
||||
(table-stats (pgstate-get-label section table))
|
||||
(pprint-secs
|
||||
(pgloader.state::format-interval seconds nil)))
|
||||
;; in CCL we have access to the *sections* dynamic
|
||||
;; binding from another thread, in SBCL we access
|
||||
;; an empty copy.
|
||||
(log-message :notice
|
||||
"DONE copying ~a in ~a~@[ for ~d rows~]"
|
||||
(format-table-name table)
|
||||
pprint-secs
|
||||
(when table-stats
|
||||
(pgtable-rows table-stats))))
|
||||
(alexandria:appendf
|
||||
pkeys
|
||||
(create-indexes-in-kernel (target-db copy)
|
||||
|
@ -96,6 +96,7 @@
|
||||
#:table-comment
|
||||
#:table-storage-parameter-list
|
||||
#:table-tablespace
|
||||
#:table-row-count-estimate
|
||||
#:table-field-list
|
||||
#:table-column-list
|
||||
#:table-index-list
|
||||
|
@ -153,6 +153,9 @@
|
||||
(defgeneric fetch-foreign-keys (catalog db-copy &key including excluding)
|
||||
(:documentation "Get the list of foreign keys from the source database."))
|
||||
|
||||
(defgeneric fetch-table-row-count (catalog db-copy &key including excluding)
|
||||
(:documentation "Retrieve and set the row count estimate for given tables."))
|
||||
|
||||
(defgeneric fetch-comments (catalog db-copy &key including excluding)
|
||||
(:documentation "Get the list of comments from the source database."))
|
||||
|
||||
|
@ -155,6 +155,27 @@
|
||||
:finally
|
||||
(return schema)))
|
||||
|
||||
;;;
|
||||
;;; MySQL table row count estimate
|
||||
;;;
|
||||
(defmethod fetch-table-row-count ((schema schema)
|
||||
(mysql copy-mysql)
|
||||
&key
|
||||
including
|
||||
excluding)
|
||||
"Retrieve and set the row count estimate for given MySQL tables."
|
||||
(loop
|
||||
:for (table-name count)
|
||||
:in (mysql-query (sql "/mysql/list-table-rows.sql"
|
||||
(db-name *connection*)
|
||||
including ; do we print the clause?
|
||||
including
|
||||
excluding ; do we print the clause?
|
||||
excluding))
|
||||
:do (let* ((table (find-table schema table-name)))
|
||||
(when table
|
||||
(setf (table-row-count-estimate table) (parse-integer count))))))
|
||||
|
||||
|
||||
;;;
|
||||
;;; Queries to get the MySQL comments.
|
||||
|
@ -165,6 +165,11 @@ Illegal ~a character starting at position ~a~@[: ~a~].~%"
|
||||
:including including
|
||||
:excluding excluding)
|
||||
|
||||
;; fetch tables row count estimate
|
||||
(fetch-table-row-count schema mysql
|
||||
:including including
|
||||
:excluding excluding)
|
||||
|
||||
;; fetch view (and their columns) metadata, covering comments too
|
||||
(let* ((view-names (unless (eq :all materialize-views)
|
||||
(mapcar #'matview-source-name materialize-views)))
|
||||
|
12
src/sources/mysql/sql/list-table-rows.sql
Normal file
12
src/sources/mysql/sql/list-table-rows.sql
Normal file
@ -0,0 +1,12 @@
|
||||
-- params: db-name
|
||||
-- including
|
||||
-- filter-list-to-where-clause incuding
|
||||
-- excluding
|
||||
-- filter-list-to-where-clause excluding
|
||||
SELECT table_name,
|
||||
cast(data_length/avg_row_length as integer)
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema = '~a'
|
||||
and table_type = 'BASE TABLE'
|
||||
~:[~*~;and (~{table_name ~a~^ or ~})~]
|
||||
~:[~*~;and (~{table_name ~a~^ and ~})~];
|
@ -49,10 +49,16 @@
|
||||
|
||||
(defstruct table source-name name schema oid comment
|
||||
storage-parameter-list tablespace
|
||||
(row-count-estimate 0 :type fixnum)
|
||||
;; field is for SOURCE
|
||||
field-list
|
||||
;; column is for TARGET
|
||||
column-list
|
||||
index-list
|
||||
fkey-list
|
||||
trigger-list
|
||||
;; citus is an extra slot for citus support
|
||||
field-list column-list index-list fkey-list trigger-list citus-rule)
|
||||
citus-rule)
|
||||
|
||||
(defstruct matview source-name name schema definition)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user