Implement support for MySQL Foreign Keys.

This commit is contained in:
Dimitri Fontaine 2013-11-05 18:48:54 +01:00
parent 5624d729c4
commit e53e613a82
5 changed files with 174 additions and 15 deletions

View File

@ -64,6 +64,9 @@
#:create-tables
#:format-pgsql-column
#:format-extra-type
#:make-pgsql-fkey
#:format-pgsql-create-fkey
#:format-pgsql-drop-fkey
#:drop-index-sql-list
#:create-index-sql-list))
@ -145,6 +148,9 @@
#:create-tables
#:format-pgsql-column
#:format-extra-type
#:make-pgsql-fkey
#:format-pgsql-create-fkey
#:format-pgsql-drop-fkey
#:drop-index-sql-list
#:create-index-sql-list)
(:export #:copy-mysql

View File

@ -725,7 +725,7 @@
(let* ((state-before ,(when before `(pgloader.utils:make-pgstate)))
(*state* (or *state* (pgloader.utils:make-pgstate)))
(state-idx (pgloader.utils:make-pgstate))
(state-after ,(when after `(pgloader.utils:make-pgstate)))
(state-after (pgloader.utils:make-pgstate))
(pgloader.mysql:*cast-rules* ',casts)
(*myconn-host* ,myhost)
(*myconn-port* ,myport)

View File

@ -53,6 +53,52 @@
(declare (ignorable identifier-case include-drop))
nil)
;;;
;;; API for Foreign Keys
;;;
(defstruct pgsql-fkey
name table-name columns foreign-table foreign-columns)
(defgeneric format-pgsql-create-fkey (fkey &key identifier-case)
(:documentation
"Return the PostgreSQL command to define a Foreign Key Constraint."))
(defgeneric format-pgsql-drop-fkey (fkey &key identifier-case)
(:documentation
"Return the PostgreSQL command to DROP a Foreign Key Constraint."))
(defmethod format-pgsql-create-fkey ((fk pgsql-fkey) &key identifier-case)
"Generate the PostgreSQL statement to rebuild a MySQL Foreign Key"
(let* ((constraint-name
(apply-identifier-case (pgsql-fkey-name fk) identifier-case))
(table-name
(apply-identifier-case (pgsql-fkey-table-name fk) identifier-case))
(foreign-table
(apply-identifier-case (pgsql-fkey-foreign-table fk) identifier-case)))
(format nil
"ALTER TABLE ~a ADD CONSTRAINT ~a FOREIGN KEY(~a) REFERENCES ~a(~a)"
table-name
constraint-name
(pgsql-fkey-columns fk)
foreign-table
(pgsql-fkey-foreign-columns fk))))
(defmethod format-pgsql-drop-fkey ((fk pgsql-fkey) &key identifier-case)
"Generate the PostgreSQL statement to rebuild a MySQL Foreign Key"
(let* ((constraint-name
(apply-identifier-case (pgsql-fkey-name fk) identifier-case))
(table-name
(apply-identifier-case (pgsql-fkey-table-name fk) identifier-case)))
;; beware, IF EXISTS is only supported from 9.0 onward...
(format nil "alter table ~a drop constraint if exists ~a"
table-name constraint-name)))
;;;
;;; Table schema rewriting support
;;;
(defun create-table-sql (table-name cols &key if-not-exists identifier-case)
"Return a PostgreSQL CREATE TABLE statement from given COLS.

View File

@ -179,6 +179,97 @@ GROUP BY table_name, index_name;" dbname)))
;; free resources
(cl-mysql:disconnect)))
;;;
;;; MySQL Foreign Keys
;;;
(defun list-all-fkeys (dbname
&key
(host *myconn-host*)
(user *myconn-user*)
(pass *myconn-pass*))
"Get the list of MySQL Foreign Keys definitions per table."
(cl-mysql:connect :host host :user user :password pass)
(unwind-protect
(progn
(loop
with schema = nil
for (table-name name ftable cols fcols)
in (caar (cl-mysql:query (format nil "
SELECT i.table_name, i.constraint_name, k.referenced_table_name ft,
group_concat( k.column_name
order by k.ordinal_position) as cols,
group_concat( k.referenced_column_name
order by k.position_in_unique_constraint) as fcols
FROM information_schema.table_constraints i
LEFT JOIN information_schema.key_column_usage k
USING (table_schema, table_name, constraint_name)
WHERE i.table_schema = '~a'
AND k.referenced_table_schema = '~a'
AND i.constraint_type = 'FOREIGN KEY'
GROUP BY table_name, constraint_name, ft;" dbname dbname)))
do (let ((entry (assoc table-name schema :test 'equal))
(fk (make-pgsql-fkey :name name
:table-name table-name
:columns cols
:foreign-table ftable
:foreign-columns fcols)))
(if entry
(push fk (cdr entry))
(push (cons table-name (list fk)) schema)))
finally
;; we did push, we need to reverse here
(return (reverse (loop
for (name . fks) in schema
collect (cons name (reverse fks)))))))
;; free resources
(cl-mysql:disconnect)))
(defun drop-fkeys (all-fkeys &key identifier-case)
"Drop all Foreign Key Definitions given, to prepare for a clean run."
(loop for (table-name . fkeys) in all-fkeys
do
(loop for fkey in fkeys
for sql = (format-pgsql-drop-fkey fkey :identifier-case identifier-case)
do (pgsql-execute sql))))
(defun create-fkeys (all-fkeys
&key
dbname state identifier-case (label "Foreign Keys"))
"Actually create the Foreign Key References that where declared in the
MySQL database"
(pgstate-add-table state dbname label)
(loop for (table-name . fkeys) in all-fkeys
do (loop for fkey in fkeys
for sql =
(format-pgsql-create-fkey fkey :identifier-case identifier-case)
do
(log-message :notice "~a" sql)
(pgsql-execute-with-timing dbname "Foreign Keys" sql state))))
;;;
;;; Sequences
;;;
(defun reset-sequences (all-columns &key dbname state identifier-case)
"Reset all sequences created during this MySQL migration."
(let ((tables
(mapcar
(lambda (name) (apply-identifier-case name identifier-case))
(mapcar #'car all-columns))))
(log-message :notice "Reset sequences")
(with-stats-collection (dbname "Reset Sequences"
:use-result-as-rows t
:state state)
(pgloader.pgsql:reset-all-sequences dbname :tables tables))))
;;;
;;; Tools to handle row queries, issuing separate is null statements and

View File

@ -149,7 +149,7 @@
(defun create-indexes-in-kernel (dbname table-name indexes kernel channel
&key
identifier-case include-drop
state (label "create index"))
state (label "Create Indexes"))
"Create indexes for given table in dbname, using given lparallel KERNEL
and CHANNEL so that the index build happen in concurrently with the data
copying."
@ -203,7 +203,7 @@
(let* ((summary (null *state*))
(*state* (or *state* (make-pgstate)))
(idx-state (or state-indexes (make-pgstate)))
(seq-state (or state-after (make-pgstate)))
(state-after (or state-after (make-pgstate)))
(copy-kernel (make-kernel 2))
(dbname (source-db mysql))
(pg-dbname (target-db mysql))
@ -211,6 +211,10 @@
:only-tables only-tables
:including including
:excluding excluding))
(all-fkeys (filter-column-list (list-all-fkeys dbname)
:only-tables only-tables
:including including
:excluding excluding))
(all-indexes (filter-column-list (list-all-indexes dbname)
:only-tables only-tables
:including including
@ -230,6 +234,12 @@
:use-result-as-rows t
:state state-before)
(with-pgsql-transaction (pg-dbname)
;; we need to first drop the Foreign Key Constraints, so that we
;; can DROP TABLE when asked
(when include-drop
(drop-fkeys all-fkeys :identifier-case identifier-case))
;; now drop then create tables and types, etc
(create-tables all-columns
:identifier-case identifier-case
:include-drop include-drop))))
@ -268,31 +278,37 @@
;; don't forget to reset sequences, but only when we did actually import
;; the data.
(when (and (not schema-only) reset-sequences)
(let ((tables
(mapcar
(lambda (name) (apply-identifier-case name identifier-case))
(or only-tables
(mapcar #'car all-columns)))))
(log-message :notice "Reset sequences")
(with-stats-collection (pg-dbname "reset sequences"
:use-result-as-rows t
:state seq-state)
(pgloader.pgsql:reset-all-sequences pg-dbname :tables tables))))
(reset-sequences all-columns
:dbname pg-dbname
:state state-after
:identifier-case identifier-case))
;; now end the kernels
(let ((lp:*kernel* copy-kernel)) (lp:end-kernel))
(let ((lp:*kernel* idx-kernel))
;; wait until the indexes are done being built...
;; don't forget accounting for that waiting time.
(with-stats-collection (pg-dbname "index build completion" :state *state*)
(with-stats-collection (pg-dbname "Index Build Completion" :state *state*)
(loop for idx in all-indexes do (lp:receive-result idx-channel)))
(lp:end-kernel))
;;
;; Foreign Key Constraints
;;
;; We need to have finished loading both the reference and the refering
;; tables to be able to build the foreign keys, so wait until all tables
;; and indexes are imported before doing that.
;;
(create-fkeys all-fkeys
:dbname pg-dbname
:state state-after
:identifier-case identifier-case)
;; and report the total time spent on the operation
(when summary
(report-full-summary "Total streaming time" *state*
:before state-before
:finally seq-state
:finally state-after
:parallel idx-state))))