From bda06f8ac06929bd7c145b146d0c822b7287c157 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Mon, 17 Dec 2018 16:31:47 +0100 Subject: [PATCH] Implement Citus support from a MySQL database. --- src/load/migrate-database.lisp | 6 ++++- src/package.lisp | 14 +++++++++- src/parsers/command-mysql.lisp | 9 ++++--- src/pgsql/pgsql-ddl-citus.lisp | 8 +++--- src/sources/mysql/mysql-cast-rules.lisp | 3 +++ src/utils/catalog.lisp | 6 +++++ src/utils/citus.lisp | 34 ++++++++++++++++++++----- test/mysql/f1db-citus.load | 19 ++++++++++++++ 8 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 test/mysql/f1db-citus.load diff --git a/src/load/migrate-database.lisp b/src/load/migrate-database.lisp index 689894d..a9f1ba4 100644 --- a/src/load/migrate-database.lisp +++ b/src/load/migrate-database.lisp @@ -232,6 +232,7 @@ (defun process-catalog (copy catalog &key alter-table alter-schema distribute) "Do all the PostgreSQL catalog tweaking here: casts, index WHERE clause rewriting, pgloader level alter schema and alter table commands." + (log-message :info "Processing source catalogs") ;; cast the catalog into something PostgreSQL can work on (cast catalog) @@ -250,6 +251,7 @@ ;; we also support schema changes necessary for Citus distribution (when distribute + (log-message :info "Applying distribution rules") (setf (catalog-distribution-rules catalog) (citus-distribute-schema catalog distribute)))) @@ -366,10 +368,12 @@ :alter-schema alter-schema :distribute distribute) - (citus-rule-is-missing-from-list (e) + #+pgloader-image + ((or citus-rule-table-not-found citus-rule-is-missing-from-list) (e) (log-message :fatal "~a" e) (return-from copy-database)) + #+pgloader-image (condition (e) (log-message :fatal "Failed to process catalogs: ~a" e) (return-from copy-database))) diff --git a/src/package.lisp b/src/package.lisp index c3e76f5..c6eda82 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -190,6 +190,7 @@ #:count-indexes #:count-fkeys #:max-indexes-per-table + #:field-name #:push-to-end #:with-schema @@ -299,7 +300,18 @@ (:export #:citus-distribute-schema #:citus-format-sql-select #:citus-backfill-table-p - #:citus-rule-is-missing-from-list)) + #:citus-rule-table-not-found + #:citus-rule-is-missing-from-list + + #:citus-reference-rule + #:citus-reference-rule-p + #:citus-reference-rule-table + + #:citus-distributed-rule + #:citus-distributed-rule-p + #:citus-distributed-rule-table + #:citus-distributed-rule-using + #:citus-distributed-rule-from)) (defpackage #:pgloader.utils (:use #:cl diff --git a/src/parsers/command-mysql.lisp b/src/parsers/command-mysql.lisp index bbae776..0e87f66 100644 --- a/src/parsers/command-mysql.lisp +++ b/src/parsers/command-mysql.lisp @@ -89,7 +89,8 @@ excluding-matching decoding-tables-as before-load - after-load)) + after-load + distribute-commands)) (:lambda (clauses-list) (alexandria:alist-plist clauses-list))) @@ -164,7 +165,7 @@ &key gucs mysql-gucs casts views before after options - alter-table alter-schema + alter-table alter-schema distribute ((:including incl)) ((:excluding excl)) ((:decoding decoding-as)) @@ -191,6 +192,7 @@ :materialize-views ',views :alter-table ',alter-table :alter-schema ',alter-schema + :distribute ',distribute :set-table-oids t :on-error-stop on-error-stop ,@(remove-batch-control-option options)) @@ -203,7 +205,7 @@ pg-db-uri &key gucs mysql-gucs casts views before after options - alter-table alter-schema + alter-table alter-schema distribute including excluding decoding) source (cond (*dry-run* @@ -219,6 +221,7 @@ :options options :alter-table alter-table :alter-schema alter-schema + :distribute distribute :including including :excluding excluding :decoding decoding)))))) diff --git a/src/pgsql/pgsql-ddl-citus.lisp b/src/pgsql/pgsql-ddl-citus.lisp index d5cd050..2fe2028 100644 --- a/src/pgsql/pgsql-ddl-citus.lisp +++ b/src/pgsql/pgsql-ddl-citus.lisp @@ -13,6 +13,8 @@ (defmethod format-create-sql ((rule citus-distributed-rule) &key (stream nil) if-not-exists) (declare (ignore if-not-exists)) - (format stream "SELECT create_distributed_table('~a', '~a');" - (format-table-name (citus-distributed-rule-table rule)) - (column-name (citus-distributed-rule-using rule)))) + (let* ((rule-table (citus-distributed-rule-table rule)) + (rule-col-name (column-name (citus-distributed-rule-using rule)))) + (format stream "SELECT create_distributed_table('~a', '~a');" + (format-table-name rule-table) + (apply-identifier-case rule-col-name)))) diff --git a/src/sources/mysql/mysql-cast-rules.lisp b/src/sources/mysql/mysql-cast-rules.lisp index f05ecde..ba438fa 100644 --- a/src/sources/mysql/mysql-cast-rules.lisp +++ b/src/sources/mysql/mysql-cast-rules.lisp @@ -186,6 +186,9 @@ (table-name name comment dtype ctype default nullable extra))) table-name name dtype ctype default nullable extra comment) +(defmethod field-name ((field mysql-column) &key) + (mysql-column-name field)) + (defun explode-mysql-enum (ctype) "Convert MySQL ENUM expression into a list of labels." (cl-ppcre:register-groups-bind (list) diff --git a/src/utils/catalog.lisp b/src/utils/catalog.lisp index 8b0cd62..6c3ca3a 100644 --- a/src/utils/catalog.lisp +++ b/src/utils/catalog.lisp @@ -186,6 +186,9 @@ "Cast a FIELD definition from a source database into a PostgreSQL COLUMN definition.")) +(defgeneric field-name (object &key) + (:documentation "Get the source database column name, or field-name.")) + ;;; ;;; Implementation of the methods @@ -373,6 +376,9 @@ (loop :for schema :in (catalog-schema-list catalog) :do (cast schema))) +(defmethod field-name ((column column) &key) + (column-name column)) + ;;; ;;; There's no simple equivalent to array_agg() in MS SQL, so the index and ;;; fkey queries return a row per index|fkey column rather than per diff --git a/src/utils/citus.lisp b/src/utils/citus.lisp index 0bcf329..541e6e2 100644 --- a/src/utils/citus.lisp +++ b/src/utils/citus.lisp @@ -40,20 +40,42 @@ ;; ;; ERROR Database error 42P16: table ;; "campaigns" is already distributed ;; + ;; In the PostgreSQL source case, we have the table OIDs already at this + ;; point, but in the general case we don't. Use the names to match what + ;; we did up to now. + ;; (loop :for rule :in (append distribution-rules derived-rules) - :unless (member (table-oid (citus-rule-table rule)) + :unless (member (table-source-name (citus-rule-table rule)) processed-rules :key (lambda (rule) - (table-oid (citus-rule-table rule)))) + (table-source-name (citus-rule-table rule))) + :test #'equal) :collect (progn (push rule processed-rules) (apply-citus-rule rule) rule)))) +(define-condition citus-rule-table-not-found (error) + ((schema-name :initarg :schema-name + :accessor citus-rule-table-not-found-schema-name) + (table-name :initarg :table-name + :accessor citus-rule-table-not-found-table-name)) + (:report + (lambda (err stream) + (let ((*print-circle* nil)) + (with-slots (schema-name table-name) + err + (format stream + "Could not find table ~s in schema ~s for distribution rules." + table-name schema-name)))))) + (defun citus-find-table (catalog table) - (let* ((table-name (table-name table)) + (let* ((table-name (cdr (table-source-name table))) (schema-name (schema-name (table-schema table)))) - (find-table (find-schema catalog schema-name) table-name))) + (or (find-table (find-schema catalog schema-name) table-name) + (error (make-condition 'citus-rule-table-not-found + :table-name table-name + :schema-name schema-name))))) (defgeneric citus-rule-table (rule) (:documentation "Returns the RULE's table.") @@ -197,11 +219,11 @@ ;; it to our model (setf (table-citus-rule (citus-distributed-rule-table rule)) rule) - (let* ((table (citus-distributed-rule-table rule)) + (let* ((table (citus-distributed-rule-table rule)) (column (find (column-name (citus-distributed-rule-using rule)) (table-field-list table) :test #'string= - :key #'column-name))) + :key #'field-name))) (if column ;; add it to the PKEY definition, in first position diff --git a/test/mysql/f1db-citus.load b/test/mysql/f1db-citus.load new file mode 100644 index 0000000..840f415 --- /dev/null +++ b/test/mysql/f1db-citus.load @@ -0,0 +1,19 @@ +load database + from mysql://root@localhost/f1db?useSSL=false + into pgsql://localhost:9700/dim + + with reset no sequences + + distribute f1db.circuits as reference table + distribute f1db.constructorResults using raceId + distribute f1db.constructors as reference table + distribute f1db.constructorStandings using raceId + distribute f1db.drivers as reference table + distribute f1db.driverStandings using raceId + distribute f1db.lapTimes using raceId + distribute f1db.pitStops using raceId + distribute f1db.qualifying using raceId + distribute f1db.races as reference table + distribute f1db.results using raceId + distribute f1db.seasons as reference table + distribute f1db.status as reference table;