mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-05 02:46:10 +02:00
Implement automatic discovery of the Citus distribution rules.
With this patch, the following distribution rule distribute companies using id is equivalent to the following distribution rule set, given foreign keys in the source schema: distribute companies using id distribute campaigns using company_id distribute ads using company_id from campaigns distribute clicks using company_id from ads, campaigns distribute impressions using company_id from ads, campaigns In the current code (of this patch) pgloader walks the foreign-keys dependency tree and knows how to automatically derive distribution rules from a single rule and the foreign keys.
This commit is contained in:
parent
8112a9b54f
commit
d3b21ac54d
@ -19,8 +19,7 @@
|
||||
set-table-oids
|
||||
materialize-views
|
||||
foreign-keys
|
||||
include-drop
|
||||
distribute)
|
||||
include-drop)
|
||||
"Prepare the target PostgreSQL database: create tables casting datatypes
|
||||
from the MySQL definitions, prepare index definitions and create target
|
||||
tables for materialized views.
|
||||
@ -126,10 +125,10 @@
|
||||
;; placement 2299, since DDL has been executed on a connection that is in
|
||||
;; use
|
||||
;;
|
||||
(when distribute
|
||||
(when (catalog-distribution-rules catalog)
|
||||
(with-pgsql-transaction (:pgconn (target-db copy))
|
||||
(with-stats-collection ("Citus Distribute Tables" :section :pre)
|
||||
(create-distributed-table distribute))))
|
||||
(create-distributed-table (catalog-distribution-rules catalog)))))
|
||||
|
||||
;; log the catalog we just fetched and (maybe) merged
|
||||
(log-message :data "CATALOG: ~s" catalog))
|
||||
@ -249,7 +248,8 @@
|
||||
|
||||
;; we also support schema changes necessary for Citus distribution
|
||||
(when distribute
|
||||
(pgloader.catalog::citus-distribute-schema catalog distribute)))
|
||||
(setf (catalog-distribution-rules catalog)
|
||||
(citus-distribute-schema catalog distribute))))
|
||||
|
||||
|
||||
;;;
|
||||
@ -363,8 +363,7 @@
|
||||
:include-drop include-drop
|
||||
:foreign-keys foreign-keys
|
||||
:set-table-oids set-table-oids
|
||||
:materialize-views materialize-views
|
||||
:distribute distribute)
|
||||
:materialize-views materialize-views)
|
||||
|
||||
;; if there's an AFTER SCHEMA DO/EXECUTE command, now is the time
|
||||
;; to run it.
|
||||
|
||||
@ -77,6 +77,7 @@
|
||||
#:catalog-name
|
||||
#:catalog-schema-list
|
||||
#:catalog-types-without-btree
|
||||
#:catalog-distribution-rules
|
||||
|
||||
#:schema-name
|
||||
#:schema-catalog
|
||||
@ -209,14 +210,14 @@
|
||||
#:match-rule-action
|
||||
#:match-rule-args
|
||||
|
||||
#:citus-reference-table
|
||||
#:citus-distributed-table
|
||||
#:make-citus-reference-table
|
||||
#:make-citus-distributed-table
|
||||
#:citus-reference-table-table
|
||||
#:citus-distributed-table-table
|
||||
#:citus-distributed-table-using
|
||||
#:citus-distributed-table-from
|
||||
#:citus-reference-rule
|
||||
#:citus-distributed-rule
|
||||
#:make-citus-reference-rule
|
||||
#:make-citus-distributed-rule
|
||||
#:citus-reference-rule-rule
|
||||
#:citus-distributed-rule-table
|
||||
#:citus-distributed-rule-using
|
||||
#:citus-distributed-rule-from
|
||||
#:citus-format-sql-select
|
||||
#:citus-backfill-table-p
|
||||
|
||||
@ -288,6 +289,16 @@
|
||||
(:export #:*queries*
|
||||
#:sql))
|
||||
|
||||
(defpackage #:pgloader.citus
|
||||
(:use #:cl
|
||||
#:pgloader.params
|
||||
#:pgloader.catalog
|
||||
#:pgloader.quoting
|
||||
#:pgloader.monitor)
|
||||
(:export #:citus-distribute-schema
|
||||
#:citus-format-sql-select
|
||||
#:citus-backfill-table-p))
|
||||
|
||||
(defpackage #:pgloader.utils
|
||||
(:use #:cl
|
||||
#:pgloader.params
|
||||
@ -295,7 +306,8 @@
|
||||
#:pgloader.quoting
|
||||
#:pgloader.catalog
|
||||
#:pgloader.monitor
|
||||
#:pgloader.state)
|
||||
#:pgloader.state
|
||||
#:pgloader.citus)
|
||||
(:import-from #:alexandria
|
||||
#:appendf
|
||||
#:read-file-into-string)
|
||||
@ -326,7 +338,8 @@
|
||||
(cl-user::export-inherited-symbols "pgloader.quoting" "pgloader.utils")
|
||||
(cl-user::export-inherited-symbols "pgloader.catalog" "pgloader.utils")
|
||||
(cl-user::export-inherited-symbols "pgloader.monitor" "pgloader.utils")
|
||||
(cl-user::export-inherited-symbols "pgloader.state" "pgloader.utils"))
|
||||
(cl-user::export-inherited-symbols "pgloader.state" "pgloader.utils")
|
||||
(cl-user::export-inherited-symbols "pgloader.citus" "pgloader.utils"))
|
||||
|
||||
|
||||
;;
|
||||
|
||||
@ -25,13 +25,13 @@
|
||||
(defrule distribute-reference (and kw-distribute dsn-table-name
|
||||
kw-as kw-reference kw-table)
|
||||
(:lambda (d-r)
|
||||
(make-citus-reference-table :table (create-table-from-dsn-table-name d-r))))
|
||||
(make-citus-reference-rule :table (create-table-from-dsn-table-name d-r))))
|
||||
|
||||
(defrule distribute-using (and kw-distribute dsn-table-name
|
||||
kw-using maybe-quoted-namestring)
|
||||
(:lambda (d-u)
|
||||
(make-citus-distributed-table :table (create-table-from-dsn-table-name d-u)
|
||||
:using (make-column :name (fourth d-u)))))
|
||||
(make-citus-distributed-rule :table (create-table-from-dsn-table-name d-u)
|
||||
:using (make-column :name (fourth d-u)))))
|
||||
|
||||
;;;
|
||||
;;; The namestring rule allows for commas and we use them as a separator
|
||||
@ -62,9 +62,9 @@
|
||||
kw-using maybe-quoted-namestring
|
||||
kw-from distribute-from-list)
|
||||
(:lambda (d-u-f)
|
||||
(make-citus-distributed-table :table (create-table-from-dsn-table-name d-u-f)
|
||||
:using (make-column :name (fourth d-u-f))
|
||||
:from (mapcar #'create-table (sixth d-u-f)))))
|
||||
(make-citus-distributed-rule :table (create-table-from-dsn-table-name d-u-f)
|
||||
:using (make-column :name (fourth d-u-f))
|
||||
:from (mapcar #'create-table (sixth d-u-f)))))
|
||||
|
||||
(defrule distribute-commands (+ (or distribute-using-from
|
||||
distribute-using
|
||||
|
||||
@ -4,15 +4,15 @@
|
||||
|
||||
(in-package :pgloader.pgsql)
|
||||
|
||||
(defmethod format-create-sql ((rule citus-reference-table)
|
||||
(defmethod format-create-sql ((rule citus-reference-rule)
|
||||
&key (stream nil) if-not-exists)
|
||||
(declare (ignore if-not-exists))
|
||||
(format stream "SELECT create_reference_table('~a');"
|
||||
(format-table-name (citus-reference-table-table rule))))
|
||||
(format-table-name (citus-reference-rule-table rule))))
|
||||
|
||||
(defmethod format-create-sql ((rule citus-distributed-table)
|
||||
(defmethod format-create-sql ((rule citus-distributed-rule)
|
||||
&key (stream nil) if-not-exists)
|
||||
(declare (ignore if-not-exists))
|
||||
(format stream "SELECT create_distributed_table('~a', '~a');"
|
||||
(format-table-name (citus-distributed-table-table rule))
|
||||
(column-name (citus-distributed-table-using rule))))
|
||||
(format-table-name (citus-distributed-rule-table rule))
|
||||
(column-name (citus-distributed-rule-using rule))))
|
||||
|
||||
@ -42,7 +42,7 @@
|
||||
;;; Column structures details depend on the specific source type and are
|
||||
;;; implemented in each source separately.
|
||||
;;;
|
||||
(defstruct catalog name schema-list types-without-btree)
|
||||
(defstruct catalog name schema-list types-without-btree distribution-rules)
|
||||
|
||||
(defstruct schema source-name name catalog in-search-path
|
||||
table-list view-list extension-list sqltype-list)
|
||||
|
||||
@ -17,58 +17,182 @@
|
||||
|#
|
||||
|
||||
|
||||
(in-package #:pgloader.catalog)
|
||||
(in-package #:pgloader.citus)
|
||||
|
||||
(defstruct citus-reference-table table)
|
||||
(defstruct citus-distributed-table table using from)
|
||||
;;;
|
||||
;;; Main data structures to host our distribution rules.
|
||||
;;;
|
||||
(defstruct citus-reference-rule table)
|
||||
(defstruct citus-distributed-rule table using from)
|
||||
|
||||
(defun citus-distribute-schema (catalog distribution-rules)
|
||||
"Distribute a CATALOG with given user provided DISTRIBUTION-RULES."
|
||||
(loop :for rule :in distribution-rules
|
||||
:do (let ((table (citus-find-table catalog (citus-rule-table rule))))
|
||||
(apply-citus-rule rule table))))
|
||||
"Distribute a CATALOG with given user provided DISTRIBUTION-RULES. Return
|
||||
the list of rules applied."
|
||||
(let ((processed-rules '())
|
||||
(derived-rules
|
||||
(loop :for rule :in distribution-rules
|
||||
:append (progn
|
||||
(citus-set-table rule catalog)
|
||||
(compute-foreign-rules rule (citus-rule-table rule))))))
|
||||
|
||||
(defun citus-rule-table (rule)
|
||||
(etypecase rule
|
||||
(citus-reference-table (citus-reference-table-table rule))
|
||||
(citus-distributed-table (citus-distributed-table-table rule))))
|
||||
;;
|
||||
;; Apply rules only once.
|
||||
;;
|
||||
;; ERROR Database error 42P16: table ;; "campaigns" is already distributed
|
||||
;;
|
||||
(loop :for rule :in (append distribution-rules derived-rules)
|
||||
:unless (member (table-oid (citus-rule-table rule))
|
||||
processed-rules
|
||||
:key (lambda (rule)
|
||||
(table-oid (citus-rule-table rule))))
|
||||
:collect (progn
|
||||
(push rule processed-rules)
|
||||
(apply-citus-rule rule)
|
||||
rule))))
|
||||
|
||||
(defun citus-find-table (catalog table)
|
||||
(let* ((table-name (table-name table))
|
||||
(schema-name (schema-name (table-schema table))))
|
||||
(find-table (find-schema catalog schema-name) table-name)))
|
||||
|
||||
(defgeneric apply-citus-rule (rule table)
|
||||
(defgeneric citus-rule-table (rule)
|
||||
(:documentation "Returns the RULE's table.")
|
||||
(:method ((rule citus-reference-rule)) (citus-reference-rule-table rule))
|
||||
(:method ((rule citus-distributed-rule)) (citus-distributed-rule-table rule)))
|
||||
|
||||
(defgeneric citus-set-table (rule catalog)
|
||||
(:documentation "Find citus RULE table in CATALOG and update the
|
||||
placeholder with the table found there.")
|
||||
(:method ((rule citus-reference-rule) (catalog catalog))
|
||||
(let ((table (citus-reference-rule-table rule)))
|
||||
(setf (citus-reference-rule-table rule)
|
||||
(citus-find-table catalog table))))
|
||||
|
||||
(:method ((rule citus-distributed-rule) (catalog catalog))
|
||||
(let ((table (citus-distributed-rule-table rule)))
|
||||
(map-into (citus-distributed-rule-from rule)
|
||||
(lambda (from) (citus-find-table catalog from))
|
||||
(citus-distributed-rule-from rule))
|
||||
(setf (citus-distributed-rule-table rule)
|
||||
(citus-find-table catalog table)))))
|
||||
|
||||
(defmethod print-object ((table citus-reference-rule) stream)
|
||||
(print-unreadable-object (table stream :type t :identity t)
|
||||
(with-slots (table) table
|
||||
(format stream "distribute ~a as reference" (format-table-name table)))))
|
||||
|
||||
(defmethod print-object ((table citus-distributed-rule) stream)
|
||||
(print-unreadable-object (table stream :type t :identity t)
|
||||
(with-slots (table using from) table
|
||||
(format stream
|
||||
"distribute ~a :using ~a~@[ :from ~{~a~^, ~}~]"
|
||||
(format-table-name table)
|
||||
(column-name using)
|
||||
(mapcar #'format-table-name from)))))
|
||||
|
||||
|
||||
;;;
|
||||
;;; When distributing a table on a given key, we can follow foreign keys
|
||||
;;; pointing to this table. We might find out that when computing the
|
||||
;;; following rule:
|
||||
;;;
|
||||
;;; distribute companies using id
|
||||
;;;
|
||||
;;; We then want to add the set of rules that we find walking the foreign
|
||||
;;; keys:
|
||||
;;;
|
||||
;;; distribute campaigns using company_id
|
||||
;;; distribute ads using company_id from campaigns
|
||||
;;; distribute clicks using company_id from ads, campaigns
|
||||
;;; distribute impressions using company_id from ads, campaigns
|
||||
;;;
|
||||
(defgeneric compute-foreign-rules (rule table &key)
|
||||
(:documentation
|
||||
"Compute rules to apply that derive from the distribution rule RULE when
|
||||
following foreign-keys from TABLE."))
|
||||
|
||||
(defmethod compute-foreign-rules ((rule citus-reference-rule)
|
||||
(table table)
|
||||
&key)
|
||||
"There's nothing to do here, reference table doesn't impact the schema."
|
||||
nil)
|
||||
|
||||
(defmethod compute-foreign-rules ((rule citus-distributed-rule)
|
||||
(table table)
|
||||
&key fkey-list)
|
||||
"Find every foreign key that points to TABLE and add return a list of new
|
||||
rules for the source of those foreign keys."
|
||||
(let ((pkey (find-if #'index-primary (table-index-list table))))
|
||||
|
||||
(when (and pkey (member (column-name (citus-distributed-rule-using rule))
|
||||
(index-columns pkey)
|
||||
:test #'string=))
|
||||
(loop :for fkey :in (index-fk-deps pkey)
|
||||
:for new-fkey-list := (cons fkey fkey-list)
|
||||
:for new-rule := (make-distributed-table-from-fkey rule new-fkey-list)
|
||||
:collect new-rule :into new-rule-list
|
||||
:collect (compute-foreign-rules rule (fkey-table fkey)
|
||||
:fkey-list new-fkey-list)
|
||||
:into dep-rule-list
|
||||
:finally (return (append new-rule-list
|
||||
;; flatten sub-lists as we go
|
||||
(apply #'append dep-rule-list)))))))
|
||||
|
||||
(defun make-distributed-table-from-fkey (rule fkey-list)
|
||||
"Make a new Citus distributed table rule from an existing rule and a fkey
|
||||
definition."
|
||||
;;
|
||||
;; We have a list of foreign keys pointing from a current table,
|
||||
;; (fkey-table fkey), to the root table that is distributed,
|
||||
;; (fkey-foreign-table fkey).
|
||||
;;
|
||||
;; For the distribution key name, we consider the name of the column used
|
||||
;; in the last entry from the fkey-list, the column name that points to
|
||||
;; the root.id distribution key and might be named root_id or something.
|
||||
;;
|
||||
;; Then we only need to specifying USING the intermediate tables, the last
|
||||
;; entry gives us the data we need to backfill our tables.
|
||||
;;
|
||||
(let* ((fkey (car (last fkey-list)))
|
||||
(dist-key (column-name (citus-distributed-rule-using rule)))
|
||||
(dist-key-pos (position dist-key
|
||||
(fkey-foreign-columns fkey)
|
||||
:test #'string=))
|
||||
(fkey-table-dist-key (nth dist-key-pos (fkey-columns fkey)))
|
||||
(from-table-list (butlast (mapcar #'fkey-foreign-table fkey-list))))
|
||||
(make-citus-distributed-rule :table (fkey-table (first fkey-list))
|
||||
:using (make-column :name fkey-table-dist-key)
|
||||
:from from-table-list)))
|
||||
|
||||
|
||||
;;;
|
||||
;;; Apply a citus distribution rule to given table, and store the rule
|
||||
;;; itself to the table-citus-rule slot so that we later know to generate a
|
||||
;;; proper SELECT query that includes the backfilling.
|
||||
;;;
|
||||
(defgeneric apply-citus-rule (rule)
|
||||
(:documentation "Apply a Citus distribution RULE to given TABLE."))
|
||||
|
||||
(defmethod apply-citus-rule ((rule citus-reference-table) (table table))
|
||||
(defmethod apply-citus-rule ((rule citus-reference-rule))
|
||||
;; for a reference table, we have nothing to do really.
|
||||
(setf (table-citus-rule table) rule))
|
||||
|
||||
(defmethod apply-citus-rule ((rule citus-distributed-table) (table table))
|
||||
(setf (table-citus-rule table) rule)
|
||||
|
||||
;;
|
||||
;; Replace the TABLE placeholders in the :FROM slot of the rule with the
|
||||
;; tables from the catalogs.
|
||||
;;
|
||||
(when (citus-distributed-table-from rule)
|
||||
(let ((catalog (schema-catalog (table-schema table))))
|
||||
(map-into (citus-distributed-table-from rule)
|
||||
(lambda (from) (citus-find-table catalog from))
|
||||
(citus-distributed-table-from rule))))
|
||||
(setf (table-citus-rule (citus-reference-rule-table rule)) rule)
|
||||
t)
|
||||
|
||||
(defmethod apply-citus-rule ((rule citus-distributed-rule))
|
||||
;; ok now we need to check if the USING column exists or if we need to add
|
||||
;; it to our model
|
||||
(let ((column (find (column-name (citus-distributed-table-using rule))
|
||||
(table-field-list table)
|
||||
:test #'string=
|
||||
:key #'column-name)))
|
||||
(setf (table-citus-rule (citus-distributed-rule-table rule)) rule)
|
||||
|
||||
(let* ((table (citus-distributed-rule-table rule))
|
||||
(column (find (column-name (citus-distributed-rule-using rule))
|
||||
(table-field-list table)
|
||||
:test #'string=
|
||||
:key #'column-name)))
|
||||
(if column
|
||||
|
||||
;; add it to the PKEY definition, in first position
|
||||
(add-column-to-pkey table
|
||||
(column-name (citus-distributed-table-using rule)))
|
||||
(column-name (citus-distributed-rule-using rule)))
|
||||
|
||||
;; The column doesn't exist, we need to find it in the :FROM rule's
|
||||
;; list. The :FROM slot of the rule is a list of tables to
|
||||
@ -78,9 +202,9 @@
|
||||
;;
|
||||
;; To find the column definition to add to the current TABLE, look
|
||||
;; it up in the last entry of the FROM rule's list.
|
||||
(let* ((last-from-rule (car (last (citus-distributed-table-from rule))))
|
||||
(let* ((last-from-rule (car (last (citus-distributed-rule-from rule))))
|
||||
(column-definition
|
||||
(find (column-name (citus-distributed-table-using rule))
|
||||
(find (column-name (citus-distributed-rule-using rule))
|
||||
(table-field-list last-from-rule)
|
||||
:test #'string=
|
||||
:key #'column-name))
|
||||
@ -122,13 +246,18 @@
|
||||
:do (push column-name (fkey-foreign-columns fkey))
|
||||
:do (setf (fkey-condef fkey) nil)))))
|
||||
|
||||
|
||||
|
||||
;;;
|
||||
;;; Format a query for backfilling the data right from pgloader:
|
||||
;;;
|
||||
;;; SELECT dist_key, * FROM source JOIN pivot ON ...
|
||||
;;;
|
||||
(defun format-citus-join-clause (table distribution-rule)
|
||||
"Format a JOIN clause to backfill the distribution key data in tables that
|
||||
are referencing (even indirectly) the main distribution table."
|
||||
(with-output-to-string (s)
|
||||
(loop :for current-table := table :then rel
|
||||
:for rel :in (citus-distributed-table-from distribution-rule)
|
||||
:for rel :in (citus-distributed-rule-from distribution-rule)
|
||||
:do (let* ((fkey
|
||||
(find (ensure-unquoted (table-name rel))
|
||||
(table-fkey-list current-table)
|
||||
@ -171,7 +300,7 @@
|
||||
;; in the rule).
|
||||
;;
|
||||
(let* ((last-from-rule
|
||||
(car (last (citus-distributed-table-from
|
||||
(car (last (citus-distributed-rule-from
|
||||
(table-citus-rule target-table)))))
|
||||
(cols
|
||||
(append (list
|
||||
@ -193,9 +322,12 @@
|
||||
(table-source-name source-table)
|
||||
joins)))
|
||||
|
||||
;;;
|
||||
;;; Predicate to see if a table needs backfilling
|
||||
;;;
|
||||
(defun citus-backfill-table-p (table)
|
||||
"Returns non-nil when given TABLE should be backfilled with the
|
||||
distribution key."
|
||||
(and (table-citus-rule table)
|
||||
(typep (table-citus-rule table) 'citus-distributed-table)
|
||||
(not (null (citus-distributed-table-from (table-citus-rule table))))))
|
||||
(typep (table-citus-rule table) 'citus-distributed-rule)
|
||||
(not (null (citus-distributed-rule-from (table-citus-rule table))))))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user