Fetch all MySQL meta-data from a single connection.

This commit is contained in:
Dimitri Fontaine 2013-12-20 10:21:31 +01:00
parent e09eb3fbb2
commit bb561a0c4d
2 changed files with 144 additions and 110 deletions

View File

@ -47,6 +47,17 @@
:identifier-case identifier-case)))))
;;;
;;; General utility to manage MySQL connection
;;;
(defun mysql-query (query &key row-fn (as-text t) (result-type 'list))
"Execute given QUERY within the current *connection*, and set proper
defaults for pgloader."
(qmynd:mysql-query *connection* query
:row-fn row-fn
:as-text as-text
:result-type result-type))
(defmacro with-mysql-connection ((&optional dbname) &body forms)
"Connect to MySQL, use given DBNAME as the current database if provided,
and execute FORMS in a protected way so that we always disconnect when
@ -61,17 +72,14 @@
:password *myconn-pass*
,@(when dbname
(list :database dbname)))))
(flet ((mysql-query (query &key row-fn (as-text t) (result-type 'list))
(qmynd:mysql-query *connection* query
:row-fn row-fn
:as-text as-text
:result-type result-type)))
(unwind-protect
(progn ,@forms)
(qmynd:mysql-disconnect *connection*)))))
(unwind-protect
(progn ,@forms)
(qmynd:mysql-disconnect *connection*))))
;;;
;;; Function for accessing the MySQL catalogs, implementing auto-discovery
;;; Function for accessing the MySQL catalogs, implementing auto-discovery.
;;;
;;; Interactive use only, will create its own database connection.
;;;
(defun list-databases ()
"Connect to a local database and get the database list"
@ -97,32 +105,41 @@ order by table_name" dbname))))
~@[and table_name in (~{'~a'~^,~})~]
order by table_name" dbname only-tables))))
;;;
;;; Those functions are to be called from withing an already established
;;; MySQL Connection.
;;;
;;; Handle MATERIALIZE VIEWS sections, where we need to create the views in
;;; the MySQL database before being able to process them.
;;;
(defun create-my-views (dbname views-alist)
"VIEWS-ALIST associates view names with their SQL definition, which might
be empty for already existing views. Create only the views for which we
have an SQL definition."
(let ((views (remove-if #'null views-alist :key #'cdr)))
(when views
(with-mysql-connection (dbname)
(loop for (name . def) in views
for sql = (format nil "CREATE VIEW ~a AS ~a" name def)
do
(log-message :info "MySQL: ~a" sql)
(mysql-query sql))))))
(loop for (name . def) in views
for sql = (format nil "CREATE VIEW ~a AS ~a" name def)
do
(log-message :info "MySQL: ~a" sql)
(mysql-query sql)))))
(defun drop-my-views (dbname views-alist)
"See `create-my-views' for VIEWS-ALIST description. This time we DROP the
views to clean out after our work."
(let ((views (remove-if #'null views-alist :key #'cdr)))
(when views
(with-mysql-connection (dbname)
(let ((sql
(format nil "DROP VIEW ~{~a~^, ~};" (mapcar #'car views))))
(log-message :info "MySQL: ~a" sql)
(mysql-query sql))))))
(let ((sql
(format nil "DROP VIEW ~{~a~^, ~};" (mapcar #'car views))))
(log-message :info "MySQL: ~a" sql)
(mysql-query sql)))))
;;;
;;; Those functions are to be called from withing an already established
;;; MySQL Connection.
;;;
;;; Tools to get MySQL table and columns definitions and transform them to
;;; PostgreSQL CREATE TABLE statements, and run those.
;;;
@ -138,12 +155,11 @@ order by table_name" dbname only-tables))))
&aux
(table-type-name (cdr (assoc table-type *table-type*))))
"Get the list of MySQL column names per table."
(with-mysql-connection (dbname)
(loop
with schema = nil
for (table-name name dtype ctype default nullable extra)
in
(mysql-query (format nil "
(loop
with schema = nil
for (table-name name dtype ctype default nullable extra)
in
(mysql-query (format nil "
select c.table_name, c.column_name,
c.data_type, c.column_type, c.column_default,
c.is_nullable, c.extra
@ -152,47 +168,46 @@ order by table_name" dbname only-tables))))
where c.table_schema = '~a' and t.table_type = '~a'
~@[and table_name in (~{'~a'~^,~})~]
order by table_name, ordinal_position" dbname table-type-name only-tables))
do
(let ((entry (assoc table-name schema :test 'equal))
(column
(make-mysql-column
table-name name dtype ctype default nullable extra)))
(if entry
(push column (cdr entry))
(push (cons table-name (list column)) schema)))
finally
;; we did push, we need to reverse here
(return (loop
for (name . cols) in schema
collect (cons name (reverse cols)))))))
do
(let ((entry (assoc table-name schema :test 'equal))
(column
(make-mysql-column
table-name name dtype ctype default nullable extra)))
(if entry
(push column (cdr entry))
(push (cons table-name (list column)) schema)))
finally
;; we did push, we need to reverse here
(return (loop
for (name . cols) in schema
collect (cons name (reverse cols))))))
(defun list-all-indexes (dbname)
"Get the list of MySQL index definitions per table."
(with-mysql-connection (dbname)
(loop
with schema = nil
for (table-name name non-unique cols)
in (mysql-query (format nil "
(loop
with schema = nil
for (table-name name non-unique cols)
in (mysql-query (format nil "
SELECT table_name, index_name, non_unique,
cast(GROUP_CONCAT(column_name order by seq_in_index) as char)
FROM information_schema.statistics
WHERE table_schema = '~a'
GROUP BY table_name, index_name;" dbname))
do (let ((entry (assoc table-name schema :test 'equal))
(index
(make-pgsql-index :name name
:primary (string= name "PRIMARY")
:table-name table-name
:unique (not (string= "1" non-unique))
:columns (sq:split-sequence #\, cols))))
(if entry
(push index (cdr entry))
(push (cons table-name (list index)) schema)))
finally
;; we did push, we need to reverse here
(return (reverse (loop
for (name . indexes) in schema
collect (cons name (reverse indexes))))))))
do (let ((entry (assoc table-name schema :test 'equal))
(index
(make-pgsql-index :name name
:primary (string= name "PRIMARY")
:table-name table-name
:unique (not (string= "1" non-unique))
:columns (sq:split-sequence #\, cols))))
(if entry
(push index (cdr entry))
(push (cons table-name (list index)) schema)))
finally
;; we did push, we need to reverse here
(return (reverse (loop
for (name . indexes) in schema
collect (cons name (reverse indexes)))))))
(defun set-table-oids (all-indexes &key identifier-case)
"MySQL allows using the same index name against separate tables, which
@ -212,17 +227,15 @@ GROUP BY table_name, index_name;" dbname))
do (loop for index in indexes
do (setf (pgloader.pgsql::pgsql-index-table-oid index) table-oid)))))
;;;
;;; MySQL Foreign Keys
;;;
(defun list-all-fkeys (dbname)
"Get the list of MySQL Foreign Keys definitions per table."
(with-mysql-connection (dbname)
(loop
with schema = nil
for (table-name name ftable cols fcols)
in (mysql-query (format nil "
(loop
with schema = nil
for (table-name name ftable cols fcols)
in (mysql-query (format nil "
SELECT i.table_name, i.constraint_name, k.referenced_table_name ft,
group_concat( k.column_name
@ -240,20 +253,20 @@ GROUP BY table_name, index_name;" dbname))
AND i.constraint_type = 'FOREIGN KEY'
GROUP BY table_name, constraint_name, ft;" dbname dbname))
do (let ((entry (assoc table-name schema :test 'equal))
(fk (make-pgsql-fkey :name name
:table-name table-name
:columns cols
:foreign-table ftable
:foreign-columns fcols)))
(if entry
(push fk (cdr entry))
(push (cons table-name (list fk)) schema)))
finally
;; we did push, we need to reverse here
(return (reverse (loop
for (name . fks) in schema
collect (cons name (reverse fks))))))))
do (let ((entry (assoc table-name schema :test 'equal))
(fk (make-pgsql-fkey :name name
:table-name table-name
:columns cols
:foreign-table ftable
:foreign-columns fcols)))
(if entry
(push fk (cdr entry))
(push (cons table-name (list fk)) schema)))
finally
;; we did push, we need to reverse here
(return (reverse (loop
for (name . fks) in schema
collect (cons name (reverse fks)))))))
(defun drop-fkeys (all-fkeys &key dbname identifier-case)
"Drop all Foreign Key Definitions given, to prepare for a clean run."

View File

@ -163,26 +163,51 @@
(dbname (source-db mysql))
(pg-dbname (target-db mysql))
(view-names (mapcar #'car materialize-views))
view-columns ; must wait until we created the views
(all-columns (filter-column-list (list-all-columns dbname)
:only-tables only-tables
:including including
:excluding excluding))
(all-fkeys (filter-column-list (list-all-fkeys dbname)
:only-tables only-tables
:including including
:excluding excluding))
(all-indexes (filter-column-list (list-all-indexes dbname)
:only-tables only-tables
:including including
:excluding excluding))
(max-indexes (loop for (table . indexes) in all-indexes
maximizing (length indexes)))
(idx-kernel (when (and max-indexes (< 0 max-indexes))
(make-kernel max-indexes)))
(idx-channel (when idx-kernel
(let ((lp:*kernel* idx-kernel))
(lp:make-channel)))))
;; all to be set within a single MySQL transaction
view-columns all-columns all-fkeys all-indexes
;; those depend on the previous entries
idx-kernel idx-channel)
;; to prepare the run, we need to fetch MySQL meta-data
(with-stats-collection (pg-dbname "fetch meta data" :state state-before)
(with-mysql-connection (dbname)
;; If asked to materialize views, now is the time to create
;; the target tables for them
(when materialize-views
(create-my-views dbname materialize-views))
(setf all-columns (filter-column-list (list-all-columns dbname)
:only-tables only-tables
:including including
:excluding excluding)
all-fkeys (filter-column-list (list-all-fkeys dbname)
:only-tables only-tables
:including including
:excluding excluding)
all-indexes (filter-column-list (list-all-indexes dbname)
:only-tables only-tables
:including including
:excluding excluding)
view-columns (list-all-columns dbname
:only-tables view-names
:table-type :view))))
;; prepare our lparallel kernels, dimensioning them to the known sizes
(let ((max-indexes
(loop for (table . indexes) in all-indexes
maximizing (length indexes))))
(setf idx-kernel (when (and max-indexes (< 0 max-indexes))
(make-kernel max-indexes)))
(setf idx-channel (when idx-kernel
(let ((lp:*kernel* idx-kernel))
(lp:make-channel)))))
;; if asked, first drop/create the tables on the PostgreSQL side
(when (and (or create-tables schema-only) (not data-only))
@ -215,16 +240,11 @@
(set-table-oids all-indexes
:identifier-case identifier-case)
;; If asked to materialize views, now is the time to create
;; the target tables for them
(when materialize-views
(create-my-views dbname materialize-views)
(setf view-columns (list-all-columns dbname
:only-tables view-names
:table-type :view))
(create-tables view-columns
:identifier-case identifier-case
:include-drop include-drop)))
;; We might have to MATERIALIZE VIEWS
(when materialize-views
(create-tables view-columns
:identifier-case identifier-case
:include-drop include-drop)))
;;
;; In case some error happens in the preparatory transaction, we
@ -286,7 +306,8 @@
;; If we created some views for this run, now is the time to DROP'em
;;
(when materialize-views
(drop-my-views dbname materialize-views))
(with-mysql-connection (dbname)
(drop-my-views dbname materialize-views)))
;;
;; Now Reset Sequences, the good time to do that is once the whole data
;; has been imported and once we have the indexes in place, as max() is