diff --git a/src/load/migrate-database.lisp b/src/load/migrate-database.lisp index d99efbf..0a5452e 100644 --- a/src/load/migrate-database.lisp +++ b/src/load/migrate-database.lisp @@ -19,8 +19,7 @@ set-table-oids materialize-views foreign-keys - include-drop - distribute) + include-drop) "Prepare the target PostgreSQL database: create tables casting datatypes from the MySQL definitions, prepare index definitions and create target tables for materialized views. @@ -126,10 +125,10 @@ ;; placement 2299, since DDL has been executed on a connection that is in ;; use ;; - (when distribute + (when (catalog-distribution-rules catalog) (with-pgsql-transaction (:pgconn (target-db copy)) (with-stats-collection ("Citus Distribute Tables" :section :pre) - (create-distributed-table distribute)))) + (create-distributed-table (catalog-distribution-rules catalog))))) ;; log the catalog we just fetched and (maybe) merged (log-message :data "CATALOG: ~s" catalog)) @@ -249,7 +248,8 @@ ;; we also support schema changes necessary for Citus distribution (when distribute - (pgloader.catalog::citus-distribute-schema catalog distribute))) + (setf (catalog-distribution-rules catalog) + (citus-distribute-schema catalog distribute)))) ;;; @@ -363,8 +363,7 @@ :include-drop include-drop :foreign-keys foreign-keys :set-table-oids set-table-oids - :materialize-views materialize-views - :distribute distribute) + :materialize-views materialize-views) ;; if there's an AFTER SCHEMA DO/EXECUTE command, now is the time ;; to run it. diff --git a/src/package.lisp b/src/package.lisp index e1e74bf..0d3e5b6 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -77,6 +77,7 @@ #:catalog-name #:catalog-schema-list #:catalog-types-without-btree + #:catalog-distribution-rules #:schema-name #:schema-catalog @@ -209,14 +210,14 @@ #:match-rule-action #:match-rule-args - #:citus-reference-table - #:citus-distributed-table - #:make-citus-reference-table - #:make-citus-distributed-table - #:citus-reference-table-table - #:citus-distributed-table-table - #:citus-distributed-table-using - #:citus-distributed-table-from + #:citus-reference-rule + #:citus-distributed-rule + #:make-citus-reference-rule + #:make-citus-distributed-rule + #:citus-reference-rule-rule + #:citus-distributed-rule-table + #:citus-distributed-rule-using + #:citus-distributed-rule-from #:citus-format-sql-select #:citus-backfill-table-p @@ -288,6 +289,16 @@ (:export #:*queries* #:sql)) +(defpackage #:pgloader.citus + (:use #:cl + #:pgloader.params + #:pgloader.catalog + #:pgloader.quoting + #:pgloader.monitor) + (:export #:citus-distribute-schema + #:citus-format-sql-select + #:citus-backfill-table-p)) + (defpackage #:pgloader.utils (:use #:cl #:pgloader.params @@ -295,7 +306,8 @@ #:pgloader.quoting #:pgloader.catalog #:pgloader.monitor - #:pgloader.state) + #:pgloader.state + #:pgloader.citus) (:import-from #:alexandria #:appendf #:read-file-into-string) @@ -326,7 +338,8 @@ (cl-user::export-inherited-symbols "pgloader.quoting" "pgloader.utils") (cl-user::export-inherited-symbols "pgloader.catalog" "pgloader.utils") (cl-user::export-inherited-symbols "pgloader.monitor" "pgloader.utils") - (cl-user::export-inherited-symbols "pgloader.state" "pgloader.utils")) + (cl-user::export-inherited-symbols "pgloader.state" "pgloader.utils") + (cl-user::export-inherited-symbols "pgloader.citus" "pgloader.utils")) ;; diff --git a/src/parsers/command-distribute.lisp b/src/parsers/command-distribute.lisp index 6ae0b66..70b7c37 100644 --- a/src/parsers/command-distribute.lisp +++ b/src/parsers/command-distribute.lisp @@ -25,13 +25,13 @@ (defrule distribute-reference (and kw-distribute dsn-table-name kw-as kw-reference kw-table) (:lambda (d-r) - (make-citus-reference-table :table (create-table-from-dsn-table-name d-r)))) + (make-citus-reference-rule :table (create-table-from-dsn-table-name d-r)))) (defrule distribute-using (and kw-distribute dsn-table-name kw-using maybe-quoted-namestring) (:lambda (d-u) - (make-citus-distributed-table :table (create-table-from-dsn-table-name d-u) - :using (make-column :name (fourth d-u))))) + (make-citus-distributed-rule :table (create-table-from-dsn-table-name d-u) + :using (make-column :name (fourth d-u))))) ;;; ;;; The namestring rule allows for commas and we use them as a separator @@ -62,9 +62,9 @@ kw-using maybe-quoted-namestring kw-from distribute-from-list) (:lambda (d-u-f) - (make-citus-distributed-table :table (create-table-from-dsn-table-name d-u-f) - :using (make-column :name (fourth d-u-f)) - :from (mapcar #'create-table (sixth d-u-f))))) + (make-citus-distributed-rule :table (create-table-from-dsn-table-name d-u-f) + :using (make-column :name (fourth d-u-f)) + :from (mapcar #'create-table (sixth d-u-f))))) (defrule distribute-commands (+ (or distribute-using-from distribute-using diff --git a/src/pgsql/pgsql-ddl-citus.lisp b/src/pgsql/pgsql-ddl-citus.lisp index f74ade5..d5cd050 100644 --- a/src/pgsql/pgsql-ddl-citus.lisp +++ b/src/pgsql/pgsql-ddl-citus.lisp @@ -4,15 +4,15 @@ (in-package :pgloader.pgsql) -(defmethod format-create-sql ((rule citus-reference-table) +(defmethod format-create-sql ((rule citus-reference-rule) &key (stream nil) if-not-exists) (declare (ignore if-not-exists)) (format stream "SELECT create_reference_table('~a');" - (format-table-name (citus-reference-table-table rule)))) + (format-table-name (citus-reference-rule-table rule)))) -(defmethod format-create-sql ((rule citus-distributed-table) +(defmethod format-create-sql ((rule citus-distributed-rule) &key (stream nil) if-not-exists) (declare (ignore if-not-exists)) (format stream "SELECT create_distributed_table('~a', '~a');" - (format-table-name (citus-distributed-table-table rule)) - (column-name (citus-distributed-table-using rule)))) + (format-table-name (citus-distributed-rule-table rule)) + (column-name (citus-distributed-rule-using rule)))) diff --git a/src/utils/catalog.lisp b/src/utils/catalog.lisp index 6b29aad..baca81a 100644 --- a/src/utils/catalog.lisp +++ b/src/utils/catalog.lisp @@ -42,7 +42,7 @@ ;;; Column structures details depend on the specific source type and are ;;; implemented in each source separately. ;;; -(defstruct catalog name schema-list types-without-btree) +(defstruct catalog name schema-list types-without-btree distribution-rules) (defstruct schema source-name name catalog in-search-path table-list view-list extension-list sqltype-list) diff --git a/src/utils/citus.lisp b/src/utils/citus.lisp index 0cdcc9b..c1b73b8 100644 --- a/src/utils/citus.lisp +++ b/src/utils/citus.lisp @@ -17,58 +17,182 @@ |# -(in-package #:pgloader.catalog) +(in-package #:pgloader.citus) -(defstruct citus-reference-table table) -(defstruct citus-distributed-table table using from) +;;; +;;; Main data structures to host our distribution rules. +;;; +(defstruct citus-reference-rule table) +(defstruct citus-distributed-rule table using from) (defun citus-distribute-schema (catalog distribution-rules) - "Distribute a CATALOG with given user provided DISTRIBUTION-RULES." - (loop :for rule :in distribution-rules - :do (let ((table (citus-find-table catalog (citus-rule-table rule)))) - (apply-citus-rule rule table)))) + "Distribute a CATALOG with given user provided DISTRIBUTION-RULES. Return + the list of rules applied." + (let ((processed-rules '()) + (derived-rules + (loop :for rule :in distribution-rules + :append (progn + (citus-set-table rule catalog) + (compute-foreign-rules rule (citus-rule-table rule)))))) -(defun citus-rule-table (rule) - (etypecase rule - (citus-reference-table (citus-reference-table-table rule)) - (citus-distributed-table (citus-distributed-table-table rule)))) + ;; + ;; Apply rules only once. + ;; + ;; ERROR Database error 42P16: table ;; "campaigns" is already distributed + ;; + (loop :for rule :in (append distribution-rules derived-rules) + :unless (member (table-oid (citus-rule-table rule)) + processed-rules + :key (lambda (rule) + (table-oid (citus-rule-table rule)))) + :collect (progn + (push rule processed-rules) + (apply-citus-rule rule) + rule)))) (defun citus-find-table (catalog table) (let* ((table-name (table-name table)) (schema-name (schema-name (table-schema table)))) (find-table (find-schema catalog schema-name) table-name))) -(defgeneric apply-citus-rule (rule table) +(defgeneric citus-rule-table (rule) + (:documentation "Returns the RULE's table.") + (:method ((rule citus-reference-rule)) (citus-reference-rule-table rule)) + (:method ((rule citus-distributed-rule)) (citus-distributed-rule-table rule))) + +(defgeneric citus-set-table (rule catalog) + (:documentation "Find citus RULE table in CATALOG and update the + placeholder with the table found there.") + (:method ((rule citus-reference-rule) (catalog catalog)) + (let ((table (citus-reference-rule-table rule))) + (setf (citus-reference-rule-table rule) + (citus-find-table catalog table)))) + + (:method ((rule citus-distributed-rule) (catalog catalog)) + (let ((table (citus-distributed-rule-table rule))) + (map-into (citus-distributed-rule-from rule) + (lambda (from) (citus-find-table catalog from)) + (citus-distributed-rule-from rule)) + (setf (citus-distributed-rule-table rule) + (citus-find-table catalog table))))) + +(defmethod print-object ((table citus-reference-rule) stream) + (print-unreadable-object (table stream :type t :identity t) + (with-slots (table) table + (format stream "distribute ~a as reference" (format-table-name table))))) + +(defmethod print-object ((table citus-distributed-rule) stream) + (print-unreadable-object (table stream :type t :identity t) + (with-slots (table using from) table + (format stream + "distribute ~a :using ~a~@[ :from ~{~a~^, ~}~]" + (format-table-name table) + (column-name using) + (mapcar #'format-table-name from))))) + + +;;; +;;; When distributing a table on a given key, we can follow foreign keys +;;; pointing to this table. We might find out that when computing the +;;; following rule: +;;; +;;; distribute companies using id +;;; +;;; We then want to add the set of rules that we find walking the foreign +;;; keys: +;;; +;;; distribute campaigns using company_id +;;; distribute ads using company_id from campaigns +;;; distribute clicks using company_id from ads, campaigns +;;; distribute impressions using company_id from ads, campaigns +;;; +(defgeneric compute-foreign-rules (rule table &key) + (:documentation + "Compute rules to apply that derive from the distribution rule RULE when + following foreign-keys from TABLE.")) + +(defmethod compute-foreign-rules ((rule citus-reference-rule) + (table table) + &key) + "There's nothing to do here, reference table doesn't impact the schema." + nil) + +(defmethod compute-foreign-rules ((rule citus-distributed-rule) + (table table) + &key fkey-list) + "Find every foreign key that points to TABLE and add return a list of new + rules for the source of those foreign keys." + (let ((pkey (find-if #'index-primary (table-index-list table)))) + + (when (and pkey (member (column-name (citus-distributed-rule-using rule)) + (index-columns pkey) + :test #'string=)) + (loop :for fkey :in (index-fk-deps pkey) + :for new-fkey-list := (cons fkey fkey-list) + :for new-rule := (make-distributed-table-from-fkey rule new-fkey-list) + :collect new-rule :into new-rule-list + :collect (compute-foreign-rules rule (fkey-table fkey) + :fkey-list new-fkey-list) + :into dep-rule-list + :finally (return (append new-rule-list + ;; flatten sub-lists as we go + (apply #'append dep-rule-list))))))) + +(defun make-distributed-table-from-fkey (rule fkey-list) + "Make a new Citus distributed table rule from an existing rule and a fkey + definition." + ;; + ;; We have a list of foreign keys pointing from a current table, + ;; (fkey-table fkey), to the root table that is distributed, + ;; (fkey-foreign-table fkey). + ;; + ;; For the distribution key name, we consider the name of the column used + ;; in the last entry from the fkey-list, the column name that points to + ;; the root.id distribution key and might be named root_id or something. + ;; + ;; Then we only need to specifying USING the intermediate tables, the last + ;; entry gives us the data we need to backfill our tables. + ;; + (let* ((fkey (car (last fkey-list))) + (dist-key (column-name (citus-distributed-rule-using rule))) + (dist-key-pos (position dist-key + (fkey-foreign-columns fkey) + :test #'string=)) + (fkey-table-dist-key (nth dist-key-pos (fkey-columns fkey))) + (from-table-list (butlast (mapcar #'fkey-foreign-table fkey-list)))) + (make-citus-distributed-rule :table (fkey-table (first fkey-list)) + :using (make-column :name fkey-table-dist-key) + :from from-table-list))) + + +;;; +;;; Apply a citus distribution rule to given table, and store the rule +;;; itself to the table-citus-rule slot so that we later know to generate a +;;; proper SELECT query that includes the backfilling. +;;; +(defgeneric apply-citus-rule (rule) (:documentation "Apply a Citus distribution RULE to given TABLE.")) -(defmethod apply-citus-rule ((rule citus-reference-table) (table table)) +(defmethod apply-citus-rule ((rule citus-reference-rule)) ;; for a reference table, we have nothing to do really. - (setf (table-citus-rule table) rule)) - -(defmethod apply-citus-rule ((rule citus-distributed-table) (table table)) - (setf (table-citus-rule table) rule) - - ;; - ;; Replace the TABLE placeholders in the :FROM slot of the rule with the - ;; tables from the catalogs. - ;; - (when (citus-distributed-table-from rule) - (let ((catalog (schema-catalog (table-schema table)))) - (map-into (citus-distributed-table-from rule) - (lambda (from) (citus-find-table catalog from)) - (citus-distributed-table-from rule)))) + (setf (table-citus-rule (citus-reference-rule-table rule)) rule) + t) +(defmethod apply-citus-rule ((rule citus-distributed-rule)) ;; ok now we need to check if the USING column exists or if we need to add ;; it to our model - (let ((column (find (column-name (citus-distributed-table-using rule)) - (table-field-list table) - :test #'string= - :key #'column-name))) + (setf (table-citus-rule (citus-distributed-rule-table rule)) rule) + + (let* ((table (citus-distributed-rule-table rule)) + (column (find (column-name (citus-distributed-rule-using rule)) + (table-field-list table) + :test #'string= + :key #'column-name))) (if column ;; add it to the PKEY definition, in first position (add-column-to-pkey table - (column-name (citus-distributed-table-using rule))) + (column-name (citus-distributed-rule-using rule))) ;; The column doesn't exist, we need to find it in the :FROM rule's ;; list. The :FROM slot of the rule is a list of tables to @@ -78,9 +202,9 @@ ;; ;; To find the column definition to add to the current TABLE, look ;; it up in the last entry of the FROM rule's list. - (let* ((last-from-rule (car (last (citus-distributed-table-from rule)))) + (let* ((last-from-rule (car (last (citus-distributed-rule-from rule)))) (column-definition - (find (column-name (citus-distributed-table-using rule)) + (find (column-name (citus-distributed-rule-using rule)) (table-field-list last-from-rule) :test #'string= :key #'column-name)) @@ -122,13 +246,18 @@ :do (push column-name (fkey-foreign-columns fkey)) :do (setf (fkey-condef fkey) nil))))) - + +;;; +;;; Format a query for backfilling the data right from pgloader: +;;; +;;; SELECT dist_key, * FROM source JOIN pivot ON ... +;;; (defun format-citus-join-clause (table distribution-rule) "Format a JOIN clause to backfill the distribution key data in tables that are referencing (even indirectly) the main distribution table." (with-output-to-string (s) (loop :for current-table := table :then rel - :for rel :in (citus-distributed-table-from distribution-rule) + :for rel :in (citus-distributed-rule-from distribution-rule) :do (let* ((fkey (find (ensure-unquoted (table-name rel)) (table-fkey-list current-table) @@ -171,7 +300,7 @@ ;; in the rule). ;; (let* ((last-from-rule - (car (last (citus-distributed-table-from + (car (last (citus-distributed-rule-from (table-citus-rule target-table))))) (cols (append (list @@ -193,9 +322,12 @@ (table-source-name source-table) joins))) +;;; +;;; Predicate to see if a table needs backfilling +;;; (defun citus-backfill-table-p (table) "Returns non-nil when given TABLE should be backfilled with the distribution key." (and (table-citus-rule table) - (typep (table-citus-rule table) 'citus-distributed-table) - (not (null (citus-distributed-table-from (table-citus-rule table)))))) + (typep (table-citus-rule table) 'citus-distributed-rule) + (not (null (citus-distributed-rule-from (table-citus-rule table))))))