diff --git a/src/load/migrate-database.lisp b/src/load/migrate-database.lisp index 129ca5b..d99efbf 100644 --- a/src/load/migrate-database.lisp +++ b/src/load/migrate-database.lisp @@ -115,15 +115,21 @@ :use-result-as-rows t) (create-views catalog :include-drop include-drop - :client-min-messages :error))) + :client-min-messages :error)))) - ;; Citus Support - (when distribute + ;; Citus Support + ;; + ;; We need a separate transaction here in some cases, because of the + ;; distributed DDL support from Citus, to avoid the following error: + ;; + ;; ERROR Database error 25001: cannot establish a new connection for + ;; placement 2299, since DDL has been executed on a connection that is in + ;; use + ;; + (when distribute + (with-pgsql-transaction (:pgconn (target-db copy)) (with-stats-collection ("Citus Distribute Tables" :section :pre) - (let ((citus-sql - (loop :for rule :in distribute - :collect (format-create-sql rule)))) - (pgsql-execute citus-sql :client-min-messages :notice))))) + (create-distributed-table distribute)))) ;; log the catalog we just fetched and (maybe) merged (log-message :data "CATALOG: ~s" catalog)) diff --git a/src/package.lisp b/src/package.lisp index 8d32d64..e1e74bf 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -217,6 +217,8 @@ #:citus-distributed-table-table #:citus-distributed-table-using #:citus-distributed-table-from + #:citus-format-sql-select + #:citus-backfill-table-p #:format-table-name)) @@ -433,6 +435,8 @@ #:reset-sequences #:comment-on-tables-and-columns + #:create-distributed-table + ;; finalizing catalogs support (redshift and other variants) #:finalize-catalogs #:adjust-data-types diff --git a/src/parsers/command-distribute.lisp b/src/parsers/command-distribute.lisp index 0a642b7..6ae0b66 100644 --- a/src/parsers/command-distribute.lisp +++ b/src/parsers/command-distribute.lisp @@ -33,13 +33,38 @@ (make-citus-distributed-table :table (create-table-from-dsn-table-name d-u) :using (make-column :name (fourth d-u))))) +;;; +;;; The namestring rule allows for commas and we use them as a separator +;;; here, so we need to have our own table name parsing. That's a bummer, +;;; maybe we should revisit the whole table names parsing code? +;;; +(defrule distribute-from-tablename + (or double-quoted-namestring + quoted-namestring + (and (or #\_ (alpha-char-p character)) + (* (or (alpha-char-p character) + (digit-char-p character))))) + (:text t)) + +(defrule maybe-qualified-dist-from-table-name + (and distribute-from-tablename (? (and "." distribute-from-tablename))) + (:lambda (name) + (if (second name) + (cons (first name) (second (second name))) + (cons "public" (first name))))) + +(defrule distribute-from-list (+ (and maybe-qualified-dist-from-table-name + (? (and "," ignore-whitespace)))) + (:lambda (from-list) + (mapcar #'first from-list))) + (defrule distribute-using-from (and kw-distribute dsn-table-name kw-using maybe-quoted-namestring - kw-from (+ maybe-quoted-namestring)) + kw-from distribute-from-list) (:lambda (d-u-f) (make-citus-distributed-table :table (create-table-from-dsn-table-name d-u-f) :using (make-column :name (fourth d-u-f)) - :from (apply #'create-table (sixth d-u-f))))) + :from (mapcar #'create-table (sixth d-u-f))))) (defrule distribute-commands (+ (or distribute-using-from distribute-using diff --git a/src/pgsql/pgsql-create-schema.lisp b/src/pgsql/pgsql-create-schema.lisp index e6154e5..b06c31d 100644 --- a/src/pgsql/pgsql-create-schema.lisp +++ b/src/pgsql/pgsql-create-schema.lisp @@ -465,3 +465,14 @@ $$; " tables))) (column-name column) quote (column-comment column) quote))))) (pgsql-execute-with-timing section label sql-list))) + + + +;;; +;;; Citus Disitribution support +;;; +(defun create-distributed-table (distribute-rules) + (let ((citus-sql + (loop :for rule :in distribute-rules + :collect (format-create-sql rule)))) + (pgsql-execute citus-sql))) diff --git a/src/pgsql/pgsql-schema.lisp b/src/pgsql/pgsql-schema.lisp index 72da2ac..59f33f0 100644 --- a/src/pgsql/pgsql-schema.lisp +++ b/src/pgsql/pgsql-schema.lisp @@ -235,7 +235,7 @@ "Get the list of PostgreSQL index definitions per table." (loop :for (schema-name table-name fschema-name ftable-name - conoid conname condef + conoid pkeyoid conname condef cols fcols updrule delrule mrule deferrable deferred) :in (query nil @@ -277,9 +277,13 @@ (table (find-table schema table-name)) (fschema (find-schema catalog fschema-name)) (ftable (find-table fschema ftable-name)) + (pkey (find pkeyoid (table-index-list ftable) + :test #'= + :key #'index-oid)) (fk (make-fkey :name (ensure-quoted conname) :oid conoid + :pkey pkey :condef condef :table table :columns (split-sequence:split-sequence #\, cols) @@ -290,6 +294,13 @@ :match-rule (pg-fk-match-rule-to-match-clause mrule) :deferrable deferrable :initially-deferred deferred))) + ;; add the fkey reference to the pkey index too + (unless (find conoid + (index-fk-deps pkey) + :test #'= + :key #'fkey-oid) + (push-to-end fk (index-fk-deps pkey))) + ;; check that both tables are in pgloader's scope (if (and table ftable) (add-fkey table fk) (log-message :notice "Foreign Key ~a is ignored, one of its table is missing from pgloader table selection" diff --git a/src/pgsql/sql/list-all-fkeys.sql b/src/pgsql/sql/list-all-fkeys.sql index 8ebe8b5..bc666d1 100644 --- a/src/pgsql/sql/list-all-fkeys.sql +++ b/src/pgsql/sql/list-all-fkeys.sql @@ -7,7 +7,9 @@ -- excluding (ftable) -- filter-list-to-where-clause for excluding select n.nspname, c.relname, nf.nspname, cf.relname as frelname, - r.oid, conname, + r.oid, + d.refobjid as pkeyoid, + conname, pg_catalog.pg_get_constraintdef(r.oid, true) as condef, (select string_agg(attname, ',') from pg_attribute @@ -26,6 +28,9 @@ JOIN pg_namespace n on c.relnamespace = n.oid JOIN pg_class cf on r.confrelid = cf.oid JOIN pg_namespace nf on cf.relnamespace = nf.oid + JOIN pg_depend d on d.classid = 'pg_constraint'::regclass + and d.objid = r.oid + and d.refobjsubid = 0 where r.contype = 'f' AND c.relkind in ('r', 'f', 'p') AND cf.relkind in ('r', 'f', 'p') diff --git a/src/sources/pgsql/pgsql.lisp b/src/sources/pgsql/pgsql.lisp index 8a45a58..da6d611 100644 --- a/src/sources/pgsql/pgsql.lisp +++ b/src/sources/pgsql/pgsql.lisp @@ -41,12 +41,30 @@ (funcall process-row-fn row))))))) (with-pgsql-connection ((source-db pgsql)) - (let* ((cols (mapcar #'column-name (fields pgsql))) - (sql - (format nil "SELECT ~{~s::text~^, ~} FROM ~s.~s" cols - (schema-source-name (table-schema (source pgsql))) - (table-source-name (source pgsql))))) - (cl-postgres:exec-query pomo:*database* sql map-reader))))) + (if (citus-backfill-table-p (target pgsql)) + ;; + ;; SELECT dist_key, * FROM source JOIN dist ON ... + ;; + (let ((sql (citus-format-sql-select (source pgsql) (target pgsql)))) + (log-message :sql "~a" sql) + (cl-postgres:exec-query pomo:*database* sql map-reader)) + + ;; + ;; No JOIN to add to backfill data in the SQL query here. + ;; + (let* ((cols (mapcar #'column-name (fields pgsql))) + (sql + (format nil + "SELECT ~{~s::text~^, ~} FROM ~s.~s" + cols + (schema-source-name (table-schema (source pgsql))) + (table-source-name (source pgsql))))) + (log-message :sql "~a" sql) + (cl-postgres:exec-query pomo:*database* sql map-reader)))))) + +(defmethod copy-column-list ((pgsql copy-pgsql)) + "We are sending the data in the MySQL columns ordering here." + (mapcar #'column-name (fields pgsql))) (defmethod fetch-metadata ((pgsql copy-pgsql) (catalog catalog) diff --git a/src/utils/catalog.lisp b/src/utils/catalog.lisp index 46ddbc6..6b29aad 100644 --- a/src/utils/catalog.lisp +++ b/src/utils/catalog.lisp @@ -78,7 +78,7 @@ ;;; Index and Foreign Keys ;;; (defstruct fkey - name oid table columns foreign-table foreign-columns condef + name oid table columns pkey foreign-table foreign-columns condef update-rule delete-rule match-rule deferrable initially-deferred) ;;; diff --git a/src/utils/citus.lisp b/src/utils/citus.lisp index b080afb..0cdcc9b 100644 --- a/src/utils/citus.lisp +++ b/src/utils/citus.lisp @@ -48,42 +48,154 @@ (defmethod apply-citus-rule ((rule citus-distributed-table) (table table)) (setf (table-citus-rule table) rule) + ;; + ;; Replace the TABLE placeholders in the :FROM slot of the rule with the + ;; tables from the catalogs. + ;; + (when (citus-distributed-table-from rule) + (let ((catalog (schema-catalog (table-schema table)))) + (map-into (citus-distributed-table-from rule) + (lambda (from) (citus-find-table catalog from)) + (citus-distributed-table-from rule)))) + ;; ok now we need to check if the USING column exists or if we need to add ;; it to our model (let ((column (find (column-name (citus-distributed-table-using rule)) (table-field-list table) :test #'string= :key #'column-name))) - (assert (not (null column))) - (if column ;; add it to the PKEY definition, in first position - (let* ((index (find-if #'index-primary (table-index-list table))) - (idxcol (find (column-name (citus-distributed-table-using rule)) - (index-columns index) - :test #'string=))) - (assert (not (null index))) - (unless idxcol - ;; add a new column - (push (column-name (citus-distributed-table-using rule)) - (index-columns index)) - ;; now remove origin schema sql and condef, we need to redo them - (setf (index-sql index) nil) - (setf (index-condef index) nil))) + (add-column-to-pkey table + (column-name (citus-distributed-table-using rule))) - ;; the column doesn't exist, we need to find it in the :FROM rule - (let* ((from-table - (citus-find-table (schema-catalog (table-schema table)) - (citus-distributed-table-from rule))) + ;; The column doesn't exist, we need to find it in the :FROM rule's + ;; list. The :FROM slot of the rule is a list of tables to + ;; "traverse" when backfilling the data. The list follows the + ;; foreign-key relationships from TABLE to the source of the + ;; distribution key. + ;; + ;; To find the column definition to add to the current TABLE, look + ;; it up in the last entry of the FROM rule's list. + (let* ((last-from-rule (car (last (citus-distributed-table-from rule)))) (column-definition (find (column-name (citus-distributed-table-using rule)) - (table-field-list from-table) + (table-field-list last-from-rule) :test #'string= - :key #'column-name))) - (assert (not (null from-table))) - (push (make-column :name (column-name column-definition) + :key #'column-name)) + (new-column + (make-column :name (column-name column-definition) :type-name (column-type-name column-definition) :nullable (column-nullable column-definition) - :transform (column-transform column-definition)) - (table-column-list table)))))) + :transform (column-transform column-definition)))) + ;; + ;; Here also we need to add the new column to the PKEY definition, + ;; in first position. + ;; + (add-column-to-pkey table (column-name new-column)) + + ;; + ;; We need to backfill the distribution key in the data, which + ;; we're implementing with a JOIN when we SELECT from the source + ;; table. We add the new field here. + ;; + (push new-column (table-field-list table)) + (push new-column (table-column-list table)))))) + + +(defun add-column-to-pkey (table column-name) + "Add COLUMN in the first position of the TABLE's primary key index." + (let* ((index (find-if #'index-primary (table-index-list table))) + (idxcol (find column-name (index-columns index) :test #'string=))) + (assert (not (null index))) + (unless idxcol + ;; add a new column + (push column-name (index-columns index)) + ;; now remove origin schema sql and condef, we need to redo them + (setf (index-sql index) nil) + (setf (index-condef index) nil) + + ;; now tweak the fkey definitions that are using this index + (loop :for fkey :in (index-fk-deps index) + :do (push column-name (fkey-columns fkey)) + :do (push column-name (fkey-foreign-columns fkey)) + :do (setf (fkey-condef fkey) nil))))) + + +(defun format-citus-join-clause (table distribution-rule) + "Format a JOIN clause to backfill the distribution key data in tables that + are referencing (even indirectly) the main distribution table." + (with-output-to-string (s) + (loop :for current-table := table :then rel + :for rel :in (citus-distributed-table-from distribution-rule) + :do (let* ((fkey + (find (ensure-unquoted (table-name rel)) + (table-fkey-list current-table) + :test #'string= + :key (lambda (fkey) + (ensure-unquoted + (table-name (fkey-foreign-table fkey)))))) + (ftable (fkey-foreign-table fkey))) + (format s + " JOIN ~s.~s" + (schema-source-name (table-schema ftable)) + (table-source-name ftable)) + ;; + ;; Skip the first column in the fkey definition, that's the + ;; distribution key that was just added by pgloader: we don't + ;; have it on the source database, we are going to create it on + ;; the target database. + ;; + (loop :for first := t :then nil + :for c :in (cdr (fkey-columns fkey)) + :for fc :in (cdr (fkey-foreign-columns fkey)) + :do (format s + " ~:[AND~;ON~] ~a.~a = ~a.~a" + first + (table-source-name (fkey-table fkey)) + c + (table-source-name (fkey-foreign-table fkey)) + fc)))))) + +(defun citus-format-sql-select (source-table target-table) + "Return the SQL statement to use to fetch data from the COPY context, + including backfilling the distribution key in related tables." + + ;; + ;; SELECT from.id, id, ... from source join from-table ... + ;; + ;; So we must be careful to prefix the column names with the + ;; proper table name, because of the join(s), and the first column + ;; in the output is taken from the main FROM table (the last one + ;; in the rule). + ;; + (let* ((last-from-rule + (car (last (citus-distributed-table-from + (table-citus-rule target-table))))) + (cols + (append (list + (format nil "~a.~a" + (table-name last-from-rule) + (column-name (first (table-field-list source-table))))) + (mapcar (lambda (field) + (format nil "~a.~a" + (table-name source-table) + (column-name field))) + (rest (table-field-list source-table))))) + (joins + (format-citus-join-clause source-table + (table-citus-rule target-table)))) + (format nil + "SELECT ~{~a::text~^, ~} FROM ~s.~s ~a" + cols + (schema-source-name (table-schema source-table)) + (table-source-name source-table) + joins))) + +(defun citus-backfill-table-p (table) + "Returns non-nil when given TABLE should be backfilled with the + distribution key." + (and (table-citus-rule table) + (typep (table-citus-rule table) 'citus-distributed-table) + (not (null (citus-distributed-table-from (table-citus-rule table))))))