Improve redshift support: string_agg() doesn't exist there.

Neither does array_agg(), unnest() and other very useful PostgreSQL
functions. Redshift is from 8.0 times, so do things the old way: parse the
output of the index definition that get from calling pg_index_def().

For that, this patch introduces the notion of SQL support that depends on
PostgreSQL major version. If no major-version specific query is found in the
pgloader source tree, then we use the generic one.

Fixes #860.
This commit is contained in:
Dimitri Fontaine 2018-11-07 21:05:59 +01:00
parent 207cd82726
commit 794bc7fc64
8 changed files with 102 additions and 12 deletions

View File

@ -44,7 +44,8 @@
(setf pgsql-catalog
(fetch-pgsql-catalog (db-name pgconn)
:table (target copy)
:variant (pgconn-variant pgconn)))
:variant (pgconn-variant pgconn)
:pgversion (pgconn-major-version pgconn)))
;; if the user didn't tell us the column list of the table, now is
;; a proper time to set it in the copy object

View File

@ -70,9 +70,11 @@
;;
;; to be able to do that properly, get the constraints from
;; the pre-existing target database catalog
(let ((pgsql-catalog
(let* ((pgversion (pgconn-major-version (target-db copy)))
(pgsql-catalog
(fetch-pgsql-catalog (db-name (target-db copy))
:source-catalog catalog)))
:source-catalog catalog
:pgversion pgversion)))
(merge-catalogs catalog pgsql-catalog))
;; now the foreign keys and only then the indexes, because a

View File

@ -287,7 +287,8 @@
(defpackage #:pgloader.queries
(:use #:cl #:pgloader.params)
(:export #:*queries*
#:sql))
#:sql
#:sql-url-for-variant))
(defpackage #:pgloader.citus
(:use #:cl

View File

@ -10,7 +10,8 @@
source-catalog
including
excluding
(variant :pgdg))
(variant :pgdg)
pgversion)
"Fetch PostgreSQL catalogs for the target database. A PostgreSQL
connection must be opened."
(let* ((*identifier-case* :quote)
@ -35,7 +36,8 @@
(list-all-indexes catalog
:including including
:excluding excluding)
:excluding excluding
:pgversion pgversion)
(when (eq :pgdg variant)
(list-all-fkeys catalog
@ -193,7 +195,7 @@
(add-field table field))
:finally (return catalog)))
(defun list-all-indexes (catalog &key including excluding)
(defun list-all-indexes (catalog &key including excluding pgversion)
"Get the list of PostgreSQL index definitions per table."
(loop
:for (schema-name name oid
@ -201,7 +203,9 @@
primary unique cols sql conname condef)
:in (query nil
(format nil
(sql "/pgsql/list-all-indexes.sql")
(sql (sql-url-for-variant "pgsql"
"list-all-indexes.sql"
pgversion))
including ; do we print the clause?
(filter-list-to-where-clause including
nil
@ -215,6 +219,7 @@
:do (let* ((schema (find-schema catalog schema-name))
(tschema (find-schema catalog table-schema))
(table (find-table tschema table-name))
(columns (parse-index-column-names cols sql))
(pg-index
(make-index :name (ensure-quoted name)
:oid oid
@ -222,7 +227,7 @@
:table table
:primary primary
:unique unique
:columns (split-sequence:split-sequence #\, cols)
:columns columns
:sql sql
:conname (unless (eq :null conname)
(ensure-quoted conname))
@ -438,3 +443,30 @@
;; going to take care of creating the type.
(add-sqltype schema sqltype)))
:finally (return catalog)))
;;;
;;; Extra utils like parsing a list of column names from an index definition.
;;;
(defun parse-index-column-names (columns index-definition)
"Return a list of column names for the given index."
(if (and columns (not (eq :null columns)))
;; the normal case, no much parsing to do, the data has been prepared
;; for us in the SQL query
(split-sequence:split-sequence #\, columns)
;; the redshift variant case, where there's no way to string_agg or
;; even array_to_string(array_agg(...)) and so we need to parse the
;; index-definition instead.
;;
;; CREATE UNIQUE INDEX pg_amproc_opc_proc_index ON pg_amproc USING btree (amopclaid, amprocsubtype, amprocnum)
(when index-definition
(let ((open-paren-pos (position #\( index-definition))
(close-paren-pos (position #\) index-definition)))
(when (and open-paren-pos close-paren-pos)
(mapcar (lambda (colname) (string-trim " " colname))
(split-sequence:split-sequence #\,
index-definition
:start (+ 1 open-paren-pos)
:end close-paren-pos)))))))

View File

@ -0,0 +1,4 @@
Redshift is a fork of PostgreSQL 8.0, and our catalog queries must then
target this old PostgreSQL version to work on Redshift. Parts of what we
would usually implement in SQL is implemented in pgloader code instead, in
order to support such an old PostgreSQL version.

View File

@ -0,0 +1,29 @@
-- params: including
-- filter-list-to-where-clause for including
-- excluding
-- filter-list-to-where-clause for excluding
select n.nspname,
i.relname,
i.oid,
rn.nspname,
r.relname,
indisprimary,
indisunique,
null,
pg_get_indexdef(indexrelid),
c.conname,
pg_get_constraintdef(c.oid)
from pg_index x
join pg_class i ON i.oid = x.indexrelid
join pg_class r ON r.oid = x.indrelid
join pg_namespace n ON n.oid = i.relnamespace
join pg_namespace rn ON rn.oid = r.relnamespace
left join pg_depend d on d.classid = 'pg_class'::regclass
and d.objid = i.oid
and d.refclassid = 'pg_constraint'::regclass
and d.deptype = 'i'
left join pg_constraint c ON c.oid = d.refobjid
where n.nspname !~~ '^pg_' and n.nspname <> 'information_schema'
~:[~*~;and (~{~a~^~&~10t or ~})~]
~:[~*~;and (~{~a~^~&~10t and ~})~]
order by n.nspname, r.relname;

View File

@ -82,7 +82,8 @@
:use-result-as-read t
:section :pre)
(with-pgsql-transaction (:pgconn (source-db pgsql))
(let ((variant (pgconn-variant (source-db pgsql))))
(let ((variant (pgconn-variant (source-db pgsql)))
(pgversion (pgconn-major-version (source-db pgsql))))
(when (eq :pgdg variant)
(list-all-sqltypes catalog
:including including
@ -95,7 +96,8 @@
(when create-indexes
(list-all-indexes catalog
:including including
:excluding excluding))
:excluding excluding
:pgversion pgversion))
(when (and (eq :pgdg variant) foreign-keys)
(list-all-fkeys catalog

View File

@ -66,3 +66,22 @@
(recompute-fs-and-retry ()
(setf *fs* (walk-sources-and-build-fs))
(sql url))))
(defun sql-url-for-variant (base filename &optional variant)
"Build a SQL URL for given VARIANT"
(flet ((sql-base-url (base filename)
(format nil "/~a/~a" base filename)))
(if variant
(let ((sql-variant-url
(format nil "/~a/~a/~a"
base
(string-downcase (typecase variant
(symbol (symbol-name variant))
(string variant)
(t (princ-to-string variant))))
filename)))
(if (gethash sql-variant-url *fs*)
sql-variant-url
(sql-base-url base filename)))
(sql-base-url base filename))))