diff --git a/Makefile b/Makefile index 64ddd1b..f8fd730 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,9 @@ docs: pandoc pgloader.1.md -o pgloader.html pandoc pgloader.1.md -o pgloader.pdf +~/quicklisp/local-projects/qmynd: + git clone -b streaming https://github.com/dimitri/qmynd.git $@ + ~/quicklisp/local-projects/cl-abnf: git clone https://github.com/dimitri/cl-abnf.git $@ @@ -23,6 +26,7 @@ docs: ~/quicklisp/local-projects/cl-csv: git clone -b empty-strings-and-nil https://github.com/dimitri/cl-csv.git $@ +qmynd: ~/quicklisp/local-projects/qmynd ; cl-abnf: ~/quicklisp/local-projects/cl-abnf ; cl-csv: ~/quicklisp/local-projects/cl-csv ; postmodern: ~/quicklisp/local-projects/Postmodern ; @@ -41,7 +45,7 @@ $(ASDF_CONF): asdf-config: $(ASDF_CONF) ; -$(LIBS): quicklisp $(ASDF_CONF) cl-abnf postmodern cl-csv +$(LIBS): quicklisp $(ASDF_CONF) cl-abnf postmodern cl-csv qmynd sbcl --load ~/quicklisp/setup.lisp \ --eval '(ql:quickload "pgloader")' \ --eval '(quit)' diff --git a/README.md b/README.md index e2be50c..1327795 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ pgloader is now a Common Lisp program, tested using the [Quicklisp](http://www.quicklisp.org/beta/). $ apt-get install sbcl - $ apt-get install libmysqlclient-dev libsqlite3-dev + $ apt-get install libsqlite3-dev $ make pgloader $ ./build/pgloader.exe --help @@ -113,11 +113,6 @@ file into a manual page or an HTML page thanks to the `pandoc` application: Some notes about what I intend to be working on next. -### tests - - - add needed pre-requisites in bootstrap.sh to run the MySQL and SQLite - tests from the `make test` target without errors - ### binary distribution - prepare an all-included binary for several platforms diff --git a/pgloader.asd b/pgloader.asd index 76e6010..0f5f569 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -10,7 +10,7 @@ #:postmodern ; PostgreSQL protocol implementation #:cl-postgres ; low level bits for COPY streaming #:simple-date ; FIXME: recheck dependency - #:cl-mysql ; CFFI binding to libmysqlclient-dev + #:qmynd ; MySQL protocol implemenation #:split-sequence ; some parsing is made easy #:cl-csv ; full CSV reader #:cl-fad ; file and directories diff --git a/src/sources/mysql-schema.lisp b/src/sources/mysql-schema.lisp index 3e68131..6805e20 100644 --- a/src/sources/mysql-schema.lisp +++ b/src/sources/mysql-schema.lisp @@ -4,6 +4,8 @@ (in-package pgloader.mysql) +(defvar *connection* nil "Current MySQL connection") + ;;; ;;; Specific implementation of schema migration, see the API in @@ -45,103 +47,79 @@ :identifier-case identifier-case))))) +(defmacro with-mysql-connection ((&optional dbname) &body forms) + "Connect to MySQL, use given DBNAME as the current database if provided, + and execute FORMS in a protected way so that we always disconnect when + done. + + Connection parameters are *myconn-host*, *myconn-port*, *myconn-user* and + *myconn-pass*." + `(let ((*connection* + (qmynd:mysql-connect :host *myconn-host* + :port *myconn-port* + :username *myconn-user* + :password *myconn-pass* + ,@(when dbname + (list :database dbname))))) + (flet ((mysql-query (query &key row-fn (as-text t) (result-type 'list)) + (qmynd:mysql-query *connection* query + :row-fn row-fn + :as-text as-text + :result-type result-type))) + (unwind-protect + (progn ,@forms) + (qmynd:mysql-disconnect *connection*))))) + ;;; ;;; Function for accessing the MySQL catalogs, implementing auto-discovery ;;; -(defun list-databases (&key - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun list-databases () "Connect to a local database and get the database list" - (cl-mysql:connect :host host :user user :password pass) - (unwind-protect - (mapcan #'identity (caar (cl-mysql:query "show databases"))) - (cl-mysql:disconnect))) + (with-mysql-connection () + (mysql-query "show databases"))) -(defun list-tables (dbname - &key - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun list-tables (dbname) "Return a flat list of all the tables names known in given DATABASE" - (cl-mysql:connect :host host :user user :password pass) - - (unwind-protect - (progn - (cl-mysql:use dbname) - ;; that returns a pretty weird format, process it - (loop for (table-name) - in (caar (cl-mysql:query (format nil " + (with-mysql-connection (dbname) + (mysql-query (format nil " select table_name from information_schema.tables where table_schema = '~a' and table_type = 'BASE TABLE' -order by table_name" dbname))) - collect table-name)) - ;; free resources - (cl-mysql:disconnect))) +order by table_name" dbname)))) -(defun list-views (dbname - &key - only-tables - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun list-views (dbname &key only-tables) "Return a flat list of all the view names and definitions known in given DBNAME" - (cl-mysql:connect :host host :user user :password pass) - - (unwind-protect - (progn - (cl-mysql:use dbname) - ;; that returns a pretty weird format, process it - (caar (cl-mysql:query (format nil " + (with-mysql-connection (dbname) + (mysql-query (format nil " select table_name, view_definition from information_schema.views where table_schema = '~a' ~@[and table_name in (~{'~a'~^,~})~] order by table_name" dbname only-tables)))) - ;; free resources - (cl-mysql:disconnect))) -(defun create-my-views (dbname views-alist - &key - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun create-my-views (dbname views-alist) "VIEWS-ALIST associates view names with their SQL definition, which might be empty for already existing views. Create only the views for which we have an SQL definition." (let ((views (remove-if #'null views-alist :key #'cdr))) (when views - (cl-mysql:connect :host host :user user :password pass) - (unwind-protect - (progn - (cl-mysql:use dbname) - (loop for (name . def) in views - for sql = (format nil "CREATE VIEW ~a AS ~a" name def) - do - (log-message :info "MySQL: ~a" sql) - (cl-mysql:query sql))) - ;; free resources - (cl-mysql:disconnect))))) + (with-mysql-connection (dbname) + (loop for (name . def) in views + for sql = (format nil "CREATE VIEW ~a AS ~a" name def) + do + (log-message :info "MySQL: ~a" sql) + (mysql-query sql)))))) -(defun drop-my-views (dbname views-alist - &key - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun drop-my-views (dbname views-alist) "See `create-my-views' for VIEWS-ALIST description. This time we DROP the views to clean out after our work." (let ((views (remove-if #'null views-alist :key #'cdr))) (when views - (cl-mysql:connect :host host :user user :password pass) - (unwind-protect - (let ((sql - (format nil "DROP VIEW ~{~a~^, ~};" (mapcar #'car views)))) - (cl-mysql:use dbname) - (log-message :info "MySQL: ~a" sql) - (cl-mysql:query sql)) - ;; free resources - (cl-mysql:disconnect))))) + (with-mysql-connection (dbname) + (let ((sql + (format nil "DROP VIEW ~{~a~^, ~};" (mapcar #'car views)))) + (log-message :info "MySQL: ~a" sql) + (mysql-query sql)))))) ;;; @@ -156,22 +134,16 @@ order by table_name" dbname only-tables)))) (defun list-all-columns (dbname &key only-tables - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*) (table-type :table) &aux (table-type-name (cdr (assoc table-type *table-type*)))) "Get the list of MySQL column names per table." - (cl-mysql:connect :host host :user user :password pass) - - (unwind-protect - (progn - (loop - with schema = nil - for (table-name name dtype ctype default nullable extra) - in - (caar (cl-mysql:query (format nil " + (with-mysql-connection (dbname) + (loop + with schema = nil + for (table-name name dtype ctype default nullable extra) + in + (mysql-query (format nil " select c.table_name, c.column_name, c.data_type, c.column_type, c.column_default, c.is_nullable, c.extra @@ -179,61 +151,48 @@ order by table_name" dbname only-tables)))) join information_schema.tables t using(table_schema, table_name) where c.table_schema = '~a' and t.table_type = '~a' ~@[and table_name in (~{'~a'~^,~})~] -order by table_name, ordinal_position" dbname table-type-name only-tables))) - do - (let ((entry (assoc table-name schema :test 'equal)) - (column - (make-mysql-column - table-name name dtype ctype default nullable extra))) - (if entry - (push column (cdr entry)) - (push (cons table-name (list column)) schema))) - finally - ;; we did push, we need to reverse here - (return (loop - for (name . cols) in schema - collect (cons name (reverse cols)))))) +order by table_name, ordinal_position" dbname table-type-name only-tables)) + do + (let ((entry (assoc table-name schema :test 'equal)) + (column + (make-mysql-column + table-name name dtype ctype default nullable extra))) + (if entry + (push column (cdr entry)) + (push (cons table-name (list column)) schema))) + finally + ;; we did push, we need to reverse here + (return (loop + for (name . cols) in schema + collect (cons name (reverse cols))))))) - ;; free resources - (cl-mysql:disconnect))) - -(defun list-all-indexes (dbname - &key - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun list-all-indexes (dbname) "Get the list of MySQL index definitions per table." - (cl-mysql:connect :host host :user user :password pass) - - (unwind-protect - (progn - (loop - with schema = nil - for (table-name name non-unique cols) - in (caar (cl-mysql:query (format nil " + (with-mysql-connection (dbname) + (loop + with schema = nil + for (table-name name non-unique cols) + in (mysql-query (format nil " SELECT table_name, index_name, non_unique, - GROUP_CONCAT(column_name order by seq_in_index) + cast(GROUP_CONCAT(column_name order by seq_in_index) as char) FROM information_schema.statistics WHERE table_schema = '~a' -GROUP BY table_name, index_name;" dbname))) - do (let ((entry (assoc table-name schema :test 'equal)) - (index - (make-pgsql-index :name name - :primary (string= name "PRIMARY") - :table-name table-name - :unique (not (= 1 non-unique)) - :columns (sq:split-sequence #\, cols)))) - (if entry - (push index (cdr entry)) - (push (cons table-name (list index)) schema))) - finally - ;; we did push, we need to reverse here - (return (reverse (loop - for (name . indexes) in schema - collect (cons name (reverse indexes))))))) - - ;; free resources - (cl-mysql:disconnect))) +GROUP BY table_name, index_name;" dbname)) + do (let ((entry (assoc table-name schema :test 'equal)) + (index + (make-pgsql-index :name name + :primary (string= name "PRIMARY") + :table-name table-name + :unique (not (string= "1" non-unique)) + :columns (sq:split-sequence #\, cols)))) + (if entry + (push index (cdr entry)) + (push (cons table-name (list index)) schema))) + finally + ;; we did push, we need to reverse here + (return (reverse (loop + for (name . indexes) in schema + collect (cons name (reverse indexes)))))))) (defun set-table-oids (all-indexes) "MySQL allows using the same index name against separate tables, which @@ -254,20 +213,13 @@ GROUP BY table_name, index_name;" dbname))) ;;; ;;; MySQL Foreign Keys ;;; -(defun list-all-fkeys (dbname - &key - (host *myconn-host*) - (user *myconn-user*) - (pass *myconn-pass*)) +(defun list-all-fkeys (dbname) "Get the list of MySQL Foreign Keys definitions per table." - (cl-mysql:connect :host host :user user :password pass) - - (unwind-protect - (progn - (loop - with schema = nil - for (table-name name ftable cols fcols) - in (caar (cl-mysql:query (format nil " + (with-mysql-connection (dbname) + (loop + with schema = nil + for (table-name name ftable cols fcols) + in (mysql-query (format nil " SELECT i.table_name, i.constraint_name, k.referenced_table_name ft, group_concat( k.column_name @@ -284,24 +236,21 @@ GROUP BY table_name, index_name;" dbname))) AND k.referenced_table_schema = '~a' AND i.constraint_type = 'FOREIGN KEY' - GROUP BY table_name, constraint_name, ft;" dbname dbname))) - do (let ((entry (assoc table-name schema :test 'equal)) - (fk (make-pgsql-fkey :name name - :table-name table-name - :columns cols - :foreign-table ftable - :foreign-columns fcols))) - (if entry - (push fk (cdr entry)) - (push (cons table-name (list fk)) schema))) - finally - ;; we did push, we need to reverse here - (return (reverse (loop - for (name . fks) in schema - collect (cons name (reverse fks))))))) - - ;; free resources - (cl-mysql:disconnect))) + GROUP BY table_name, constraint_name, ft;" dbname dbname)) + do (let ((entry (assoc table-name schema :test 'equal)) + (fk (make-pgsql-fkey :name name + :table-name table-name + :columns cols + :foreign-table ftable + :foreign-columns fcols))) + (if entry + (push fk (cdr entry)) + (push (cons table-name (list fk)) schema))) + finally + ;; we did push, we need to reverse here + (return (reverse (loop + for (name . fks) in schema + collect (cons name (reverse fks)))))))) (defun drop-fkeys (all-fkeys &key dbname identifier-case) "Drop all Foreign Key Definitions given, to prepare for a clean run." @@ -366,11 +315,12 @@ GROUP BY table_name, index_name;" dbname))) This function assumes a valid connection to the MySQL server has been established already." (loop - for (name type) in (caar (cl-mysql:query (format nil " + for (name type) in (qmynd:mysql-query *connection* (format nil " select column_name, data_type from information_schema.columns where table_schema = '~a' and table_name = '~a' -order by ordinal_position" dbname table-name))) +order by ordinal_position" dbname table-name) + :result-type 'list) for is-null = (member type '("char" "varchar" "text" "tinytext" "mediumtext" "longtext") :test #'string-equal) diff --git a/src/sources/mysql.lisp b/src/sources/mysql.lisp index 6c8d3d3..e16f476 100644 --- a/src/sources/mysql.lisp +++ b/src/sources/mysql.lisp @@ -50,42 +50,23 @@ (let ((dbname (source-db mysql)) (table-name (source mysql))) - (cl-mysql:connect :host *myconn-host* - :port *myconn-port* - :user *myconn-user* - :password *myconn-pass*) + (with-mysql-connection (dbname) + (mysql-query "SET NAMES 'utf8'") + (mysql-query "SET character_set_results = utf8;") - (unwind-protect - (progn - ;; Ensure we're talking utf-8 and connect to DBNAME in MySQL - (cl-mysql:query "SET NAMES 'utf8'") - (cl-mysql:query "SET character_set_results = utf8;") - (cl-mysql:use dbname) - - (multiple-value-bind (cols nulls) - (get-column-list-with-is-nulls dbname table-name) - (let* ((sql (format nil "SELECT ~{~a~^, ~} FROM ~a;" cols table-name)) - (q (cl-mysql:query sql :store nil :type-map nil)) - (rs (cl-mysql:next-result-set q))) - (declare (ignore rs)) - - ;; Now fetch MySQL rows directly in the stream - (handler-case - (loop - with type-map = (make-hash-table) - for row = (cl-mysql:next-row q :type-map type-map) - while row - for row-with-proper-nulls = (fix-nulls row nulls) - counting row into count - do (funcall process-row-fn row-with-proper-nulls) - finally (return count)) - (cl-mysql-system:mysql-error (e) - (progn - (log-message :error "~a" e) ; begins with MySQL error: - (pgstate-setf *state* (target mysql) :errs -1))))))) - - ;; free resources - (cl-mysql:disconnect)))) + (multiple-value-bind (cols nulls) + (get-column-list-with-is-nulls dbname table-name) + (let* ((sql (format nil "SELECT ~{~a~^, ~} FROM ~a;" cols table-name)) + (row-fn + (lambda (row) + (pgstate-incf *state* (target mysql) :read 1) + (funcall process-row-fn (fix-nulls row nulls))))) + (handler-bind + ((babel-encodings:character-decoding-error + #'(lambda (e) + (pgstate-incf *state* (target mysql) :errs 1) + (log-message :error "~a" e)))) + (mysql-query sql :row-fn row-fn))))))) ;;; ;;; Use map-rows and pgsql-text-copy-format to fill in a CSV file on disk @@ -109,8 +90,7 @@ ;;; (defmethod copy-to-queue ((mysql copy-mysql) dataq) "Copy data from MySQL table DBNAME.TABLE-NAME into queue DATAQ" - (let ((read (pgloader.queue:map-push-queue dataq #'map-rows mysql))) - (pgstate-incf *state* (target mysql) :read read))) + (pgloader.queue:map-push-queue dataq #'map-rows mysql)) ;;; @@ -246,7 +226,7 @@ ;; need to stop now and refrain to try loading the data into an ;; incomplete schema. ;; - (cl-mysql-system:mysql-error (e) + (qmynd:mysql-error (e) (log-message :fatal "~a" e) (return-from copy-database))