From 8112a9b54fc8124ec849324803ebfdb67c1eda2d Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Tue, 16 Oct 2018 18:53:41 +0200 Subject: [PATCH] Improve Citus Distribution Support. With this patch it's now actually possible to backfill the data on the fly when using the "distribute" new commands. The schema is modified to add the distribution key where specified, and changes to the primary and foreign keys happen automatically. Then a JOIN is generated to get the data directly during the COPY streaming to the Citus cluster. --- src/load/migrate-database.lisp | 20 ++-- src/package.lisp | 4 + src/parsers/command-distribute.lisp | 29 ++++- src/pgsql/pgsql-create-schema.lisp | 11 ++ src/pgsql/pgsql-schema.lisp | 13 ++- src/pgsql/sql/list-all-fkeys.sql | 7 +- src/sources/pgsql/pgsql.lisp | 30 ++++-- src/utils/catalog.lisp | 2 +- src/utils/citus.lisp | 160 +++++++++++++++++++++++----- 9 files changed, 234 insertions(+), 42 deletions(-) diff --git a/src/load/migrate-database.lisp b/src/load/migrate-database.lisp index 129ca5b..d99efbf 100644 --- a/src/load/migrate-database.lisp +++ b/src/load/migrate-database.lisp @@ -115,15 +115,21 @@ :use-result-as-rows t) (create-views catalog :include-drop include-drop - :client-min-messages :error))) + :client-min-messages :error)))) - ;; Citus Support - (when distribute + ;; Citus Support + ;; + ;; We need a separate transaction here in some cases, because of the + ;; distributed DDL support from Citus, to avoid the following error: + ;; + ;; ERROR Database error 25001: cannot establish a new connection for + ;; placement 2299, since DDL has been executed on a connection that is in + ;; use + ;; + (when distribute + (with-pgsql-transaction (:pgconn (target-db copy)) (with-stats-collection ("Citus Distribute Tables" :section :pre) - (let ((citus-sql - (loop :for rule :in distribute - :collect (format-create-sql rule)))) - (pgsql-execute citus-sql :client-min-messages :notice))))) + (create-distributed-table distribute)))) ;; log the catalog we just fetched and (maybe) merged (log-message :data "CATALOG: ~s" catalog)) diff --git a/src/package.lisp b/src/package.lisp index 8d32d64..e1e74bf 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -217,6 +217,8 @@ #:citus-distributed-table-table #:citus-distributed-table-using #:citus-distributed-table-from + #:citus-format-sql-select + #:citus-backfill-table-p #:format-table-name)) @@ -433,6 +435,8 @@ #:reset-sequences #:comment-on-tables-and-columns + #:create-distributed-table + ;; finalizing catalogs support (redshift and other variants) #:finalize-catalogs #:adjust-data-types diff --git a/src/parsers/command-distribute.lisp b/src/parsers/command-distribute.lisp index 0a642b7..6ae0b66 100644 --- a/src/parsers/command-distribute.lisp +++ b/src/parsers/command-distribute.lisp @@ -33,13 +33,38 @@ (make-citus-distributed-table :table (create-table-from-dsn-table-name d-u) :using (make-column :name (fourth d-u))))) +;;; +;;; The namestring rule allows for commas and we use them as a separator +;;; here, so we need to have our own table name parsing. That's a bummer, +;;; maybe we should revisit the whole table names parsing code? +;;; +(defrule distribute-from-tablename + (or double-quoted-namestring + quoted-namestring + (and (or #\_ (alpha-char-p character)) + (* (or (alpha-char-p character) + (digit-char-p character))))) + (:text t)) + +(defrule maybe-qualified-dist-from-table-name + (and distribute-from-tablename (? (and "." distribute-from-tablename))) + (:lambda (name) + (if (second name) + (cons (first name) (second (second name))) + (cons "public" (first name))))) + +(defrule distribute-from-list (+ (and maybe-qualified-dist-from-table-name + (? (and "," ignore-whitespace)))) + (:lambda (from-list) + (mapcar #'first from-list))) + (defrule distribute-using-from (and kw-distribute dsn-table-name kw-using maybe-quoted-namestring - kw-from (+ maybe-quoted-namestring)) + kw-from distribute-from-list) (:lambda (d-u-f) (make-citus-distributed-table :table (create-table-from-dsn-table-name d-u-f) :using (make-column :name (fourth d-u-f)) - :from (apply #'create-table (sixth d-u-f))))) + :from (mapcar #'create-table (sixth d-u-f))))) (defrule distribute-commands (+ (or distribute-using-from distribute-using diff --git a/src/pgsql/pgsql-create-schema.lisp b/src/pgsql/pgsql-create-schema.lisp index e6154e5..b06c31d 100644 --- a/src/pgsql/pgsql-create-schema.lisp +++ b/src/pgsql/pgsql-create-schema.lisp @@ -465,3 +465,14 @@ $$; " tables))) (column-name column) quote (column-comment column) quote))))) (pgsql-execute-with-timing section label sql-list))) + + + +;;; +;;; Citus Disitribution support +;;; +(defun create-distributed-table (distribute-rules) + (let ((citus-sql + (loop :for rule :in distribute-rules + :collect (format-create-sql rule)))) + (pgsql-execute citus-sql))) diff --git a/src/pgsql/pgsql-schema.lisp b/src/pgsql/pgsql-schema.lisp index 72da2ac..59f33f0 100644 --- a/src/pgsql/pgsql-schema.lisp +++ b/src/pgsql/pgsql-schema.lisp @@ -235,7 +235,7 @@ "Get the list of PostgreSQL index definitions per table." (loop :for (schema-name table-name fschema-name ftable-name - conoid conname condef + conoid pkeyoid conname condef cols fcols updrule delrule mrule deferrable deferred) :in (query nil @@ -277,9 +277,13 @@ (table (find-table schema table-name)) (fschema (find-schema catalog fschema-name)) (ftable (find-table fschema ftable-name)) + (pkey (find pkeyoid (table-index-list ftable) + :test #'= + :key #'index-oid)) (fk (make-fkey :name (ensure-quoted conname) :oid conoid + :pkey pkey :condef condef :table table :columns (split-sequence:split-sequence #\, cols) @@ -290,6 +294,13 @@ :match-rule (pg-fk-match-rule-to-match-clause mrule) :deferrable deferrable :initially-deferred deferred))) + ;; add the fkey reference to the pkey index too + (unless (find conoid + (index-fk-deps pkey) + :test #'= + :key #'fkey-oid) + (push-to-end fk (index-fk-deps pkey))) + ;; check that both tables are in pgloader's scope (if (and table ftable) (add-fkey table fk) (log-message :notice "Foreign Key ~a is ignored, one of its table is missing from pgloader table selection" diff --git a/src/pgsql/sql/list-all-fkeys.sql b/src/pgsql/sql/list-all-fkeys.sql index 8ebe8b5..bc666d1 100644 --- a/src/pgsql/sql/list-all-fkeys.sql +++ b/src/pgsql/sql/list-all-fkeys.sql @@ -7,7 +7,9 @@ -- excluding (ftable) -- filter-list-to-where-clause for excluding select n.nspname, c.relname, nf.nspname, cf.relname as frelname, - r.oid, conname, + r.oid, + d.refobjid as pkeyoid, + conname, pg_catalog.pg_get_constraintdef(r.oid, true) as condef, (select string_agg(attname, ',') from pg_attribute @@ -26,6 +28,9 @@ JOIN pg_namespace n on c.relnamespace = n.oid JOIN pg_class cf on r.confrelid = cf.oid JOIN pg_namespace nf on cf.relnamespace = nf.oid + JOIN pg_depend d on d.classid = 'pg_constraint'::regclass + and d.objid = r.oid + and d.refobjsubid = 0 where r.contype = 'f' AND c.relkind in ('r', 'f', 'p') AND cf.relkind in ('r', 'f', 'p') diff --git a/src/sources/pgsql/pgsql.lisp b/src/sources/pgsql/pgsql.lisp index 8a45a58..da6d611 100644 --- a/src/sources/pgsql/pgsql.lisp +++ b/src/sources/pgsql/pgsql.lisp @@ -41,12 +41,30 @@ (funcall process-row-fn row))))))) (with-pgsql-connection ((source-db pgsql)) - (let* ((cols (mapcar #'column-name (fields pgsql))) - (sql - (format nil "SELECT ~{~s::text~^, ~} FROM ~s.~s" cols - (schema-source-name (table-schema (source pgsql))) - (table-source-name (source pgsql))))) - (cl-postgres:exec-query pomo:*database* sql map-reader))))) + (if (citus-backfill-table-p (target pgsql)) + ;; + ;; SELECT dist_key, * FROM source JOIN dist ON ... + ;; + (let ((sql (citus-format-sql-select (source pgsql) (target pgsql)))) + (log-message :sql "~a" sql) + (cl-postgres:exec-query pomo:*database* sql map-reader)) + + ;; + ;; No JOIN to add to backfill data in the SQL query here. + ;; + (let* ((cols (mapcar #'column-name (fields pgsql))) + (sql + (format nil + "SELECT ~{~s::text~^, ~} FROM ~s.~s" + cols + (schema-source-name (table-schema (source pgsql))) + (table-source-name (source pgsql))))) + (log-message :sql "~a" sql) + (cl-postgres:exec-query pomo:*database* sql map-reader)))))) + +(defmethod copy-column-list ((pgsql copy-pgsql)) + "We are sending the data in the MySQL columns ordering here." + (mapcar #'column-name (fields pgsql))) (defmethod fetch-metadata ((pgsql copy-pgsql) (catalog catalog) diff --git a/src/utils/catalog.lisp b/src/utils/catalog.lisp index 46ddbc6..6b29aad 100644 --- a/src/utils/catalog.lisp +++ b/src/utils/catalog.lisp @@ -78,7 +78,7 @@ ;;; Index and Foreign Keys ;;; (defstruct fkey - name oid table columns foreign-table foreign-columns condef + name oid table columns pkey foreign-table foreign-columns condef update-rule delete-rule match-rule deferrable initially-deferred) ;;; diff --git a/src/utils/citus.lisp b/src/utils/citus.lisp index b080afb..0cdcc9b 100644 --- a/src/utils/citus.lisp +++ b/src/utils/citus.lisp @@ -48,42 +48,154 @@ (defmethod apply-citus-rule ((rule citus-distributed-table) (table table)) (setf (table-citus-rule table) rule) + ;; + ;; Replace the TABLE placeholders in the :FROM slot of the rule with the + ;; tables from the catalogs. + ;; + (when (citus-distributed-table-from rule) + (let ((catalog (schema-catalog (table-schema table)))) + (map-into (citus-distributed-table-from rule) + (lambda (from) (citus-find-table catalog from)) + (citus-distributed-table-from rule)))) + ;; ok now we need to check if the USING column exists or if we need to add ;; it to our model (let ((column (find (column-name (citus-distributed-table-using rule)) (table-field-list table) :test #'string= :key #'column-name))) - (assert (not (null column))) - (if column ;; add it to the PKEY definition, in first position - (let* ((index (find-if #'index-primary (table-index-list table))) - (idxcol (find (column-name (citus-distributed-table-using rule)) - (index-columns index) - :test #'string=))) - (assert (not (null index))) - (unless idxcol - ;; add a new column - (push (column-name (citus-distributed-table-using rule)) - (index-columns index)) - ;; now remove origin schema sql and condef, we need to redo them - (setf (index-sql index) nil) - (setf (index-condef index) nil))) + (add-column-to-pkey table + (column-name (citus-distributed-table-using rule))) - ;; the column doesn't exist, we need to find it in the :FROM rule - (let* ((from-table - (citus-find-table (schema-catalog (table-schema table)) - (citus-distributed-table-from rule))) + ;; The column doesn't exist, we need to find it in the :FROM rule's + ;; list. The :FROM slot of the rule is a list of tables to + ;; "traverse" when backfilling the data. The list follows the + ;; foreign-key relationships from TABLE to the source of the + ;; distribution key. + ;; + ;; To find the column definition to add to the current TABLE, look + ;; it up in the last entry of the FROM rule's list. + (let* ((last-from-rule (car (last (citus-distributed-table-from rule)))) (column-definition (find (column-name (citus-distributed-table-using rule)) - (table-field-list from-table) + (table-field-list last-from-rule) :test #'string= - :key #'column-name))) - (assert (not (null from-table))) - (push (make-column :name (column-name column-definition) + :key #'column-name)) + (new-column + (make-column :name (column-name column-definition) :type-name (column-type-name column-definition) :nullable (column-nullable column-definition) - :transform (column-transform column-definition)) - (table-column-list table)))))) + :transform (column-transform column-definition)))) + ;; + ;; Here also we need to add the new column to the PKEY definition, + ;; in first position. + ;; + (add-column-to-pkey table (column-name new-column)) + + ;; + ;; We need to backfill the distribution key in the data, which + ;; we're implementing with a JOIN when we SELECT from the source + ;; table. We add the new field here. + ;; + (push new-column (table-field-list table)) + (push new-column (table-column-list table)))))) + + +(defun add-column-to-pkey (table column-name) + "Add COLUMN in the first position of the TABLE's primary key index." + (let* ((index (find-if #'index-primary (table-index-list table))) + (idxcol (find column-name (index-columns index) :test #'string=))) + (assert (not (null index))) + (unless idxcol + ;; add a new column + (push column-name (index-columns index)) + ;; now remove origin schema sql and condef, we need to redo them + (setf (index-sql index) nil) + (setf (index-condef index) nil) + + ;; now tweak the fkey definitions that are using this index + (loop :for fkey :in (index-fk-deps index) + :do (push column-name (fkey-columns fkey)) + :do (push column-name (fkey-foreign-columns fkey)) + :do (setf (fkey-condef fkey) nil))))) + + +(defun format-citus-join-clause (table distribution-rule) + "Format a JOIN clause to backfill the distribution key data in tables that + are referencing (even indirectly) the main distribution table." + (with-output-to-string (s) + (loop :for current-table := table :then rel + :for rel :in (citus-distributed-table-from distribution-rule) + :do (let* ((fkey + (find (ensure-unquoted (table-name rel)) + (table-fkey-list current-table) + :test #'string= + :key (lambda (fkey) + (ensure-unquoted + (table-name (fkey-foreign-table fkey)))))) + (ftable (fkey-foreign-table fkey))) + (format s + " JOIN ~s.~s" + (schema-source-name (table-schema ftable)) + (table-source-name ftable)) + ;; + ;; Skip the first column in the fkey definition, that's the + ;; distribution key that was just added by pgloader: we don't + ;; have it on the source database, we are going to create it on + ;; the target database. + ;; + (loop :for first := t :then nil + :for c :in (cdr (fkey-columns fkey)) + :for fc :in (cdr (fkey-foreign-columns fkey)) + :do (format s + " ~:[AND~;ON~] ~a.~a = ~a.~a" + first + (table-source-name (fkey-table fkey)) + c + (table-source-name (fkey-foreign-table fkey)) + fc)))))) + +(defun citus-format-sql-select (source-table target-table) + "Return the SQL statement to use to fetch data from the COPY context, + including backfilling the distribution key in related tables." + + ;; + ;; SELECT from.id, id, ... from source join from-table ... + ;; + ;; So we must be careful to prefix the column names with the + ;; proper table name, because of the join(s), and the first column + ;; in the output is taken from the main FROM table (the last one + ;; in the rule). + ;; + (let* ((last-from-rule + (car (last (citus-distributed-table-from + (table-citus-rule target-table))))) + (cols + (append (list + (format nil "~a.~a" + (table-name last-from-rule) + (column-name (first (table-field-list source-table))))) + (mapcar (lambda (field) + (format nil "~a.~a" + (table-name source-table) + (column-name field))) + (rest (table-field-list source-table))))) + (joins + (format-citus-join-clause source-table + (table-citus-rule target-table)))) + (format nil + "SELECT ~{~a::text~^, ~} FROM ~s.~s ~a" + cols + (schema-source-name (table-schema source-table)) + (table-source-name source-table) + joins))) + +(defun citus-backfill-table-p (table) + "Returns non-nil when given TABLE should be backfilled with the + distribution key." + (and (table-citus-rule table) + (typep (table-citus-rule table) 'citus-distributed-table) + (not (null (citus-distributed-table-from (table-citus-rule table))))))