diff --git a/pgloader.asd b/pgloader.asd index 12b1684..3d12ebd 100644 --- a/pgloader.asd +++ b/pgloader.asd @@ -69,6 +69,7 @@ (:file "quoting" :depends-on ("utils")) (:file "catalog" :depends-on ("quoting")) (:file "alter-table" :depends-on ("catalog")) + (:file "citus" :depends-on ("catalog")) ;; State, monitoring, reporting (:file "reject" :depends-on ("state")) @@ -95,6 +96,7 @@ :components ((:file "connection") (:file "pgsql-ddl") + (:file "pgsql-ddl-citus") (:file "pgsql-schema") (:file "merge-catalogs" :depends-on ("pgsql-schema")) (:file "pgsql-trigger") @@ -239,6 +241,7 @@ (:file "command-cast-rules") (:file "command-materialize-views") (:file "command-alter-table") + (:file "command-distribute") (:file "command-mysql") (:file "command-including-like") (:file "command-mssql") diff --git a/src/load/migrate-database.lisp b/src/load/migrate-database.lisp index 28f57c9..129ca5b 100644 --- a/src/load/migrate-database.lisp +++ b/src/load/migrate-database.lisp @@ -19,7 +19,8 @@ set-table-oids materialize-views foreign-keys - include-drop) + include-drop + distribute) "Prepare the target PostgreSQL database: create tables casting datatypes from the MySQL definitions, prepare index definitions and create target tables for materialized views. @@ -114,7 +115,15 @@ :use-result-as-rows t) (create-views catalog :include-drop include-drop - :client-min-messages :error)))) + :client-min-messages :error))) + + ;; Citus Support + (when distribute + (with-stats-collection ("Citus Distribute Tables" :section :pre) + (let ((citus-sql + (loop :for rule :in distribute + :collect (format-create-sql rule)))) + (pgsql-execute citus-sql :client-min-messages :notice))))) ;; log the catalog we just fetched and (maybe) merged (log-message :data "CATALOG: ~s" catalog)) @@ -213,9 +222,10 @@ :reset-sequences reset-sequences)))) -(defun process-catalog (copy catalog &key alter-table alter-schema) +(defun process-catalog (copy catalog &key alter-table alter-schema distribute) "Do all the PostgreSQL catalog tweaking here: casts, index WHERE clause rewriting, pgloader level alter schema and alter table commands." + ;; cast the catalog into something PostgreSQL can work on (cast catalog) @@ -229,7 +239,11 @@ ;; if asked, now alter the catalog with given rules: the alter-table ;; keyword parameter actually contains a set of alter table rules. (when alter-table - (alter-table catalog alter-table))) + (alter-table catalog alter-table)) + + ;; we also support schema changes necessary for Citus distribution + (when distribute + (pgloader.catalog::citus-distribute-schema catalog distribute))) ;;; @@ -256,6 +270,7 @@ (foreign-keys t) (reindex nil) (after-schema nil) + distribute only-tables including excluding @@ -326,7 +341,8 @@ ;; that's CAST rules, index WHERE clause rewriting and ALTER commands (process-catalog copy catalog :alter-table alter-table - :alter-schema alter-schema) + :alter-schema alter-schema + :distribute distribute) ;; if asked, first drop/create the tables on the PostgreSQL side (handler-case @@ -341,7 +357,8 @@ :include-drop include-drop :foreign-keys foreign-keys :set-table-oids set-table-oids - :materialize-views materialize-views) + :materialize-views materialize-views + :distribute distribute) ;; if there's an AFTER SCHEMA DO/EXECUTE command, now is the time ;; to run it. diff --git a/src/package.lisp b/src/package.lisp index bc9abfe..8d32d64 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -98,6 +98,7 @@ #:table-index-list #:table-fkey-list #:table-trigger-list + #:table-citus-rule #:extension-name #:extension-schema @@ -208,6 +209,15 @@ #:match-rule-action #:match-rule-args + #:citus-reference-table + #:citus-distributed-table + #:make-citus-reference-table + #:make-citus-distributed-table + #:citus-reference-table-table + #:citus-distributed-table-table + #:citus-distributed-table-using + #:citus-distributed-table-from + #:format-table-name)) (defpackage #:pgloader.state diff --git a/src/parsers/command-distribute.lisp b/src/parsers/command-distribute.lisp new file mode 100644 index 0000000..0a642b7 --- /dev/null +++ b/src/parsers/command-distribute.lisp @@ -0,0 +1,48 @@ +#| + distribute billers using id + distribute bills using biller_id + distribute receivable_accounts using biller_id + distribute payments using biller_id + + distribute splits using biller_id + from receivable_accounts + + distribute ach_accounts as reference table +|# + +(in-package :pgloader.parser) + +(defun create-table-from-dsn-table-name (dsn-table-name + &optional (schema-name "public")) + (let ((table (create-table (cdr (second dsn-table-name))))) + (unless (table-schema table) + (setf (table-schema table) + (make-schema :catalog nil + :source-name schema-name + :name (apply-identifier-case schema-name)))) + table)) + +(defrule distribute-reference (and kw-distribute dsn-table-name + kw-as kw-reference kw-table) + (:lambda (d-r) + (make-citus-reference-table :table (create-table-from-dsn-table-name d-r)))) + +(defrule distribute-using (and kw-distribute dsn-table-name + kw-using maybe-quoted-namestring) + (:lambda (d-u) + (make-citus-distributed-table :table (create-table-from-dsn-table-name d-u) + :using (make-column :name (fourth d-u))))) + +(defrule distribute-using-from (and kw-distribute dsn-table-name + kw-using maybe-quoted-namestring + kw-from (+ maybe-quoted-namestring)) + (:lambda (d-u-f) + (make-citus-distributed-table :table (create-table-from-dsn-table-name d-u-f) + :using (make-column :name (fourth d-u-f)) + :from (apply #'create-table (sixth d-u-f))))) + +(defrule distribute-commands (+ (or distribute-using-from + distribute-using + distribute-reference)) + (:lambda (commands) + (cons :distribute commands))) diff --git a/src/parsers/command-keywords.lisp b/src/parsers/command-keywords.lisp index a2454cd..9a4dcea 100644 --- a/src/parsers/command-keywords.lisp +++ b/src/parsers/command-keywords.lisp @@ -103,6 +103,9 @@ (def-keyword-rule "trim") (def-keyword-rule "unquoted") (def-keyword-rule "delimiter") + ;; option for Citus support + (def-keyword-rule "distribute") + (def-keyword-rule "reference") ;; option for MySQL imports (def-keyword-rule "schema") (def-keyword-rule "schemas") diff --git a/src/parsers/command-pgsql.lisp b/src/parsers/command-pgsql.lisp index 6599c4f..f5f7996 100644 --- a/src/parsers/command-pgsql.lisp +++ b/src/parsers/command-pgsql.lisp @@ -80,7 +80,8 @@ decoding-tables-as before-load after-schema - after-load)) + after-load + distribute-commands)) (:lambda (clauses-list) (alexandria:alist-plist clauses-list))) @@ -109,6 +110,7 @@ alter-table alter-schema ((:including incl)) ((:excluding excl)) + distribute &allow-other-keys) `(lambda () (let* ((*default-cast-rules* ',*pgsql-default-cast-rules*) @@ -133,6 +135,7 @@ :set-table-oids t :on-error-stop on-error-stop :after-schema ',after-schema + :distribute ',distribute ,@(remove-batch-control-option options)) ,(sql-code-block pg-dst-db-conn :post after "after load")))) @@ -143,7 +146,7 @@ pg-dst-db-uri &key gucs casts before after after-schema options - alter-table alter-schema + alter-table alter-schema distribute including excluding decoding) source (cond (*dry-run* @@ -158,6 +161,7 @@ :options options :alter-table alter-table :alter-schema alter-schema + :distribute distribute :including including :excluding excluding :decoding decoding)))))) diff --git a/src/pgsql/pgsql-ddl-citus.lisp b/src/pgsql/pgsql-ddl-citus.lisp new file mode 100644 index 0000000..f74ade5 --- /dev/null +++ b/src/pgsql/pgsql-ddl-citus.lisp @@ -0,0 +1,18 @@ +;;; +;;; PostgreSQL Citus support for calling functions. +;;; + +(in-package :pgloader.pgsql) + +(defmethod format-create-sql ((rule citus-reference-table) + &key (stream nil) if-not-exists) + (declare (ignore if-not-exists)) + (format stream "SELECT create_reference_table('~a');" + (format-table-name (citus-reference-table-table rule)))) + +(defmethod format-create-sql ((rule citus-distributed-table) + &key (stream nil) if-not-exists) + (declare (ignore if-not-exists)) + (format stream "SELECT create_distributed_table('~a', '~a');" + (format-table-name (citus-distributed-table-table rule)) + (column-name (citus-distributed-table-using rule)))) diff --git a/src/pgsql/pgsql-schema.lisp b/src/pgsql/pgsql-schema.lisp index 9ea3d59..72da2ac 100644 --- a/src/pgsql/pgsql-schema.lisp +++ b/src/pgsql/pgsql-schema.lisp @@ -198,7 +198,7 @@ (loop :for (schema-name name oid table-schema table-name - primary unique sql conname condef) + primary unique cols sql conname condef) :in (query nil (format nil (sql "/pgsql/list-all-indexes.sql") @@ -222,7 +222,7 @@ :table table :primary primary :unique unique - :columns nil + :columns (split-sequence:split-sequence #\, cols) :sql sql :conname (unless (eq :null conname) (ensure-quoted conname)) diff --git a/src/pgsql/sql/list-all-indexes.sql b/src/pgsql/sql/list-all-indexes.sql index bfffbf7..1f655fa 100644 --- a/src/pgsql/sql/list-all-indexes.sql +++ b/src/pgsql/sql/list-all-indexes.sql @@ -9,6 +9,11 @@ r.relname, indisprimary, indisunique, + (select string_agg(attname, ',') + from pg_attribute + where attrelid = r.oid + and array[attnum::integer] <@ indkey::integer[] + ) as cols, pg_get_indexdef(indexrelid), c.conname, pg_get_constraintdef(c.oid) diff --git a/src/utils/catalog.lisp b/src/utils/catalog.lisp index c61ce8f..46ddbc6 100644 --- a/src/utils/catalog.lisp +++ b/src/utils/catalog.lisp @@ -50,7 +50,8 @@ (defstruct table source-name name schema oid comment storage-parameter-list ;; field is for SOURCE ;; column is for TARGET - field-list column-list index-list fkey-list trigger-list) + ;; citus is an extra slot for citus support + field-list column-list index-list fkey-list trigger-list citus-rule) ;;; ;;; When migrating from PostgreSQL to PostgreSQL we might have to install diff --git a/src/utils/citus.lisp b/src/utils/citus.lisp new file mode 100644 index 0000000..b080afb --- /dev/null +++ b/src/utils/citus.lisp @@ -0,0 +1,89 @@ +;;; +;;; Citus support in pgloader allows to declare what needs to change in the +;;; source schema in terms of Citus concepts: reference and distributed +;;; table. +;;; + +#| + distribute billers using id + distribute bills using biller_id + distribute receivable_accounts using biller_id + distribute payments using biller_id + + distribute splits using biller_id + from receivable_accounts + + distribute ach_accounts as reference table +|# + + +(in-package #:pgloader.catalog) + +(defstruct citus-reference-table table) +(defstruct citus-distributed-table table using from) + +(defun citus-distribute-schema (catalog distribution-rules) + "Distribute a CATALOG with given user provided DISTRIBUTION-RULES." + (loop :for rule :in distribution-rules + :do (let ((table (citus-find-table catalog (citus-rule-table rule)))) + (apply-citus-rule rule table)))) + +(defun citus-rule-table (rule) + (etypecase rule + (citus-reference-table (citus-reference-table-table rule)) + (citus-distributed-table (citus-distributed-table-table rule)))) + +(defun citus-find-table (catalog table) + (let* ((table-name (table-name table)) + (schema-name (schema-name (table-schema table)))) + (find-table (find-schema catalog schema-name) table-name))) + +(defgeneric apply-citus-rule (rule table) + (:documentation "Apply a Citus distribution RULE to given TABLE.")) + +(defmethod apply-citus-rule ((rule citus-reference-table) (table table)) + ;; for a reference table, we have nothing to do really. + (setf (table-citus-rule table) rule)) + +(defmethod apply-citus-rule ((rule citus-distributed-table) (table table)) + (setf (table-citus-rule table) rule) + + ;; ok now we need to check if the USING column exists or if we need to add + ;; it to our model + (let ((column (find (column-name (citus-distributed-table-using rule)) + (table-field-list table) + :test #'string= + :key #'column-name))) + (assert (not (null column))) + + (if column + + ;; add it to the PKEY definition, in first position + (let* ((index (find-if #'index-primary (table-index-list table))) + (idxcol (find (column-name (citus-distributed-table-using rule)) + (index-columns index) + :test #'string=))) + (assert (not (null index))) + (unless idxcol + ;; add a new column + (push (column-name (citus-distributed-table-using rule)) + (index-columns index)) + ;; now remove origin schema sql and condef, we need to redo them + (setf (index-sql index) nil) + (setf (index-condef index) nil))) + + ;; the column doesn't exist, we need to find it in the :FROM rule + (let* ((from-table + (citus-find-table (schema-catalog (table-schema table)) + (citus-distributed-table-from rule))) + (column-definition + (find (column-name (citus-distributed-table-using rule)) + (table-field-list from-table) + :test #'string= + :key #'column-name))) + (assert (not (null from-table))) + (push (make-column :name (column-name column-definition) + :type-name (column-type-name column-definition) + :nullable (column-nullable column-definition) + :transform (column-transform column-definition)) + (table-column-list table))))))