Improve Citus Distribution Support.

With this patch it's now actually possible to backfill the data on the fly
when using the "distribute" new commands. The schema is modified to add the
distribution key where specified, and changes to the primary and foreign
keys happen automatically. Then a JOIN is generated to get the data directly
during the COPY streaming to the Citus cluster.
This commit is contained in:
Dimitri Fontaine 2018-10-16 18:53:41 +02:00
parent 760763be4b
commit 8112a9b54f
9 changed files with 234 additions and 42 deletions

View File

@ -115,15 +115,21 @@
:use-result-as-rows t)
(create-views catalog
:include-drop include-drop
:client-min-messages :error)))
:client-min-messages :error))))
;; Citus Support
(when distribute
;; Citus Support
;;
;; We need a separate transaction here in some cases, because of the
;; distributed DDL support from Citus, to avoid the following error:
;;
;; ERROR Database error 25001: cannot establish a new connection for
;; placement 2299, since DDL has been executed on a connection that is in
;; use
;;
(when distribute
(with-pgsql-transaction (:pgconn (target-db copy))
(with-stats-collection ("Citus Distribute Tables" :section :pre)
(let ((citus-sql
(loop :for rule :in distribute
:collect (format-create-sql rule))))
(pgsql-execute citus-sql :client-min-messages :notice)))))
(create-distributed-table distribute))))
;; log the catalog we just fetched and (maybe) merged
(log-message :data "CATALOG: ~s" catalog))

View File

@ -217,6 +217,8 @@
#:citus-distributed-table-table
#:citus-distributed-table-using
#:citus-distributed-table-from
#:citus-format-sql-select
#:citus-backfill-table-p
#:format-table-name))
@ -433,6 +435,8 @@
#:reset-sequences
#:comment-on-tables-and-columns
#:create-distributed-table
;; finalizing catalogs support (redshift and other variants)
#:finalize-catalogs
#:adjust-data-types

View File

@ -33,13 +33,38 @@
(make-citus-distributed-table :table (create-table-from-dsn-table-name d-u)
:using (make-column :name (fourth d-u)))))
;;;
;;; The namestring rule allows for commas and we use them as a separator
;;; here, so we need to have our own table name parsing. That's a bummer,
;;; maybe we should revisit the whole table names parsing code?
;;;
(defrule distribute-from-tablename
(or double-quoted-namestring
quoted-namestring
(and (or #\_ (alpha-char-p character))
(* (or (alpha-char-p character)
(digit-char-p character)))))
(:text t))
(defrule maybe-qualified-dist-from-table-name
(and distribute-from-tablename (? (and "." distribute-from-tablename)))
(:lambda (name)
(if (second name)
(cons (first name) (second (second name)))
(cons "public" (first name)))))
(defrule distribute-from-list (+ (and maybe-qualified-dist-from-table-name
(? (and "," ignore-whitespace))))
(:lambda (from-list)
(mapcar #'first from-list)))
(defrule distribute-using-from (and kw-distribute dsn-table-name
kw-using maybe-quoted-namestring
kw-from (+ maybe-quoted-namestring))
kw-from distribute-from-list)
(:lambda (d-u-f)
(make-citus-distributed-table :table (create-table-from-dsn-table-name d-u-f)
:using (make-column :name (fourth d-u-f))
:from (apply #'create-table (sixth d-u-f)))))
:from (mapcar #'create-table (sixth d-u-f)))))
(defrule distribute-commands (+ (or distribute-using-from
distribute-using

View File

@ -465,3 +465,14 @@ $$; " tables)))
(column-name column)
quote (column-comment column) quote)))))
(pgsql-execute-with-timing section label sql-list)))
;;;
;;; Citus Disitribution support
;;;
(defun create-distributed-table (distribute-rules)
(let ((citus-sql
(loop :for rule :in distribute-rules
:collect (format-create-sql rule))))
(pgsql-execute citus-sql)))

View File

@ -235,7 +235,7 @@
"Get the list of PostgreSQL index definitions per table."
(loop
:for (schema-name table-name fschema-name ftable-name
conoid conname condef
conoid pkeyoid conname condef
cols fcols
updrule delrule mrule deferrable deferred)
:in (query nil
@ -277,9 +277,13 @@
(table (find-table schema table-name))
(fschema (find-schema catalog fschema-name))
(ftable (find-table fschema ftable-name))
(pkey (find pkeyoid (table-index-list ftable)
:test #'=
:key #'index-oid))
(fk
(make-fkey :name (ensure-quoted conname)
:oid conoid
:pkey pkey
:condef condef
:table table
:columns (split-sequence:split-sequence #\, cols)
@ -290,6 +294,13 @@
:match-rule (pg-fk-match-rule-to-match-clause mrule)
:deferrable deferrable
:initially-deferred deferred)))
;; add the fkey reference to the pkey index too
(unless (find conoid
(index-fk-deps pkey)
:test #'=
:key #'fkey-oid)
(push-to-end fk (index-fk-deps pkey)))
;; check that both tables are in pgloader's scope
(if (and table ftable)
(add-fkey table fk)
(log-message :notice "Foreign Key ~a is ignored, one of its table is missing from pgloader table selection"

View File

@ -7,7 +7,9 @@
-- excluding (ftable)
-- filter-list-to-where-clause for excluding
select n.nspname, c.relname, nf.nspname, cf.relname as frelname,
r.oid, conname,
r.oid,
d.refobjid as pkeyoid,
conname,
pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
(select string_agg(attname, ',')
from pg_attribute
@ -26,6 +28,9 @@
JOIN pg_namespace n on c.relnamespace = n.oid
JOIN pg_class cf on r.confrelid = cf.oid
JOIN pg_namespace nf on cf.relnamespace = nf.oid
JOIN pg_depend d on d.classid = 'pg_constraint'::regclass
and d.objid = r.oid
and d.refobjsubid = 0
where r.contype = 'f'
AND c.relkind in ('r', 'f', 'p')
AND cf.relkind in ('r', 'f', 'p')

View File

@ -41,12 +41,30 @@
(funcall process-row-fn row)))))))
(with-pgsql-connection ((source-db pgsql))
(let* ((cols (mapcar #'column-name (fields pgsql)))
(sql
(format nil "SELECT ~{~s::text~^, ~} FROM ~s.~s" cols
(schema-source-name (table-schema (source pgsql)))
(table-source-name (source pgsql)))))
(cl-postgres:exec-query pomo:*database* sql map-reader)))))
(if (citus-backfill-table-p (target pgsql))
;;
;; SELECT dist_key, * FROM source JOIN dist ON ...
;;
(let ((sql (citus-format-sql-select (source pgsql) (target pgsql))))
(log-message :sql "~a" sql)
(cl-postgres:exec-query pomo:*database* sql map-reader))
;;
;; No JOIN to add to backfill data in the SQL query here.
;;
(let* ((cols (mapcar #'column-name (fields pgsql)))
(sql
(format nil
"SELECT ~{~s::text~^, ~} FROM ~s.~s"
cols
(schema-source-name (table-schema (source pgsql)))
(table-source-name (source pgsql)))))
(log-message :sql "~a" sql)
(cl-postgres:exec-query pomo:*database* sql map-reader))))))
(defmethod copy-column-list ((pgsql copy-pgsql))
"We are sending the data in the MySQL columns ordering here."
(mapcar #'column-name (fields pgsql)))
(defmethod fetch-metadata ((pgsql copy-pgsql)
(catalog catalog)

View File

@ -78,7 +78,7 @@
;;; Index and Foreign Keys
;;;
(defstruct fkey
name oid table columns foreign-table foreign-columns condef
name oid table columns pkey foreign-table foreign-columns condef
update-rule delete-rule match-rule deferrable initially-deferred)
;;;

View File

@ -48,42 +48,154 @@
(defmethod apply-citus-rule ((rule citus-distributed-table) (table table))
(setf (table-citus-rule table) rule)
;;
;; Replace the TABLE placeholders in the :FROM slot of the rule with the
;; tables from the catalogs.
;;
(when (citus-distributed-table-from rule)
(let ((catalog (schema-catalog (table-schema table))))
(map-into (citus-distributed-table-from rule)
(lambda (from) (citus-find-table catalog from))
(citus-distributed-table-from rule))))
;; ok now we need to check if the USING column exists or if we need to add
;; it to our model
(let ((column (find (column-name (citus-distributed-table-using rule))
(table-field-list table)
:test #'string=
:key #'column-name)))
(assert (not (null column)))
(if column
;; add it to the PKEY definition, in first position
(let* ((index (find-if #'index-primary (table-index-list table)))
(idxcol (find (column-name (citus-distributed-table-using rule))
(index-columns index)
:test #'string=)))
(assert (not (null index)))
(unless idxcol
;; add a new column
(push (column-name (citus-distributed-table-using rule))
(index-columns index))
;; now remove origin schema sql and condef, we need to redo them
(setf (index-sql index) nil)
(setf (index-condef index) nil)))
(add-column-to-pkey table
(column-name (citus-distributed-table-using rule)))
;; the column doesn't exist, we need to find it in the :FROM rule
(let* ((from-table
(citus-find-table (schema-catalog (table-schema table))
(citus-distributed-table-from rule)))
;; The column doesn't exist, we need to find it in the :FROM rule's
;; list. The :FROM slot of the rule is a list of tables to
;; "traverse" when backfilling the data. The list follows the
;; foreign-key relationships from TABLE to the source of the
;; distribution key.
;;
;; To find the column definition to add to the current TABLE, look
;; it up in the last entry of the FROM rule's list.
(let* ((last-from-rule (car (last (citus-distributed-table-from rule))))
(column-definition
(find (column-name (citus-distributed-table-using rule))
(table-field-list from-table)
(table-field-list last-from-rule)
:test #'string=
:key #'column-name)))
(assert (not (null from-table)))
(push (make-column :name (column-name column-definition)
:key #'column-name))
(new-column
(make-column :name (column-name column-definition)
:type-name (column-type-name column-definition)
:nullable (column-nullable column-definition)
:transform (column-transform column-definition))
(table-column-list table))))))
:transform (column-transform column-definition))))
;;
;; Here also we need to add the new column to the PKEY definition,
;; in first position.
;;
(add-column-to-pkey table (column-name new-column))
;;
;; We need to backfill the distribution key in the data, which
;; we're implementing with a JOIN when we SELECT from the source
;; table. We add the new field here.
;;
(push new-column (table-field-list table))
(push new-column (table-column-list table))))))
(defun add-column-to-pkey (table column-name)
"Add COLUMN in the first position of the TABLE's primary key index."
(let* ((index (find-if #'index-primary (table-index-list table)))
(idxcol (find column-name (index-columns index) :test #'string=)))
(assert (not (null index)))
(unless idxcol
;; add a new column
(push column-name (index-columns index))
;; now remove origin schema sql and condef, we need to redo them
(setf (index-sql index) nil)
(setf (index-condef index) nil)
;; now tweak the fkey definitions that are using this index
(loop :for fkey :in (index-fk-deps index)
:do (push column-name (fkey-columns fkey))
:do (push column-name (fkey-foreign-columns fkey))
:do (setf (fkey-condef fkey) nil)))))
(defun format-citus-join-clause (table distribution-rule)
"Format a JOIN clause to backfill the distribution key data in tables that
are referencing (even indirectly) the main distribution table."
(with-output-to-string (s)
(loop :for current-table := table :then rel
:for rel :in (citus-distributed-table-from distribution-rule)
:do (let* ((fkey
(find (ensure-unquoted (table-name rel))
(table-fkey-list current-table)
:test #'string=
:key (lambda (fkey)
(ensure-unquoted
(table-name (fkey-foreign-table fkey))))))
(ftable (fkey-foreign-table fkey)))
(format s
" JOIN ~s.~s"
(schema-source-name (table-schema ftable))
(table-source-name ftable))
;;
;; Skip the first column in the fkey definition, that's the
;; distribution key that was just added by pgloader: we don't
;; have it on the source database, we are going to create it on
;; the target database.
;;
(loop :for first := t :then nil
:for c :in (cdr (fkey-columns fkey))
:for fc :in (cdr (fkey-foreign-columns fkey))
:do (format s
" ~:[AND~;ON~] ~a.~a = ~a.~a"
first
(table-source-name (fkey-table fkey))
c
(table-source-name (fkey-foreign-table fkey))
fc))))))
(defun citus-format-sql-select (source-table target-table)
"Return the SQL statement to use to fetch data from the COPY context,
including backfilling the distribution key in related tables."
;;
;; SELECT from.id, id, ... from source join from-table ...
;;
;; So we must be careful to prefix the column names with the
;; proper table name, because of the join(s), and the first column
;; in the output is taken from the main FROM table (the last one
;; in the rule).
;;
(let* ((last-from-rule
(car (last (citus-distributed-table-from
(table-citus-rule target-table)))))
(cols
(append (list
(format nil "~a.~a"
(table-name last-from-rule)
(column-name (first (table-field-list source-table)))))
(mapcar (lambda (field)
(format nil "~a.~a"
(table-name source-table)
(column-name field)))
(rest (table-field-list source-table)))))
(joins
(format-citus-join-clause source-table
(table-citus-rule target-table))))
(format nil
"SELECT ~{~a::text~^, ~} FROM ~s.~s ~a"
cols
(schema-source-name (table-schema source-table))
(table-source-name source-table)
joins)))
(defun citus-backfill-table-p (table)
"Returns non-nil when given TABLE should be backfilled with the
distribution key."
(and (table-citus-rule table)
(typep (table-citus-rule table) 'citus-distributed-table)
(not (null (citus-distributed-table-from (table-citus-rule table))))))