diff --git a/src/sources/mysql-schema.lisp b/src/sources/mysql-schema.lisp index 626d05d..8a12465 100644 --- a/src/sources/mysql-schema.lisp +++ b/src/sources/mysql-schema.lisp @@ -47,6 +47,17 @@ :identifier-case identifier-case))))) +;;; +;;; General utility to manage MySQL connection +;;; +(defun mysql-query (query &key row-fn (as-text t) (result-type 'list)) + "Execute given QUERY within the current *connection*, and set proper + defaults for pgloader." + (qmynd:mysql-query *connection* query + :row-fn row-fn + :as-text as-text + :result-type result-type)) + (defmacro with-mysql-connection ((&optional dbname) &body forms) "Connect to MySQL, use given DBNAME as the current database if provided, and execute FORMS in a protected way so that we always disconnect when @@ -61,17 +72,14 @@ :password *myconn-pass* ,@(when dbname (list :database dbname))))) - (flet ((mysql-query (query &key row-fn (as-text t) (result-type 'list)) - (qmynd:mysql-query *connection* query - :row-fn row-fn - :as-text as-text - :result-type result-type))) - (unwind-protect - (progn ,@forms) - (qmynd:mysql-disconnect *connection*))))) + (unwind-protect + (progn ,@forms) + (qmynd:mysql-disconnect *connection*)))) ;;; -;;; Function for accessing the MySQL catalogs, implementing auto-discovery +;;; Function for accessing the MySQL catalogs, implementing auto-discovery. +;;; +;;; Interactive use only, will create its own database connection. ;;; (defun list-databases () "Connect to a local database and get the database list" @@ -97,32 +105,41 @@ order by table_name" dbname)))) ~@[and table_name in (~{'~a'~^,~})~] order by table_name" dbname only-tables)))) + +;;; +;;; Those functions are to be called from withing an already established +;;; MySQL Connection. +;;; +;;; Handle MATERIALIZE VIEWS sections, where we need to create the views in +;;; the MySQL database before being able to process them. +;;; (defun create-my-views (dbname views-alist) "VIEWS-ALIST associates view names with their SQL definition, which might be empty for already existing views. Create only the views for which we have an SQL definition." (let ((views (remove-if #'null views-alist :key #'cdr))) (when views - (with-mysql-connection (dbname) - (loop for (name . def) in views - for sql = (format nil "CREATE VIEW ~a AS ~a" name def) - do - (log-message :info "MySQL: ~a" sql) - (mysql-query sql)))))) + (loop for (name . def) in views + for sql = (format nil "CREATE VIEW ~a AS ~a" name def) + do + (log-message :info "MySQL: ~a" sql) + (mysql-query sql))))) (defun drop-my-views (dbname views-alist) "See `create-my-views' for VIEWS-ALIST description. This time we DROP the views to clean out after our work." (let ((views (remove-if #'null views-alist :key #'cdr))) (when views - (with-mysql-connection (dbname) - (let ((sql - (format nil "DROP VIEW ~{~a~^, ~};" (mapcar #'car views)))) - (log-message :info "MySQL: ~a" sql) - (mysql-query sql)))))) + (let ((sql + (format nil "DROP VIEW ~{~a~^, ~};" (mapcar #'car views)))) + (log-message :info "MySQL: ~a" sql) + (mysql-query sql))))) ;;; +;;; Those functions are to be called from withing an already established +;;; MySQL Connection. +;;; ;;; Tools to get MySQL table and columns definitions and transform them to ;;; PostgreSQL CREATE TABLE statements, and run those. ;;; @@ -138,12 +155,11 @@ order by table_name" dbname only-tables)))) &aux (table-type-name (cdr (assoc table-type *table-type*)))) "Get the list of MySQL column names per table." - (with-mysql-connection (dbname) - (loop - with schema = nil - for (table-name name dtype ctype default nullable extra) - in - (mysql-query (format nil " + (loop + with schema = nil + for (table-name name dtype ctype default nullable extra) + in + (mysql-query (format nil " select c.table_name, c.column_name, c.data_type, c.column_type, c.column_default, c.is_nullable, c.extra @@ -152,47 +168,46 @@ order by table_name" dbname only-tables)))) where c.table_schema = '~a' and t.table_type = '~a' ~@[and table_name in (~{'~a'~^,~})~] order by table_name, ordinal_position" dbname table-type-name only-tables)) - do - (let ((entry (assoc table-name schema :test 'equal)) - (column - (make-mysql-column - table-name name dtype ctype default nullable extra))) - (if entry - (push column (cdr entry)) - (push (cons table-name (list column)) schema))) - finally - ;; we did push, we need to reverse here - (return (loop - for (name . cols) in schema - collect (cons name (reverse cols))))))) + do + (let ((entry (assoc table-name schema :test 'equal)) + (column + (make-mysql-column + table-name name dtype ctype default nullable extra))) + (if entry + (push column (cdr entry)) + (push (cons table-name (list column)) schema))) + finally + ;; we did push, we need to reverse here + (return (loop + for (name . cols) in schema + collect (cons name (reverse cols)))))) (defun list-all-indexes (dbname) "Get the list of MySQL index definitions per table." - (with-mysql-connection (dbname) - (loop - with schema = nil - for (table-name name non-unique cols) - in (mysql-query (format nil " + (loop + with schema = nil + for (table-name name non-unique cols) + in (mysql-query (format nil " SELECT table_name, index_name, non_unique, cast(GROUP_CONCAT(column_name order by seq_in_index) as char) FROM information_schema.statistics WHERE table_schema = '~a' GROUP BY table_name, index_name;" dbname)) - do (let ((entry (assoc table-name schema :test 'equal)) - (index - (make-pgsql-index :name name - :primary (string= name "PRIMARY") - :table-name table-name - :unique (not (string= "1" non-unique)) - :columns (sq:split-sequence #\, cols)))) - (if entry - (push index (cdr entry)) - (push (cons table-name (list index)) schema))) - finally - ;; we did push, we need to reverse here - (return (reverse (loop - for (name . indexes) in schema - collect (cons name (reverse indexes)))))))) + do (let ((entry (assoc table-name schema :test 'equal)) + (index + (make-pgsql-index :name name + :primary (string= name "PRIMARY") + :table-name table-name + :unique (not (string= "1" non-unique)) + :columns (sq:split-sequence #\, cols)))) + (if entry + (push index (cdr entry)) + (push (cons table-name (list index)) schema))) + finally + ;; we did push, we need to reverse here + (return (reverse (loop + for (name . indexes) in schema + collect (cons name (reverse indexes))))))) (defun set-table-oids (all-indexes &key identifier-case) "MySQL allows using the same index name against separate tables, which @@ -212,17 +227,15 @@ GROUP BY table_name, index_name;" dbname)) do (loop for index in indexes do (setf (pgloader.pgsql::pgsql-index-table-oid index) table-oid))))) - ;;; ;;; MySQL Foreign Keys ;;; (defun list-all-fkeys (dbname) "Get the list of MySQL Foreign Keys definitions per table." - (with-mysql-connection (dbname) - (loop - with schema = nil - for (table-name name ftable cols fcols) - in (mysql-query (format nil " + (loop + with schema = nil + for (table-name name ftable cols fcols) + in (mysql-query (format nil " SELECT i.table_name, i.constraint_name, k.referenced_table_name ft, group_concat( k.column_name @@ -240,20 +253,20 @@ GROUP BY table_name, index_name;" dbname)) AND i.constraint_type = 'FOREIGN KEY' GROUP BY table_name, constraint_name, ft;" dbname dbname)) - do (let ((entry (assoc table-name schema :test 'equal)) - (fk (make-pgsql-fkey :name name - :table-name table-name - :columns cols - :foreign-table ftable - :foreign-columns fcols))) - (if entry - (push fk (cdr entry)) - (push (cons table-name (list fk)) schema))) - finally - ;; we did push, we need to reverse here - (return (reverse (loop - for (name . fks) in schema - collect (cons name (reverse fks)))))))) + do (let ((entry (assoc table-name schema :test 'equal)) + (fk (make-pgsql-fkey :name name + :table-name table-name + :columns cols + :foreign-table ftable + :foreign-columns fcols))) + (if entry + (push fk (cdr entry)) + (push (cons table-name (list fk)) schema))) + finally + ;; we did push, we need to reverse here + (return (reverse (loop + for (name . fks) in schema + collect (cons name (reverse fks))))))) (defun drop-fkeys (all-fkeys &key dbname identifier-case) "Drop all Foreign Key Definitions given, to prepare for a clean run." diff --git a/src/sources/mysql.lisp b/src/sources/mysql.lisp index 787bd77..ca08cb8 100644 --- a/src/sources/mysql.lisp +++ b/src/sources/mysql.lisp @@ -163,26 +163,51 @@ (dbname (source-db mysql)) (pg-dbname (target-db mysql)) (view-names (mapcar #'car materialize-views)) - view-columns ; must wait until we created the views - (all-columns (filter-column-list (list-all-columns dbname) - :only-tables only-tables - :including including - :excluding excluding)) - (all-fkeys (filter-column-list (list-all-fkeys dbname) - :only-tables only-tables - :including including - :excluding excluding)) - (all-indexes (filter-column-list (list-all-indexes dbname) - :only-tables only-tables - :including including - :excluding excluding)) - (max-indexes (loop for (table . indexes) in all-indexes - maximizing (length indexes))) - (idx-kernel (when (and max-indexes (< 0 max-indexes)) - (make-kernel max-indexes))) - (idx-channel (when idx-kernel - (let ((lp:*kernel* idx-kernel)) - (lp:make-channel))))) + + ;; all to be set within a single MySQL transaction + view-columns all-columns all-fkeys all-indexes + + ;; those depend on the previous entries + idx-kernel idx-channel) + + ;; to prepare the run, we need to fetch MySQL meta-data + (with-stats-collection (pg-dbname "fetch meta data" :state state-before) + (with-mysql-connection (dbname) + ;; If asked to materialize views, now is the time to create + ;; the target tables for them + (when materialize-views + (create-my-views dbname materialize-views)) + + (setf all-columns (filter-column-list (list-all-columns dbname) + :only-tables only-tables + :including including + :excluding excluding) + + all-fkeys (filter-column-list (list-all-fkeys dbname) + :only-tables only-tables + :including including + :excluding excluding) + + all-indexes (filter-column-list (list-all-indexes dbname) + :only-tables only-tables + :including including + :excluding excluding) + + view-columns (list-all-columns dbname + :only-tables view-names + :table-type :view)))) + + ;; prepare our lparallel kernels, dimensioning them to the known sizes + (let ((max-indexes + (loop for (table . indexes) in all-indexes + maximizing (length indexes)))) + + (setf idx-kernel (when (and max-indexes (< 0 max-indexes)) + (make-kernel max-indexes))) + + (setf idx-channel (when idx-kernel + (let ((lp:*kernel* idx-kernel)) + (lp:make-channel))))) ;; if asked, first drop/create the tables on the PostgreSQL side (when (and (or create-tables schema-only) (not data-only)) @@ -215,16 +240,11 @@ (set-table-oids all-indexes :identifier-case identifier-case) - ;; If asked to materialize views, now is the time to create - ;; the target tables for them - (when materialize-views - (create-my-views dbname materialize-views) - (setf view-columns (list-all-columns dbname - :only-tables view-names - :table-type :view)) - (create-tables view-columns - :identifier-case identifier-case - :include-drop include-drop))) + ;; We might have to MATERIALIZE VIEWS + (when materialize-views + (create-tables view-columns + :identifier-case identifier-case + :include-drop include-drop))) ;; ;; In case some error happens in the preparatory transaction, we @@ -286,7 +306,8 @@ ;; If we created some views for this run, now is the time to DROP'em ;; (when materialize-views - (drop-my-views dbname materialize-views)) + (with-mysql-connection (dbname) + (drop-my-views dbname materialize-views))) ;; ;; Now Reset Sequences, the good time to do that is once the whole data ;; has been imported and once we have the indexes in place, as max() is