mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-11 00:36:59 +02:00
Implement support for migrating MS SQL Indexes.
This commit is contained in:
parent
5ab0831d4e
commit
a8398c20ba
@ -106,6 +106,30 @@
|
|||||||
;; return the copy-mssql object we just did the COPY for
|
;; return the copy-mssql object we just did the COPY for
|
||||||
mssql))
|
mssql))
|
||||||
|
|
||||||
|
(defun fetch-mssql-metadata (&key state only-tables)
|
||||||
|
"MS SQL introspection to prepare the migration."
|
||||||
|
(let (all-columns all-indexes)
|
||||||
|
(with-stats-collection ("fetch meta data"
|
||||||
|
:use-result-as-rows t
|
||||||
|
:use-result-as-read t
|
||||||
|
:state state)
|
||||||
|
(with-mssql-connection ()
|
||||||
|
(setf all-columns (filter-column-list (list-all-columns)
|
||||||
|
:only-tables only-tables))
|
||||||
|
|
||||||
|
(setf all-indexes (filter-column-list (list-all-indexes)
|
||||||
|
:only-tables only-tables))
|
||||||
|
|
||||||
|
;; return how many objects we're going to deal with in total
|
||||||
|
;; for stats collection
|
||||||
|
(+ (loop :for (schema . tables) :in all-columns :sum (length tables))
|
||||||
|
(loop :for (schema . tables) :in all-indexes
|
||||||
|
:sum (loop :for (table . indexes) :in tables
|
||||||
|
:sum (length indexes))))))
|
||||||
|
|
||||||
|
;; now return a plist to the caller
|
||||||
|
(list :all-columns all-columns :all-indexes all-indexes)))
|
||||||
|
|
||||||
(defmethod copy-database ((mssql copy-mssql)
|
(defmethod copy-database ((mssql copy-mssql)
|
||||||
&key
|
&key
|
||||||
state-before
|
state-before
|
||||||
@ -122,7 +146,7 @@
|
|||||||
(encoding :utf-8)
|
(encoding :utf-8)
|
||||||
only-tables)
|
only-tables)
|
||||||
"Stream the given MS SQL database down to PostgreSQL."
|
"Stream the given MS SQL database down to PostgreSQL."
|
||||||
(declare (ignore create-indexes reset-sequences foreign-keys))
|
(declare (ignore reset-sequences foreign-keys))
|
||||||
(let* ((summary (null *state*))
|
(let* ((summary (null *state*))
|
||||||
(*state* (or *state* (make-pgstate)))
|
(*state* (or *state* (make-pgstate)))
|
||||||
(idx-state (or state-indexes (make-pgstate)))
|
(idx-state (or state-indexes (make-pgstate)))
|
||||||
@ -130,74 +154,112 @@
|
|||||||
(state-after (or state-after (make-pgstate)))
|
(state-after (or state-after (make-pgstate)))
|
||||||
(cffi:*default-foreign-encoding* encoding)
|
(cffi:*default-foreign-encoding* encoding)
|
||||||
(copy-kernel (make-kernel 2))
|
(copy-kernel (make-kernel 2))
|
||||||
(all-columns (filter-column-list
|
idx-kernel idx-channel)
|
||||||
(with-mssql-connection ()
|
|
||||||
(list-all-columns))
|
|
||||||
:only-tables only-tables))
|
|
||||||
;; (all-indexes (filter-column-list (list-all-indexes)
|
|
||||||
;; :only-tables only-tables
|
|
||||||
;; :including including
|
|
||||||
;; :excluding excluding))
|
|
||||||
;; (max-indexes (loop :for (table . indexes) :in all-indexes
|
|
||||||
;; :maximizing (length indexes)))
|
|
||||||
;; (idx-kernel (when (and max-indexes (< 0 max-indexes))
|
|
||||||
;; (make-kernel max-indexes)))
|
|
||||||
;; (idx-channel (when idx-kernel
|
|
||||||
;; (let ((lp:*kernel* idx-kernel))
|
|
||||||
;; (lp:make-channel))))
|
|
||||||
)
|
|
||||||
|
|
||||||
;; if asked, first drop/create the tables on the PostgreSQL side
|
(destructuring-bind (&key all-columns all-indexes)
|
||||||
(cond ((and (or create-tables schema-only) (not data-only))
|
;; to prepare the run we need to fetch MS SQL meta-data
|
||||||
(log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop)
|
(fetch-mssql-metadata :state state-before
|
||||||
(with-stats-collection ("create, truncate"
|
:only-tables only-tables)
|
||||||
:state state-before
|
|
||||||
:summary summary)
|
|
||||||
(with-pgsql-transaction ()
|
|
||||||
(loop :for (schema . tables) :in all-columns
|
|
||||||
:do (let ((schema (apply-identifier-case schema)))
|
|
||||||
;; create schema
|
|
||||||
(let ((sql (format nil "CREATE SCHEMA ~a;" schema)))
|
|
||||||
(log-message :notice "~a" sql)
|
|
||||||
(pgsql-execute sql))
|
|
||||||
|
|
||||||
;; set search_path to only that schema
|
(let ((max-indexes (loop :for (schema . tables) :in all-indexes
|
||||||
(pgsql-execute
|
:maximizing (loop :for (table . indexes) :in tables
|
||||||
(format nil "SET LOCAL search_path TO ~a;" schema))
|
:maximizing (length indexes)))))
|
||||||
|
|
||||||
;; and now create the tables within that schema
|
(setf idx-kernel (when (and max-indexes (< 0 max-indexes))
|
||||||
(create-tables tables :include-drop include-drop))))))
|
(make-kernel max-indexes)))
|
||||||
|
|
||||||
(truncate
|
(setf idx-channel (when idx-kernel
|
||||||
(let ((qualified-table-name-list
|
(let ((lp:*kernel* idx-kernel))
|
||||||
(qualified-table-name-list all-columns)))
|
(lp:make-channel)))))
|
||||||
(truncate-tables (target-db mssql)
|
|
||||||
;; here we really do want only the name
|
|
||||||
(mapcar #'car qualified-table-name-list)))))
|
|
||||||
|
|
||||||
;; Transfert the data
|
;; if asked, first drop/create the tables on the PostgreSQL side
|
||||||
(loop :for (schema . tables) :in all-columns
|
(handler-case
|
||||||
:do (loop :for (table-name . columns) :in tables
|
(cond ((and (or create-tables schema-only) (not data-only))
|
||||||
:do
|
(log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop)
|
||||||
(let ((table-source
|
(with-stats-collection ("create, truncate"
|
||||||
(make-instance 'copy-mssql
|
:state state-before
|
||||||
:source-db (source-db mssql)
|
:summary summary)
|
||||||
:target-db (target-db mssql)
|
(with-pgsql-transaction ()
|
||||||
:source (cons schema table-name)
|
(loop :for (schema . tables) :in all-columns
|
||||||
:target (qualify-name schema table-name)
|
:do (let ((schema (apply-identifier-case schema)))
|
||||||
:fields columns)))
|
;; create schema
|
||||||
(log-message :debug "TARGET: ~a" (target table-source))
|
(let ((sql (format nil "CREATE SCHEMA ~a;" schema)))
|
||||||
(log-message :log "target: ~s" table-source)
|
(log-message :notice "~a" sql)
|
||||||
;; COPY the data to PostgreSQL, using copy-kernel
|
(pgsql-execute sql))
|
||||||
(unless schema-only
|
|
||||||
(copy-from table-source :kernel copy-kernel)))))
|
;; set search_path to only that schema
|
||||||
|
(pgsql-execute
|
||||||
|
(format nil "SET LOCAL search_path TO ~a;" schema))
|
||||||
|
|
||||||
|
;; and now create the tables within that schema
|
||||||
|
(create-tables tables :include-drop include-drop))))))
|
||||||
|
|
||||||
|
(truncate
|
||||||
|
(let ((qualified-table-name-list
|
||||||
|
(qualified-table-name-list all-columns)))
|
||||||
|
(truncate-tables (target-db mssql)
|
||||||
|
;; here we really do want only the name
|
||||||
|
(mapcar #'car qualified-table-name-list)))))
|
||||||
|
|
||||||
|
;;
|
||||||
|
;; In case some error happens in the preparatory transaction, we
|
||||||
|
;; need to stop now and refrain from trying to load the data into an
|
||||||
|
;; incomplete schema.
|
||||||
|
;;
|
||||||
|
(cl-postgres:database-error (e)
|
||||||
|
(declare (ignore e)) ; a log has already been printed
|
||||||
|
(log-message :fatal "Failed to create the schema, see above.")
|
||||||
|
|
||||||
|
(return-from copy-database)))
|
||||||
|
|
||||||
|
;; Transfert the data
|
||||||
|
(loop :for (schema . tables) :in all-columns
|
||||||
|
:do (loop :for (table-name . columns) :in tables
|
||||||
|
:do
|
||||||
|
(let ((table-source
|
||||||
|
(make-instance 'copy-mssql
|
||||||
|
:source-db (source-db mssql)
|
||||||
|
:target-db (target-db mssql)
|
||||||
|
:source (cons schema table-name)
|
||||||
|
:target (qualify-name schema table-name)
|
||||||
|
:fields columns)))
|
||||||
|
(log-message :debug "TARGET: ~a" (target table-source))
|
||||||
|
(log-message :log "target: ~s" table-source)
|
||||||
|
;; COPY the data to PostgreSQL, using copy-kernel
|
||||||
|
(unless schema-only
|
||||||
|
(copy-from table-source :kernel copy-kernel))
|
||||||
|
|
||||||
|
;; Create the indexes for that table in parallel with the next
|
||||||
|
;; COPY, and all at once in concurrent threads to benefit from
|
||||||
|
;; PostgreSQL synchronous scan ability
|
||||||
|
;;
|
||||||
|
;; We just push new index build as they come along, if one
|
||||||
|
;; index build requires much more time than the others our
|
||||||
|
;; index build might get unsync: indexes for different tables
|
||||||
|
;; will get built in parallel --- not a big problem.
|
||||||
|
(when (and create-indexes (not data-only))
|
||||||
|
(let* ((s-entry (assoc schema all-indexes :test 'equal))
|
||||||
|
(indexes-with-names
|
||||||
|
(cdr (assoc table-name (cdr s-entry) :test 'equal))))
|
||||||
|
(create-indexes-in-kernel (target-db mssql)
|
||||||
|
(mapcar #'cdr indexes-with-names)
|
||||||
|
idx-kernel
|
||||||
|
idx-channel
|
||||||
|
:state idx-state))))))
|
||||||
|
|
||||||
;; now end the kernels
|
;; now end the kernels
|
||||||
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel))
|
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel))
|
||||||
|
(let ((lp:*kernel* idx-kernel))
|
||||||
|
;; wait until the indexes are done being built...
|
||||||
|
;; don't forget accounting for that waiting time.
|
||||||
|
(when (and create-indexes (not data-only))
|
||||||
|
(with-stats-collection ("Index Build Completion" :state *state*)
|
||||||
|
(loop for idx in all-indexes do (lp:receive-result idx-channel))))
|
||||||
|
(lp:end-kernel))
|
||||||
|
|
||||||
;; and report the total time spent on the operation
|
;; and report the total time spent on the operation
|
||||||
(when summary
|
(when summary
|
||||||
(report-full-summary "Total streaming time" *state*
|
(report-full-summary "Total streaming time" *state*
|
||||||
:before state-before
|
:before state-before
|
||||||
:finally state-after
|
:finally state-after
|
||||||
:parallel idx-state))))
|
:parallel idx-state)))))
|
||||||
|
@ -133,18 +133,15 @@ order by table_schema, table_name, ordinal_position"
|
|||||||
(reverse (loop :for (table-name . cols) :in tables
|
(reverse (loop :for (table-name . cols) :in tables
|
||||||
:collect (cons table-name (reverse cols))))))))))
|
:collect (cons table-name (reverse cols))))))))))
|
||||||
|
|
||||||
(defun list-all-indexes (&key
|
(defun list-all-indexes ()
|
||||||
(dbname *my-dbname*))
|
|
||||||
"Get the list of MSSQL index definitions per table."
|
"Get the list of MSSQL index definitions per table."
|
||||||
(loop
|
(loop
|
||||||
:with result := nil
|
:with result := nil
|
||||||
:for (schema table-name name col unique pkey)
|
:for (schema table name col unique pkey)
|
||||||
:in (mssql-query (format nil "
|
:in (mssql-query (format nil "
|
||||||
select schema_name(schema_id) as SchemaName
|
select schema_name(schema_id) as SchemaName,
|
||||||
o.name as TableName,
|
o.name as TableName,
|
||||||
i.name as IndexName,
|
i.name as IndexName,
|
||||||
ic.key_ordinal as ColumnOrder,
|
|
||||||
ic.is_included_column as IsIncluded,
|
|
||||||
co.[name] as ColumnName,
|
co.[name] as ColumnName,
|
||||||
i.is_unique,
|
i.is_unique,
|
||||||
i.is_primary_key
|
i.is_primary_key
|
||||||
@ -156,30 +153,53 @@ order by table_schema, table_name, ordinal_position"
|
|||||||
join sys.columns co on co.object_id = i.object_id
|
join sys.columns co on co.object_id = i.object_id
|
||||||
and co.column_id = ic.column_id
|
and co.column_id = ic.column_id
|
||||||
|
|
||||||
|
where schema_name(schema_id) not in ('dto', 'sys')
|
||||||
|
|
||||||
order by SchemaName,
|
order by SchemaName,
|
||||||
o.[name],
|
o.[name],
|
||||||
i.[name],
|
i.[name],
|
||||||
ic.is_included_column,
|
ic.is_included_column,
|
||||||
ic.key_ordinal"))
|
ic.key_ordinal"))
|
||||||
:do
|
:do
|
||||||
(let* ((s-entry (assoc schema result :test 'equal))
|
(let* ((s-entry (assoc schema result :test 'equal))
|
||||||
(t-entry (when s-entry
|
(t-entry (when s-entry
|
||||||
(assoc table-name (cdr t-entry) :test 'equal)))
|
(assoc table (cdr s-entry) :test 'equal)))
|
||||||
(index (when t-entry
|
(i-entry (when t-entry
|
||||||
(assoc name (cdr t-entry) :test 'equal))))
|
(assoc name (cdr t-entry) :test 'equal)))
|
||||||
|
(index (make-pgsql-index :name name
|
||||||
|
:primary (= pkey 1)
|
||||||
|
:table-name (qualify-name schema table)
|
||||||
|
:unique (= unique 1)
|
||||||
|
:columns (list col))))
|
||||||
(if s-entry
|
(if s-entry
|
||||||
(if t-entry
|
(if t-entry
|
||||||
(push column (cdr t-entry))
|
(if i-entry
|
||||||
(push (cons table-name (list column)) (cdr s-entry)))
|
(push col
|
||||||
(push (cons schema (list (cons table-name (list column)))) result)))
|
(pgloader.pgsql::pgsql-index-columns (cdr i-entry)))
|
||||||
|
(push (cons name index) (cdr t-entry)))
|
||||||
|
(push (cons table (list (cons name index))) (cdr s-entry)))
|
||||||
|
(push (cons schema
|
||||||
|
(list (cons table
|
||||||
|
(list (cons name index))))) result)))
|
||||||
:finally
|
:finally
|
||||||
;; we did push, we need to reverse here
|
;; we did push, we need to reverse here
|
||||||
(return (reverse
|
(return
|
||||||
(loop :for (schema . tables) :in result
|
(labels ((reverse-index-cols (index)
|
||||||
:collect
|
(setf (pgloader.pgsql::pgsql-index-columns index)
|
||||||
(cons schema
|
(nreverse (pgloader.pgsql::pgsql-index-columns index)))
|
||||||
(reverse (loop :for (table-name . cols) :in tables
|
index)
|
||||||
:collect (cons table-name (reverse cols))))))))))
|
|
||||||
|
(reverse-indexes-cols (list-of-indexes)
|
||||||
|
(loop :for (name . index) :in list-of-indexes
|
||||||
|
:collect (cons name (reverse-index-cols index))))
|
||||||
|
|
||||||
|
(reverse-indexes-cols (list-of-tables)
|
||||||
|
(reverse
|
||||||
|
(loop :for (table . indexes) :in list-of-tables
|
||||||
|
:collect (cons table (reverse-indexes-cols indexes))))))
|
||||||
|
(reverse
|
||||||
|
(loop :for (schema . tables) :in result
|
||||||
|
:collect (cons schema (reverse-indexes-cols tables))))))))
|
||||||
|
|
||||||
|
|
||||||
;;;
|
;;;
|
||||||
|
Loading…
Reference in New Issue
Block a user