diff --git a/src/sources/mssql.lisp b/src/sources/mssql.lisp index 6a6e06b..91955de 100644 --- a/src/sources/mssql.lisp +++ b/src/sources/mssql.lisp @@ -106,6 +106,30 @@ ;; return the copy-mssql object we just did the COPY for mssql)) +(defun fetch-mssql-metadata (&key state only-tables) + "MS SQL introspection to prepare the migration." + (let (all-columns all-indexes) + (with-stats-collection ("fetch meta data" + :use-result-as-rows t + :use-result-as-read t + :state state) + (with-mssql-connection () + (setf all-columns (filter-column-list (list-all-columns) + :only-tables only-tables)) + + (setf all-indexes (filter-column-list (list-all-indexes) + :only-tables only-tables)) + + ;; return how many objects we're going to deal with in total + ;; for stats collection + (+ (loop :for (schema . tables) :in all-columns :sum (length tables)) + (loop :for (schema . tables) :in all-indexes + :sum (loop :for (table . indexes) :in tables + :sum (length indexes)))))) + + ;; now return a plist to the caller + (list :all-columns all-columns :all-indexes all-indexes))) + (defmethod copy-database ((mssql copy-mssql) &key state-before @@ -122,7 +146,7 @@ (encoding :utf-8) only-tables) "Stream the given MS SQL database down to PostgreSQL." - (declare (ignore create-indexes reset-sequences foreign-keys)) + (declare (ignore reset-sequences foreign-keys)) (let* ((summary (null *state*)) (*state* (or *state* (make-pgstate))) (idx-state (or state-indexes (make-pgstate))) @@ -130,74 +154,112 @@ (state-after (or state-after (make-pgstate))) (cffi:*default-foreign-encoding* encoding) (copy-kernel (make-kernel 2)) - (all-columns (filter-column-list - (with-mssql-connection () - (list-all-columns)) - :only-tables only-tables)) - ;; (all-indexes (filter-column-list (list-all-indexes) - ;; :only-tables only-tables - ;; :including including - ;; :excluding excluding)) - ;; (max-indexes (loop :for (table . indexes) :in all-indexes - ;; :maximizing (length indexes))) - ;; (idx-kernel (when (and max-indexes (< 0 max-indexes)) - ;; (make-kernel max-indexes))) - ;; (idx-channel (when idx-kernel - ;; (let ((lp:*kernel* idx-kernel)) - ;; (lp:make-channel)))) - ) + idx-kernel idx-channel) - ;; if asked, first drop/create the tables on the PostgreSQL side - (cond ((and (or create-tables schema-only) (not data-only)) - (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop) - (with-stats-collection ("create, truncate" - :state state-before - :summary summary) - (with-pgsql-transaction () - (loop :for (schema . tables) :in all-columns - :do (let ((schema (apply-identifier-case schema))) - ;; create schema - (let ((sql (format nil "CREATE SCHEMA ~a;" schema))) - (log-message :notice "~a" sql) - (pgsql-execute sql)) + (destructuring-bind (&key all-columns all-indexes) + ;; to prepare the run we need to fetch MS SQL meta-data + (fetch-mssql-metadata :state state-before + :only-tables only-tables) - ;; set search_path to only that schema - (pgsql-execute - (format nil "SET LOCAL search_path TO ~a;" schema)) + (let ((max-indexes (loop :for (schema . tables) :in all-indexes + :maximizing (loop :for (table . indexes) :in tables + :maximizing (length indexes))))) - ;; and now create the tables within that schema - (create-tables tables :include-drop include-drop)))))) + (setf idx-kernel (when (and max-indexes (< 0 max-indexes)) + (make-kernel max-indexes))) - (truncate - (let ((qualified-table-name-list - (qualified-table-name-list all-columns))) - (truncate-tables (target-db mssql) - ;; here we really do want only the name - (mapcar #'car qualified-table-name-list))))) + (setf idx-channel (when idx-kernel + (let ((lp:*kernel* idx-kernel)) + (lp:make-channel))))) - ;; Transfert the data - (loop :for (schema . tables) :in all-columns - :do (loop :for (table-name . columns) :in tables - :do - (let ((table-source - (make-instance 'copy-mssql - :source-db (source-db mssql) - :target-db (target-db mssql) - :source (cons schema table-name) - :target (qualify-name schema table-name) - :fields columns))) - (log-message :debug "TARGET: ~a" (target table-source)) - (log-message :log "target: ~s" table-source) - ;; COPY the data to PostgreSQL, using copy-kernel - (unless schema-only - (copy-from table-source :kernel copy-kernel))))) + ;; if asked, first drop/create the tables on the PostgreSQL side + (handler-case + (cond ((and (or create-tables schema-only) (not data-only)) + (log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop) + (with-stats-collection ("create, truncate" + :state state-before + :summary summary) + (with-pgsql-transaction () + (loop :for (schema . tables) :in all-columns + :do (let ((schema (apply-identifier-case schema))) + ;; create schema + (let ((sql (format nil "CREATE SCHEMA ~a;" schema))) + (log-message :notice "~a" sql) + (pgsql-execute sql)) + + ;; set search_path to only that schema + (pgsql-execute + (format nil "SET LOCAL search_path TO ~a;" schema)) + + ;; and now create the tables within that schema + (create-tables tables :include-drop include-drop)))))) + + (truncate + (let ((qualified-table-name-list + (qualified-table-name-list all-columns))) + (truncate-tables (target-db mssql) + ;; here we really do want only the name + (mapcar #'car qualified-table-name-list))))) + + ;; + ;; In case some error happens in the preparatory transaction, we + ;; need to stop now and refrain from trying to load the data into an + ;; incomplete schema. + ;; + (cl-postgres:database-error (e) + (declare (ignore e)) ; a log has already been printed + (log-message :fatal "Failed to create the schema, see above.") + + (return-from copy-database))) + + ;; Transfert the data + (loop :for (schema . tables) :in all-columns + :do (loop :for (table-name . columns) :in tables + :do + (let ((table-source + (make-instance 'copy-mssql + :source-db (source-db mssql) + :target-db (target-db mssql) + :source (cons schema table-name) + :target (qualify-name schema table-name) + :fields columns))) + (log-message :debug "TARGET: ~a" (target table-source)) + (log-message :log "target: ~s" table-source) + ;; COPY the data to PostgreSQL, using copy-kernel + (unless schema-only + (copy-from table-source :kernel copy-kernel)) + + ;; Create the indexes for that table in parallel with the next + ;; COPY, and all at once in concurrent threads to benefit from + ;; PostgreSQL synchronous scan ability + ;; + ;; We just push new index build as they come along, if one + ;; index build requires much more time than the others our + ;; index build might get unsync: indexes for different tables + ;; will get built in parallel --- not a big problem. + (when (and create-indexes (not data-only)) + (let* ((s-entry (assoc schema all-indexes :test 'equal)) + (indexes-with-names + (cdr (assoc table-name (cdr s-entry) :test 'equal)))) + (create-indexes-in-kernel (target-db mssql) + (mapcar #'cdr indexes-with-names) + idx-kernel + idx-channel + :state idx-state)))))) ;; now end the kernels - (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* copy-kernel)) (lp:end-kernel)) + (let ((lp:*kernel* idx-kernel)) + ;; wait until the indexes are done being built... + ;; don't forget accounting for that waiting time. + (when (and create-indexes (not data-only)) + (with-stats-collection ("Index Build Completion" :state *state*) + (loop for idx in all-indexes do (lp:receive-result idx-channel)))) + (lp:end-kernel)) - ;; and report the total time spent on the operation - (when summary - (report-full-summary "Total streaming time" *state* - :before state-before - :finally state-after - :parallel idx-state)))) + ;; and report the total time spent on the operation + (when summary + (report-full-summary "Total streaming time" *state* + :before state-before + :finally state-after + :parallel idx-state))))) diff --git a/src/sources/mssql/mssql-schema.lisp b/src/sources/mssql/mssql-schema.lisp index e44dbd4..c789fdf 100644 --- a/src/sources/mssql/mssql-schema.lisp +++ b/src/sources/mssql/mssql-schema.lisp @@ -133,18 +133,15 @@ order by table_schema, table_name, ordinal_position" (reverse (loop :for (table-name . cols) :in tables :collect (cons table-name (reverse cols)))))))))) -(defun list-all-indexes (&key - (dbname *my-dbname*)) +(defun list-all-indexes () "Get the list of MSSQL index definitions per table." (loop :with result := nil - :for (schema table-name name col unique pkey) + :for (schema table name col unique pkey) :in (mssql-query (format nil " - select schema_name(schema_id) as SchemaName + select schema_name(schema_id) as SchemaName, o.name as TableName, i.name as IndexName, - ic.key_ordinal as ColumnOrder, - ic.is_included_column as IsIncluded, co.[name] as ColumnName, i.is_unique, i.is_primary_key @@ -156,30 +153,53 @@ order by table_schema, table_name, ordinal_position" join sys.columns co on co.object_id = i.object_id and co.column_id = ic.column_id + where schema_name(schema_id) not in ('dto', 'sys') + order by SchemaName, - o.[name], - i.[name], - ic.is_included_column, - ic.key_ordinal")) + o.[name], + i.[name], + ic.is_included_column, + ic.key_ordinal")) :do (let* ((s-entry (assoc schema result :test 'equal)) (t-entry (when s-entry - (assoc table-name (cdr t-entry) :test 'equal))) - (index (when t-entry - (assoc name (cdr t-entry) :test 'equal)))) + (assoc table (cdr s-entry) :test 'equal))) + (i-entry (when t-entry + (assoc name (cdr t-entry) :test 'equal))) + (index (make-pgsql-index :name name + :primary (= pkey 1) + :table-name (qualify-name schema table) + :unique (= unique 1) + :columns (list col)))) (if s-entry (if t-entry - (push column (cdr t-entry)) - (push (cons table-name (list column)) (cdr s-entry))) - (push (cons schema (list (cons table-name (list column)))) result))) + (if i-entry + (push col + (pgloader.pgsql::pgsql-index-columns (cdr i-entry))) + (push (cons name index) (cdr t-entry))) + (push (cons table (list (cons name index))) (cdr s-entry))) + (push (cons schema + (list (cons table + (list (cons name index))))) result))) :finally ;; we did push, we need to reverse here - (return (reverse - (loop :for (schema . tables) :in result - :collect - (cons schema - (reverse (loop :for (table-name . cols) :in tables - :collect (cons table-name (reverse cols)))))))))) + (return + (labels ((reverse-index-cols (index) + (setf (pgloader.pgsql::pgsql-index-columns index) + (nreverse (pgloader.pgsql::pgsql-index-columns index))) + index) + + (reverse-indexes-cols (list-of-indexes) + (loop :for (name . index) :in list-of-indexes + :collect (cons name (reverse-index-cols index)))) + + (reverse-indexes-cols (list-of-tables) + (reverse + (loop :for (table . indexes) :in list-of-tables + :collect (cons table (reverse-indexes-cols indexes)))))) + (reverse + (loop :for (schema . tables) :in result + :collect (cons schema (reverse-indexes-cols tables)))))))) ;;;