Implement support for existing target databases.

Also known as the ORM case: it happens that other tools are used to
create the target schema. In that case pgloader's job is to fill the
existing target tables with the data from the source tables.

We still focus on load speed: pgloader will now DROP the
constraints (Primary Key, Unique, Foreign Keys) and indexes before
running the COPY statements, then re-install the schema it found in
the target database once the data load is done.

This behavior is activated when using the “create no tables” option as
in the following test-case setup:

  with create no tables, include drop, truncate

Fixes #400, for which I got a test-case to play with!
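In that mode the per-table flow amounts to: drop the constraints and indexes found in the target catalogs, COPY the data, then re-install exactly what was dropped. A rough Python sketch of that ordering (table, constraint, and index names here are illustrative, not pgloader internals):

```python
def reload_plan(table, condefs, indexdefs):
    """Illustrative statement order for one table under "create no tables".

    condefs maps constraint name -> constraint definition as fetched from
    the target catalogs; indexdefs maps index name -> a full CREATE INDEX
    statement.  All names are examples, not pgloader's API.
    """
    plan = ['ALTER TABLE %s DROP CONSTRAINT "%s" CASCADE;' % (table, name)
            for name in condefs]
    plan += ['DROP INDEX %s CASCADE;' % name for name in indexdefs]
    # the fast path: data lands in unconstrained, unindexed tables
    plan.append('COPY %s FROM STDIN;' % table)
    # re-install what the target database had, not what the source had
    plan += ['ALTER TABLE %s ADD CONSTRAINT "%s" %s;' % (table, name, defn)
             for name, defn in condefs.items()]
    plan += list(indexdefs.values())
    return plan
```

The point of the ordering is that the COPY always runs against a table with no constraints or indexes left on it.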
This commit is contained in:
Dimitri Fontaine 2016-08-06 20:07:49 +02:00
parent 2d47c4f0f5
commit 70572a2ea7
20 changed files with 677 additions and 477 deletions


@@ -1,7 +1,7 @@
.\" generated with Ronn/v0.7.3
.\" http://github.com/rtomayko/ronn/tree/0.7.3
.
.TH "PGLOADER" "1" "May 2016" "ff" ""
.TH "PGLOADER" "1" "August 2016" "ff" ""
.
.SH "NAME"
\fBpgloader\fR \- PostgreSQL data loader
@@ -1876,7 +1876,10 @@ When this option is listed, pgloader creates the table using the meta data found
\fIcreate no tables\fR
.
.IP
When this option is listed, pgloader skips the creation of table before lading data, target tables must then already exist\.
When this option is listed, pgloader skips the creation of table before loading data, target tables must then already exist\.
.
.IP
Also, when using \fIcreate no tables\fR pgloader fetches the metadata from the current target database and checks type casting, then will remove constraints and indexes prior to loading the data and install them back again once the loading is done\.
.
.IP "\(bu" 4
\fIcreate indexes\fR
@@ -2063,7 +2066,7 @@ When the option \fIdrop default\fR is listed, pgloader drops any existing defaul
The spelling \fIkeep default\fR explicitly prevents that behaviour and can be used to overload the default casting rules\.
.
.IP "\(bu" 4
\fIdrop not null\fR, \fIkeep not null\fR
\fIdrop not null\fR, \fIkeep not null\fR, \fIset not null\fR
.
.IP
When the option \fIdrop not null\fR is listed, pgloader drops any existing \fBNOT NULL\fR constraint associated with the given source MySQL datatype when it creates the tables in the PostgreSQL database\.
@@ -2071,6 +2074,9 @@ When the option \fIdrop not null\fR is listed, pgloader drops any existing \fBNO
.IP
The spelling \fIkeep not null\fR explicitly prevents that behaviour and can be used to overload the default casting rules\.
.
.IP
When the option \fIset not null\fR is listed, pgloader sets a \fBNOT NULL\fR constraint on the target column regardless whether it has been set in the source MySQL column\.
.
.IP "\(bu" 4
\fIdrop typemod\fR, \fIkeep typemod\fR
.
@@ -2302,10 +2308,10 @@ type decimal to decimal keep typemod
Texts:
.
.IP "\(bu" 4
type char to varchar keep typemod using remove\-null\-characters
type char to char keep typemod using remove\-null\-characters
.
.IP "\(bu" 4
type varchar to text using remove\-null\-characters
type varchar to varchar keep typemod using remove\-null\-characters
.
.IP "\(bu" 4
type tinytext to text using remove\-null\-characters
@@ -2476,7 +2482,10 @@ When this option is listed, pgloader creates the table using the meta data found
\fIcreate no tables\fR
.
.IP
When this option is listed, pgloader skips the creation of table before lading data, target tables must then already exist\.
When this option is listed, pgloader skips the creation of table before loading data, target tables must then already exist\.
.
.IP
Also, when using \fIcreate no tables\fR pgloader fetches the metadata from the current target database and checks type casting, then will remove constraints and indexes prior to loading the data and install them back again once the loading is done\.
.
.IP "\(bu" 4
\fIcreate indexes\fR


@@ -1616,7 +1616,12 @@ The `database` command accepts the following clauses and options:
- *create no tables*
When this option is listed, pgloader skips the creation of table
before lading data, target tables must then already exist.
before loading data, target tables must then already exist.
Also, when using *create no tables* pgloader fetches the metadata
from the current target database and checks type casting, then will
remove constraints and indexes prior to loading the data and install
them back again once the loading is done.
- *create indexes*
@@ -2079,7 +2084,12 @@ The `sqlite` command accepts the following clauses and options:
- *create no tables*
When this option is listed, pgloader skips the creation of table
before lading data, target tables must then already exist.
before loading data, target tables must then already exist.
Also, when using *create no tables* pgloader fetches the metadata
from the current target database and checks type casting, then will
remove constraints and indexes prior to loading the data and install
them back again once the loading is done.
- *create indexes*


@@ -90,16 +90,17 @@
(:file "connection")
(:file "pgsql-ddl")
(:file "pgsql-schema")
(:file "merge-catalogs" :depends-on ("pgsql-schema"))
(:file "pgsql-trigger")
(:file "pgsql-index-filter")
(:file "queries")
(:file "schema" :depends-on ("pgsql-trigger"))
(:file "pgsql-create-schema" :depends-on ("pgsql-trigger"))
(:file "retry-batch")
(:file "copy-from-queue"
:depends-on ("copy-format"
"connection"
"retry-batch"
"queries"
"schema"))))
"pgsql-create-schema"
"pgsql-schema"))))
(:module "parsers"
:depends-on ("params" "package" "utils" "pgsql" "monkey")
@@ -210,13 +211,18 @@
:depends-on ("mysql-cast-rules"
"mysql-schema"))))))
(:module "regress"
:depends-on ("params" "package" "utils" "pgsql")
:components ((:file "regress")))
;; the main entry file, used when building a stand-alone
;; executable image
(:file "main" :depends-on ("params"
"package"
"utils"
"parsers"
"sources"))))
"sources"
"regress"))))
;; to produce the website
(:module "web"


@ -1,14 +1,5 @@
(in-package #:pgloader)
;;;
;;; Some command line constants for OS errors codes
;;;
(defparameter +os-code-success+ 0)
(defparameter +os-code-error+ 1)
(defparameter +os-code-error-usage+ 2)
(defparameter +os-code-error-bad-source+ 4)
(defparameter +os-code-error-regress+ 5)
;;;
;;; Now some tooling
;;;
@@ -421,104 +412,6 @@
:after (parse-sql-file after)
:start-logger nil))))
(defun process-regression-test (load-file &key start-logger)
"Run a regression test for given LOAD-FILE."
(unless (probe-file load-file)
(format t "Regression testing ~s: file does not exists." load-file)
(uiop:quit +os-code-error-regress+))
;; now do our work
(with-monitor (:start-logger start-logger)
(log-message :log "Regression testing: ~s" load-file)
(process-command-file load-file)
;; once we are done running the load-file, compare the loaded data with
;; our expected data file
(bind ((expected-subdir (directory-namestring
(asdf:system-relative-pathname
:pgloader "test/regress/expected/")))
(expected-data-file (make-pathname :defaults load-file
:type "out"
:directory expected-subdir))
((target-conn gucs) (parse-target-pg-db-uri load-file))
(*pg-settings* (pgloader.pgsql:sanitize-user-gucs gucs))
(*pgsql-reserved-keywords* (list-reserved-keywords target-conn))
(target-table (create-table (pgconn-table-name target-conn)))
(expected-data-source
(parse-source-string-for-type
:copy (uiop:native-namestring expected-data-file)))
;; change target table-name schema
(expected-data-target
(let ((e-d-t (clone-connection target-conn)))
(setf (pgconn-table-name e-d-t)
;;
;; The connection facility still works with cons here,
;; rather than table structure instances, because of
;; depedencies as explained in
;; src/parsers/command-db-uri.lisp
;;
(cons "expected" (table-name target-table)))
e-d-t)))
(log-message :log "Comparing loaded data against ~s" expected-data-file)
;; prepare expected table in "expected" schema
(with-pgsql-connection (target-conn)
(with-schema (unqualified-table-name target-table)
(let* ((tname (apply-identifier-case unqualified-table-name))
(drop (format nil "drop table if exists expected.~a;"
tname))
(create (format nil "create table expected.~a(like ~a);"
tname tname)))
(log-message :notice "~a" drop)
(pomo:query drop)
(log-message :notice "~a" create)
(pomo:query create))))
;; load expected data
(load-data :from expected-data-source
:into expected-data-target
:options '(:truncate t)
:start-logger nil)
;; now compare both
(with-pgsql-connection (target-conn)
(with-schema (unqualified-table-name target-table)
(let* ((tname (apply-identifier-case unqualified-table-name))
(cols (loop :for (name type)
:in (list-columns-query tname)
;;
;; We can't just use table names here, because
;; PostgreSQL support for the POINT datatype fails
;; to implement EXCEPT support, and the query then
;; fails with:
;;
;; could not identify an equality operator for type point
;;
:collect (if (string= "point" type)
(format nil "~s::text" name)
(format nil "~s" name))))
(sql (format nil
"select count(*) from (select ~{~a~^, ~} from expected.~a except select ~{~a~^, ~} from ~a) ss"
cols
tname
cols
tname))
(diff-count (pomo:query sql :single)))
(log-message :notice "~a" sql)
(log-message :notice "Got a diff of ~a rows" diff-count)
(if (= 0 diff-count)
(progn
(log-message :log "Regress pass.")
#-pgloader-image (values diff-count +os-code-success+)
#+pgloader-image (uiop:quit +os-code-success+))
(progn
(log-message :log "Regress fail.")
#-pgloader-image (values diff-count +os-code-error-regress+)
#+pgloader-image (uiop:quit +os-code-error-regress+)))))))))
;;;
;;; Helper function to run a given command


@@ -329,7 +329,7 @@
#:truncate-tables
#:copy-from-file
#:copy-from-queue
#:reset-all-sequences
#:set-table-oids
#:create-sqltypes
#:create-schemas
@@ -343,6 +343,8 @@
#:process-index-definitions
#:fetch-pgsql-catalog
#:merge-catalogs
#:create-indexes-in-kernel
#:drop-indexes
#:maybe-drop-indexes


@@ -25,7 +25,13 @@
#:*pg-settings*
#:*default-tmpdir*
#:init-params-from-environment
#:getenv-default))
#:getenv-default
#:+os-code-success+
#:+os-code-error+
#:+os-code-error-usage+
#:+os-code-error-bad-source+
#:+os-code-error-regress+))
(in-package :pgloader.params)
@@ -146,3 +152,13 @@
(setf *default-tmpdir*
(fad:pathname-as-directory
(getenv-default "TMPDIR" *default-tmpdir*))))
;;;
;;; Some command line constants for OS errors codes
;;;
(defparameter +os-code-success+ 0)
(defparameter +os-code-error+ 1)
(defparameter +os-code-error-usage+ 2)
(defparameter +os-code-error-bad-source+ 4)
(defparameter +os-code-error-regress+ 5)


@@ -0,0 +1,143 @@
;;;
;;; To support pre-existing database targets we need to be able to merge the
;;; catalog we get from the source database and the catalog we fetch in the
;;; already prepared target database: that's the typical scenario when using
;;; an ORM to define the schema.
;;;
;;; Using an ORM to define a database schema is considered very bad practice
;;; for very good reasons. pgloader goal is to facilitate migrations to
;;; PostgreSQL, the education is generally welcome in later steps.
;;;
(in-package :pgloader.pgsql)
(defun merge-catalogs (source-catalog target-catalog)
"In order for the data loading to be as fast as possible, we DROP the
constraints and indexes on the target table. Once the data is loaded we
want to install the same constraints as found pre-existing in the
TARGET-CATALOG rather than the one we casted from the SOURCE-CATALOG.
Also, we want to recheck the cast situation and the selected
transformation functions of each column."
(log-message :log "MERGE CATALOGS!")
(let (skip-list)
(loop :for source-schema :in (catalog-schema-list source-catalog)
:do (let* ((schema-name
;; MySQL schema map to PostgreSQL databases, so we might
;; have NIL as a schema name here. Find the current
;; PostgreSQL schema instead of NIL.
(or (schema-name source-schema)
(pomo:query "select current_schema()" :single)))
(target-schema
(find-schema target-catalog schema-name)))
(unless target-schema
(error "pgloader failed to find schema ~s in target catalog"
schema-name))
(loop :for source-table :in (schema-table-list source-schema)
:for target-table := (find-table target-schema
(table-name source-table))
:do (if target-table
(progn
;; re-use indexes and fkeys from target-catalog
(setf (table-oid source-table)
(table-oid target-table)
(table-index-list source-table)
(table-index-list target-table)
(table-fkey-list source-table)
(table-fkey-list target-table)
;; special case for triggers: we don't DROP them,
;; we only ALTER TABLE ... DISABLE TRIGGERS then
;; ENABLE them again; so we only have to refrain
;; from installing the source-catalog ones here.
(table-trigger-list source-table)
nil)
(check-table-columns schema-name
source-table
target-table))
;;
;; We failed to find a matching table for the source
;; table, let the user know then remove the table from
;; the source catalogs.
;;
(progn
(log-message :error
"pgloader failed to find target table for source ~s.~s with name ~s in target catalog"
(schema-source-name source-schema)
(table-source-name source-table)
(table-name source-table))
(push-to-end source-table skip-list))))))
(when skip-list
(loop :for table :in skip-list
:do (let ((schema (table-schema table)))
(log-message :log "Skipping ~a" (format-table-name table))
(setf (schema-table-list schema)
(delete table (schema-table-list schema))))))))
(defun check-table-columns (schema-name source-table target-table)
(loop :for source-column :in (table-column-list source-table)
:do (let* ((col-name (ensure-unquoted
(column-name source-column)))
;;
;; Apply PostgreSQL case rules for identifiers: compare
;; UPCASE versions of them...
;;
(target-column (find (string-upcase col-name)
(table-field-list target-table)
:key (lambda (column)
(string-upcase
(column-name column)))
:test #'string=)))
(unless target-column
(error "pgloader failed to find column ~s.~s.~s in target table ~s"
schema-name
(table-source-name source-table)
(column-name source-column)
(format-table-name target-table)))
(unless (same-type-p source-column target-column)
(log-message :warning "Source column ~s.~s.~s is casted to type ~s which is not the same as ~s, the type of current target database column ~s.~s.~s."
schema-name
(table-source-name source-table)
(column-name source-column)
(column-type-name source-column)
(column-type-name target-column)
(schema-name (table-schema target-table))
(table-name target-table)
(column-name target-column))))))
(defvar *type-name-mapping*
'(("serial" "integer")
("bigserial" "bigint")
("char" "character")
("varchar" "character varying")
("timestamp" "timestamp without time zone")
("timestamptz" "timestamp with time zone")
("decimal" "numeric"))
"Alternative spellings for some type names.")
(defun get-type-name (column)
"Support SQLTYPE indirection if needed."
(let ((typname (column-type-name column)))
(etypecase typname
(string typname)
(sqltype (get-column-type-name-from-sqltype column)))))
(defun same-type-p (source-column target-column)
"Evaluate if SOURCE-COLUMN and TARGET-COLUMN selected type names are
similar enough that we may continue with the migration."
(let ((source-type-name (get-type-name source-column))
(target-type-name (column-type-name target-column)))
(or (string= source-type-name target-type-name)
(member target-type-name (cdr (assoc source-type-name *type-name-mapping*
:test #'string=))
:test #'string=))))
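The spelling check in same-type-p amounts to a small lookup; a Python sketch of the same logic (data copied from *type-name-mapping*, function names are mine, not pgloader's):

```python
# Alternative spellings PostgreSQL reports for some type names,
# mirroring the *type-name-mapping* alist above.
TYPE_NAME_MAPPING = {
    "serial": ["integer"],
    "bigserial": ["bigint"],
    "char": ["character"],
    "varchar": ["character varying"],
    "timestamp": ["timestamp without time zone"],
    "timestamptz": ["timestamp with time zone"],
    "decimal": ["numeric"],
}

def same_type_p(source_type, target_type):
    # Types match when spelled identically, or when the target column uses
    # one of the known alternative spellings for the cast source type.
    return (source_type == target_type
            or target_type in TYPE_NAME_MAPPING.get(source_type, []))
```

A mismatch only produces a warning in the Lisp code above, not an error, since the migration may still succeed.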


@@ -3,36 +3,15 @@
;;;
(in-package #:pgloader.pgsql)
;;;
;;; API for Foreign Keys
;;;
(defun drop-pgsql-fkeys (catalog)
"Drop all Foreign Key Definitions given, to prepare for a clean run."
(loop :for table :in (table-list catalog)
:do
(loop :for fkey :in (table-fkey-list table)
:for sql := (format-drop-sql fkey)
:when sql
:do (pgsql-execute sql))))
(defun create-pgsql-fkeys (catalog
&key
(section :post)
(label "Foreign Keys"))
"Actually create the Foreign Key References that where declared in the
MySQL database"
(with-stats-collection (label :section section :use-result-as-rows t)
(loop :for table :in (table-list catalog)
:sum (loop :for fkey :in (table-fkey-list table)
:for sql := (format-create-sql fkey)
:do (pgsql-execute-with-timing section label sql)
:count t))))
;;;
;;; Table schema support
;;;
(defun create-sqltypes (catalog &key if-not-exists include-drop)
(defun create-sqltypes (catalog
&key
if-not-exists
include-drop
(client-min-messages :notice))
"Create the needed data types for given CATALOG."
(let ((sqltype-list))
;; build the sqltype list
@@ -46,9 +25,11 @@
;; now create the types
(loop :for sqltype :in sqltype-list
:when include-drop
:do (pgsql-execute (format-drop-sql sqltype :cascade t))
:do (pgsql-execute (format-drop-sql sqltype :cascade t)
:client-min-messages client-min-messages)
:do (pgsql-execute
(format-create-sql sqltype :if-not-exists if-not-exists)))))
(format-create-sql sqltype :if-not-exists if-not-exists)
:client-min-messages client-min-messages))))
(defun create-table-sql-list (table-list
&key
@@ -137,17 +118,17 @@
;;; DDL Utilities: TRUNCATE, ENABLE/DISABLE triggers
;;;
(defun truncate-tables (pgconn catalog-or-table)
"Truncate given TABLE-NAME in database DBNAME"
(with-pgsql-transaction (:pgconn pgconn)
(let ((sql
(format nil "TRUNCATE ~{~a~^,~};"
(mapcar #'format-table-name
(etypecase catalog-or-table
(catalog (table-list catalog-or-table))
(schema (table-list catalog-or-table))
(table (list catalog-or-table)))))))
(pgsql-execute sql))))
(defun truncate-tables (catalog-or-table)
"Truncate given TABLE-NAME in database DBNAME. A PostgreSQL connection
must already be active when calling that function."
(let ((sql
(format nil "TRUNCATE ~{~a~^,~};"
(mapcar #'format-table-name
(etypecase catalog-or-table
(catalog (table-list catalog-or-table))
(schema (table-list catalog-or-table))
(table (list catalog-or-table)))))))
(pgsql-execute sql)))
(defun disable-triggers (table-name)
"Disable triggers on TABLE-NAME. Needs to be called with a PostgreSQL
@@ -176,6 +157,33 @@
(enable-triggers ,table-name)))
(progn ,@forms)))
;;;
;;; API for Foreign Keys
;;;
(defun drop-pgsql-fkeys (catalog)
"Drop all Foreign Key Definitions given, to prepare for a clean run."
(loop :for table :in (table-list catalog)
:do
(loop :for fkey :in (table-fkey-list table)
:for sql := (format-drop-sql fkey :cascade t)
:when sql
:do (pgsql-execute sql))))
(defun create-pgsql-fkeys (catalog
&key
(section :post)
(label "Foreign Keys"))
"Actually create the Foreign Key References that where declared in the
MySQL database"
(with-stats-collection (label :section section :use-result-as-rows t)
(loop :for table :in (table-list catalog)
:sum (loop :for fkey :in (table-fkey-list table)
:for sql := (format-create-sql fkey)
:do (pgsql-execute-with-timing section label sql)
:count t))))
;;;
;;; Parallel index building.
@@ -203,6 +211,7 @@
:when pkey
:collect pkey)))
;;;
;;; Protect from non-unique index names
;;;
@@ -214,50 +223,52 @@
This function grabs the table OIDs in the PostgreSQL database and update
the definitions with them."
(let* ((table-names (mapcar #'format-table-name (table-list catalog)))
(table-oids (pgloader.pgsql:list-table-oids table-names)))
(oid-map (list-table-oids table-names)))
(loop :for table :in (table-list catalog)
:for table-name := (format-table-name table)
:for table-oid := (cdr (assoc table-name table-oids :test #'string=))
:for table-oid := (gethash table-name oid-map)
:unless table-oid :do (error "OID not found for ~s." table-name)
:do (setf (table-oid table) table-oid))))
;;;
;;; Drop indexes before loading
;;;
(defun drop-indexes (section table)
"Drop indexes in PGSQL-INDEX-LIST. A PostgreSQL connection must already be
active when calling that function."
(loop :for index :in (table-index-list table)
:do (let ((sql (format-drop-sql index)))
(pgsql-execute-with-timing section "drop indexes" sql))))
(let ((sql-index-list
(loop :for index :in (table-index-list table)
:collect (format-drop-sql index :cascade t))))
(pgsql-execute-with-timing section "drop indexes" sql-index-list)))
;;;
;;; Higher level API to care about indexes
;;;
(defun maybe-drop-indexes (target catalog &key (section :pre) drop-indexes)
(defun maybe-drop-indexes (catalog &key (section :pre) drop-indexes)
"Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and
returns a list of indexes to create again."
(with-pgsql-connection (target)
(loop :for table :in (table-list catalog)
:do
(let ((indexes (table-index-list table))
;; we get the list of indexes from PostgreSQL catalogs, so don't
;; question their spelling, just quote them.
(*identifier-case* :quote))
returns a list of indexes to create again. A PostgreSQL connection must
already be active when calling that function."
(loop :for table :in (table-list catalog)
:do
(let ((indexes (table-index-list table))
;; we get the list of indexes from PostgreSQL catalogs, so don't
;; question their spelling, just quote them.
(*identifier-case* :quote))
(cond ((and indexes (not drop-indexes))
(log-message :warning
"Target table ~s has ~d indexes defined against it."
(format-table-name table) (length indexes))
(log-message :warning
"That could impact loading performance badly.")
(log-message :warning
"Consider the option 'drop indexes'."))
(cond ((and indexes (not drop-indexes))
(log-message :warning
"Target table ~s has ~d indexes defined against it."
(format-table-name table) (length indexes))
(log-message :warning
"That could impact loading performance badly.")
(log-message :warning
"Consider the option 'drop indexes'."))
(indexes
;; drop the indexes now
(with-stats-collection ("drop indexes" :section section)
(drop-indexes section table))))))))
(indexes
;; drop the indexes now
(with-stats-collection ("drop indexes" :section section)
(drop-indexes section table)))))))
(defun create-indexes-again (target catalog
&key
@@ -296,13 +307,58 @@
;;;
;;; Sequences
;;;
(defun reset-sequences (catalog &key pgconn (section :post))
(defun reset-sequences (target catalog &key (section :post))
"Reset all sequences created during this MySQL migration."
(log-message :notice "Reset sequences")
(with-stats-collection ("Reset Sequences"
:use-result-as-rows t
:section section)
(reset-all-sequences pgconn :tables (table-list catalog))))
(let ((tables (table-list catalog)))
(with-pgsql-connection (target)
(set-session-gucs *pg-settings*)
(pomo:execute "set client_min_messages to warning;")
(pomo:execute "listen seqs")
(when tables
(pomo:execute
(format nil "create temp table reloids(oid) as values ~{('~a'::regclass)~^,~}"
(mapcar #'format-table-name tables))))
(handler-case
(let ((sql (format nil "
DO $$
DECLARE
n integer := 0;
r record;
BEGIN
FOR r in
SELECT 'select '
|| trim(trailing ')'
from replace(pg_get_expr(d.adbin, d.adrelid),
'nextval', 'setval'))
|| ', (select greatest(max(' || quote_ident(a.attname) || '), 1) from only '
|| quote_ident(nspname) || '.' || quote_ident(relname) || '));' as sql
FROM pg_class c
JOIN pg_namespace n on n.oid = c.relnamespace
JOIN pg_attribute a on a.attrelid = c.oid
JOIN pg_attrdef d on d.adrelid = a.attrelid
and d.adnum = a.attnum
and a.atthasdef
WHERE relkind = 'r' and a.attnum > 0
and pg_get_expr(d.adbin, d.adrelid) ~~ '^nextval'
~@[and c.oid in (select oid from reloids)~]
LOOP
n := n + 1;
EXECUTE r.sql;
END LOOP;
PERFORM pg_notify('seqs', n::text);
END;
$$; " tables)))
(pomo:execute sql))
;; now get the notification signal
(cl-postgres:postgresql-notification (c)
(parse-integer (cl-postgres:postgresql-notification-payload c))))))))
;;;


@@ -13,7 +13,6 @@
(schema-name schema)))
(defmethod format-drop-sql ((schema schema) &key (stream nil) cascade)
(declare (ignore pgsql))
(format stream "DROP SCHEMA ~s~@[ CASCADE~];" (schema-name schema) cascade))
@@ -186,19 +185,20 @@
(index-filter index)))))))
(defmethod format-drop-sql ((index index) &key (stream nil) cascade)
(declare (ignore cascade))
(let* ((schema-name (schema-name (index-schema index)))
(index-name (index-name index)))
(cond ((index-conname index)
;; here always quote the constraint name, currently the name
;; comes from one source only, the PostgreSQL database catalogs,
;; so don't question it, quote it.
(format stream "ALTER TABLE ~a DROP CONSTRAINT ~s;"
(format stream "ALTER TABLE ~a DROP CONSTRAINT ~s~@[ CASCADE~];"
(format-table-name (index-table index))
(index-conname index)))
(index-conname index)
cascade))
(t
(format stream "DROP INDEX ~@[~a.~]~a;" schema-name index-name)))))
(format stream "DROP INDEX ~@[~a.~]~a~@[ CASCADE~];"
schema-name index-name cascade)))))
;;;
@@ -207,7 +207,7 @@
(defmethod format-create-sql ((fk fkey) &key (stream nil) if-not-exists)
(declare (ignore if-not-exists))
(format stream
"ALTER TABLE ~a ADD ~@[CONSTRAINT ~a ~]FOREIGN KEY(~{~a~^,~}) REFERENCES ~a(~{~a~^,~})~:[~*~; ON UPDATE ~a~]~:[~*~; ON DELETE ~a~]"
"ALTER TABLE ~a ADD ~@[CONSTRAINT ~s ~]FOREIGN KEY(~{~a~^,~}) REFERENCES ~a(~{~a~^,~})~:[~*~; ON UPDATE ~a~]~:[~*~; ON DELETE ~a~]"
(format-table-name (fkey-table fk))
(fkey-name fk) ; constraint name
(fkey-columns fk)
@@ -219,10 +219,10 @@
(fkey-delete-rule fk)))
(defmethod format-drop-sql ((fk fkey) &key (stream nil) cascade)
(declare (ignore cascade))
(let* ((constraint-name (apply-identifier-case (fkey-name fk)))
(let* ((constraint-name (fkey-name fk))
(table-name (format-table-name (fkey-table fk))))
(format stream "ALTER TABLE ~a DROP CONSTRAINT ~a" table-name constraint-name)))
(format stream "ALTER TABLE ~a DROP CONSTRAINT ~s~@[ CASCADE~];"
table-name constraint-name cascade)))
;;;
@@ -238,7 +238,6 @@
(trigger-procedure-name trigger)))
(defmethod format-drop-sql ((trigger trigger) &key (stream nil) cascade)
(declare (ignore pgsql))
(format stream
"DROP TRIGGER ~a ON ~a~@[ CASCADE~];"
(trigger-name trigger)
@@ -259,7 +258,6 @@
(procedure-body procedure)))
(defmethod format-drop-sql ((procedure procedure) &key (stream nil) cascade)
(declare (ignore pgsql))
(format stream
"DROP FUNCTION ~a()~@[ CASCADE~];" (procedure-name procedure) cascade))


@@ -4,33 +4,32 @@
(in-package :pgloader.pgsql)
(defun fetch-pgsql-catalog (target &key table including excluding)
"Fetch PostgreSQL catalogs for the target database."
(let ((catalog (make-catalog :name (db-name target))))
(with-pgsql-connection (target)
(defun fetch-pgsql-catalog (dbname
&key table source-catalog including excluding)
"Fetch PostgreSQL catalogs for the target database. A PostgreSQL
connection must be opened."
(let* ((catalog (make-catalog :name dbname))
(including (cond ((and table (not including))
(make-including-expr-from-table table))
(when (and table (not including))
;; rewrite the table constraint as an including expression
(let ((schema
(or (table-schema table)
(query-table-schema table))))
(setf including
(list (cons (schema-name schema)
(list
(format-table-name-as-including-exp table)))))))
((and catalog (not including))
(make-including-expr-from-catalog source-catalog))
(list-all-columns catalog
:table-type :table
:including including
:excluding excluding)
(t
including))))
(list-all-indexes catalog
:including including
:excluding excluding)
(list-all-fkeys catalog
(list-all-columns catalog
:table-type :table
:including including
:excluding excluding))
:excluding excluding)
(list-all-indexes catalog
:including including
:excluding excluding)
(list-all-fkeys catalog
:including including
:excluding excluding)
(log-message :debug "fetch-pgsql-catalog: ~d tables, ~d indexes, ~d fkeys"
(count-tables catalog)
@@ -45,18 +44,60 @@
catalog))
(defun make-including-expr-from-catalog (catalog)
"Return an expression suitable to be used as an :including parameter."
(let (including current-schema)
;; The schema where to install the table or view in the target database
;; might be different from the schema where we find it in the source
;; table, thanks to the ALTER TABLE ... SET SCHEMA ... feature of
;; pgloader.
;;
;; The schema we want to lookup here is the target schema, so it's
;; (table-schema table) and not the schema where we found the table in
;; the catalog nested structure.
;;
;; Also, MySQL schema map to PostgreSQL databases, so we might have NIL
;; as a schema name here. In that case, we find the current PostgreSQL
;; schema and use that.
(loop :for table :in (append (table-list catalog)
(view-list catalog))
:do (let* ((schema-name
(or (schema-name (table-schema table))
current-schema
(setf current-schema
(pomo:query "select current_schema()" :single))))
(table-expr
(format-table-name-as-including-exp table))
(schema-entry
(or (assoc schema-name including :test #'string=)
(progn (push (cons schema-name nil) including)
(assoc schema-name including :test #'string=)))))
(push-to-end table-expr (cdr schema-entry))))
;; return the including alist
including))
(defun make-including-expr-from-table (table)
"Return an expression suitable to be used as an :including parameter."
(let ((schema (or (table-schema table)
(query-table-schema table))))
(list (cons (schema-name schema)
(list
(format-table-name-as-including-exp table))))))
(defun format-table-name-as-including-exp (table)
"Return a table name suitable for a catalog lookup using ~ operator."
(let ((table-name (table-name table)))
(format nil "^~a$"
(cond ((pgloader.quoting::quoted-p table-name)
;; when the table name comes from the user (e.g. in the
;; load file) then we might have to unquote it: the
;; PostgreSQL catalogs does not store object names in
;; their quoted form.
(subseq table-name 1 (1- (length table-name))))
(format nil "^~a$" (ensure-unquoted table-name))))
(t table-name)))))
(defun ensure-unquoted (identifier)
(cond ((pgloader.quoting::quoted-p identifier)
;; when the table name comes from the user (e.g. in the
;; load file) then we might have to unquote it: the
;; PostgreSQL catalogs does not store object names in
;; their quoted form.
(subseq identifier 1 (1- (length identifier))))
(t identifier)))
(defun query-table-schema (table)
"Get PostgreSQL schema name where to locate TABLE-NAME by following the
@@ -161,6 +202,9 @@
join pg_namespace n ON n.oid = i.relnamespace
join pg_namespace rn ON rn.oid = r.relnamespace
left join pg_constraint c ON c.conindid = i.oid
and c.conrelid = r.oid
-- filter out self-fkeys
and c.confrelid <> r.oid
where n.nspname !~~ '^pg_' and n.nspname <> 'information_schema'
~:[~*~;and (~{~a~^~&~10t or ~})~]
~:[~*~;and (~{~a~^~&~10t and ~})~]
@@ -214,7 +258,7 @@ order by n.nspname, r.relname"
from pg_catalog.pg_constraint r
JOIN pg_class c on r.conrelid = c.oid
JOIN pg_namespace n on c.relnamespace = n.oid
JOIN pg_class cf on r.confrelid = c.oid
JOIN pg_class cf on r.confrelid = cf.oid
JOIN pg_namespace nf on cf.relnamespace = nf.oid
where r.contype = 'f'
AND c.relkind = 'r' and cf.relkind = 'r'
@@ -261,7 +305,7 @@ order by n.nspname, r.relname"
(fschema (find-schema catalog fschema-name))
(ftable (find-table fschema ftable-name))
(fk
(make-fkey :name (apply-identifier-case conname)
(make-fkey :name conname
:condef condef
:table table
:columns (split-sequence:split-sequence #\, cols)
@@ -279,3 +323,24 @@ order by n.nspname, r.relname"
:finally (return catalog)))
;;;
;;; Extra utilities to introspect a PostgreSQL schema.
;;;
(defun list-schemas ()
"Return the list of PostgreSQL schemas in the already established
PostgreSQL connection."
(pomo:query "SELECT nspname FROM pg_catalog.pg_namespace;" :column))
(defun list-table-oids (table-names)
"Return an hash table mapping TABLE-NAME to its OID for all table in the
TABLE-NAMES list. A PostgreSQL connection must be established already."
(let ((oidmap (make-hash-table :size (length table-names) :test #'equal)))
(when table-names
(loop :for (name oid)
:in (pomo:query
(format nil
"select n, n::regclass::oid from (values ~{('~a')~^,~}) as t(n)"
table-names))
:do (setf (gethash name oidmap) oid)))
oidmap))
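The query built by list-table-oids resolves every name to an OID in a single round trip by casting through ::regclass; the query-string construction can be sketched in Python (no connection involved; the helper name is hypothetical):

```python
def table_oids_query(table_names):
    # Build the one-shot lookup query used above: a VALUES list of table
    # names, each resolved server-side via the ::regclass cast.  Names are
    # assumed to be valid, already-formatted table names.
    values = ",".join("('%s')" % name for name in table_names)
    return "select n, n::regclass::oid from (values %s) as t(n)" % values
```

Returning a hash table keyed by name (rather than the previous alist) makes the per-table OID lookup O(1) in set-table-oids.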


@@ -1,137 +0,0 @@
;;;
;;; Tools to handle PostgreSQL queries
;;;
(in-package :pgloader.pgsql)
(defun list-schemas ()
"Return the list of PostgreSQL schemas in the already established
PostgreSQL connection."
(pomo:query "SELECT nspname FROM pg_catalog.pg_namespace;" :column))
(defun list-tables-and-fkeys (&optional schema-name)
"Yet another table listing query."
(loop :for (relname fkeys) :in (pomo:query (format nil "
select relname, array_to_string(array_agg(conname), ',')
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_constraint co on c.oid = co.conrelid
where contype = 'f' and nspname = ~:[current_schema()~;'~a'~]
group by relname;" schema-name schema-name))
:collect (cons relname (sq:split-sequence #\, fkeys))))
(defun list-columns-query (table-name &optional schema)
"Returns the list of columns for table TABLE-NAME in schema SCHEMA, and
must be run with an already established PostgreSQL connection."
(pomo:query (format nil "
select attname, t.oid::regtype
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.oid = '~:[~*~a~;~a.~a~]'::regclass and attnum > 0
order by attnum" schema schema table-name)))
(defun list-columns (pgconn table-name &key schema)
"Return a list of column names for given TABLE-NAME."
(with-pgsql-transaction (:pgconn pgconn)
(with-schema (unqualified-table-name table-name)
(loop :for (name type)
:in (list-columns-query unqualified-table-name schema)
:collect name))))
(defun list-indexes (table)
"List all indexes for TABLE-NAME in SCHEMA. A PostgreSQL connection must
already be established when calling that function."
(loop
:with sql-index-list
:= (let ((sql (format nil "
select i.relname,
n.nspname,
indrelid::regclass,
indrelid,
indisprimary,
indisunique,
pg_get_indexdef(indexrelid),
c.conname,
pg_get_constraintdef(c.oid)
from pg_index x
join pg_class i ON i.oid = x.indexrelid
join pg_namespace n ON n.oid = i.relnamespace
left join pg_constraint c ON c.conindid = i.oid
where indrelid = '~a'::regclass"
(format-table-name table))))
(log-message :debug "~a" sql)
sql)
:for (name schema table-name table-oid primary unique sql conname condef)
:in (pomo:query sql-index-list)
:collect (make-pgsql-index :name name
:schema schema
:table-oid table-oid
:primary primary
:unique unique
:columns nil
:sql sql
:conname (unless (eq :null conname) conname)
:condef (unless (eq :null condef) condef))))
(defun reset-all-sequences (pgconn &key tables)
"Reset all sequences to the max value of the column they are attached to."
(let ((newconn (clone-connection pgconn)))
(with-pgsql-connection (newconn)
(set-session-gucs *pg-settings*)
(pomo:execute "set client_min_messages to warning;")
(pomo:execute "listen seqs")
(when tables
(pomo:execute
(format nil "create temp table reloids(oid) as values ~{('~a'::regclass)~^,~}"
(mapcar #'format-table-name tables))))
(handler-case
(let ((sql (format nil "
DO $$
DECLARE
n integer := 0;
r record;
BEGIN
FOR r in
SELECT 'select '
|| trim(trailing ')'
from replace(pg_get_expr(d.adbin, d.adrelid),
'nextval', 'setval'))
|| ', (select greatest(max(' || quote_ident(a.attname) || '), 1) from only '
|| quote_ident(nspname) || '.' || quote_ident(relname) || '));' as sql
FROM pg_class c
JOIN pg_namespace n on n.oid = c.relnamespace
JOIN pg_attribute a on a.attrelid = c.oid
JOIN pg_attrdef d on d.adrelid = a.attrelid
and d.adnum = a.attnum
and a.atthasdef
WHERE relkind = 'r' and a.attnum > 0
and pg_get_expr(d.adbin, d.adrelid) ~~ '^nextval'
~@[and c.oid in (select oid from reloids)~]
LOOP
n := n + 1;
EXECUTE r.sql;
END LOOP;
PERFORM pg_notify('seqs', n::text);
END;
$$; " tables)))
(pomo:execute sql))
;; now get the notification signal
(cl-postgres:postgresql-notification (c)
(parse-integer (cl-postgres:postgresql-notification-payload c)))))))
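The `DO` block above rewrites each serial column's `nextval(...)` default expression into a `setval(...)` call seeded from the column's current maximum (or 1 for an empty table). A minimal Python sketch of that string rewrite, illustration only with a hypothetical helper name (pgloader performs this in SQL with `replace` and `trim`):

```python
def setval_statement(default_expr, schema, table, column):
    # Turn nextval('seq'::regclass) into setval('seq'::regclass, ...):
    # swap the function name and drop the closing paren so the second
    # setval argument can be appended, as the DO block does with
    # replace() and trim(trailing ')').
    head = default_expr.replace("nextval", "setval").rstrip(")")
    return ("select %s, (select greatest(max(%s), 1) from only %s.%s));"
            % (head, column, schema, table))
```

The `greatest(..., 1)` guard matters because `setval` rejects 0, which `max()` would effectively yield on an empty table.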
(defun list-table-oids (table-names)
"Return an alist of (TABLE-NAME . TABLE-OID) for all tables in the
TABLE-NAMES list. A connection must already be established."
(when table-names
(loop for (name oid)
in (pomo:query
(format nil
"select n, n::regclass::oid from (values ~{('~a')~^,~}) as t(n)"
table-names))
collect (cons name oid))))

src/regress/regress.lisp (new file, 122 lines)

@@ -0,0 +1,122 @@
;;;
;;; Regression tests driver.
;;;
;;; We're using SQL EXCEPT to compare what we loaded with what we expected
;;; to load.
;;;
(in-package #:pgloader)
(defun process-regression-test (load-file &key start-logger)
"Run a regression test for given LOAD-FILE."
(unless (probe-file load-file)
(format t "Regression testing ~s: file does not exist." load-file)
(uiop:quit +os-code-error-regress+))
;; now do our work
(with-monitor (:start-logger start-logger)
(log-message :log "Regression testing: ~s" load-file)
(process-command-file load-file)
;; once we are done running the load-file, compare the loaded data with
;; our expected data file
(bind ((expected-subdir (directory-namestring
(asdf:system-relative-pathname
:pgloader "test/regress/expected/")))
(expected-data-file (make-pathname :defaults load-file
:type "out"
:directory expected-subdir))
((target-conn gucs) (parse-target-pg-db-uri load-file))
(*pg-settings* (pgloader.pgsql:sanitize-user-gucs gucs))
(*pgsql-reserved-keywords* (list-reserved-keywords target-conn))
(target-table (create-table (pgconn-table-name target-conn)))
(expected-data-source
(parse-source-string-for-type
:copy (uiop:native-namestring expected-data-file)))
;; change target table-name schema
(expected-data-target
(let ((e-d-t (clone-connection target-conn)))
(setf (pgconn-table-name e-d-t)
;;
;; The connection facility still works with cons here,
;; rather than table structure instances, because of
;; dependencies as explained in
;; src/parsers/command-db-uri.lisp
;;
(cons "expected" (table-name target-table)))
e-d-t)))
(log-message :log "Comparing loaded data against ~s" expected-data-file)
;; prepare expected table in "expected" schema
(with-pgsql-connection (target-conn)
(with-schema (unqualified-table-name target-table)
(let* ((tname (apply-identifier-case unqualified-table-name))
(drop (format nil "drop table if exists expected.~a;"
tname))
(create (format nil "create table expected.~a(like ~a);"
tname tname)))
(log-message :notice "~a" drop)
(pomo:query drop)
(log-message :notice "~a" create)
(pomo:query create))))
;; load expected data
(load-data :from expected-data-source
:into expected-data-target
:options '(:truncate t)
:start-logger nil)
;; now compare both
(with-pgsql-connection (target-conn)
(with-schema (unqualified-table-name target-table)
(let* ((tname (apply-identifier-case unqualified-table-name))
(cols (loop :for (name type)
:in (list-columns tname)
;;
;; We can't just use the column names as-is
;; here, because the PostgreSQL POINT datatype
;; has no equality operator, so the EXCEPT query
;; fails with:
;;
;; could not identify an equality operator for type point
;;
:collect (if (string= "point" type)
(format nil "~s::text" name)
(format nil "~s" name))))
(sql (format nil
"select count(*) from (select ~{~a~^, ~} from expected.~a except select ~{~a~^, ~} from ~a) ss"
cols
tname
cols
tname))
(diff-count (pomo:query sql :single)))
(log-message :notice "~a" sql)
(log-message :notice "Got a diff of ~a rows" diff-count)
(if (= 0 diff-count)
(progn
(log-message :log "Regress pass.")
#-pgloader-image (values diff-count +os-code-success+)
#+pgloader-image (uiop:quit +os-code-success+))
(progn
(log-message :log "Regress fail.")
#-pgloader-image (values diff-count +os-code-error-regress+)
#+pgloader-image (uiop:quit +os-code-error-regress+)))))))))
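The comparison above relies on SQL `EXCEPT`, which is set difference: any row present in the expected table but absent from the loaded table counts as a diff, and a count of zero means the regression test passes. A minimal Python model of that pass/fail decision (hypothetical names, illustration only; like `EXCEPT`, sets discard duplicate rows):

```python
def diff_count(expected_rows, loaded_rows):
    # Model "SELECT ... FROM expected.t EXCEPT SELECT ... FROM t":
    # rows we expected but did not load. Zero means the test passes.
    return len(set(expected_rows) - set(loaded_rows))
```

Note the check is one-directional, in the sketch as in the SQL: extra rows in the loaded table would not be counted.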
;;;
;;; TODO: use the catalogs structures and introspection facilities.
;;;
(defun list-columns (table-name &optional schema)
"Returns the list of columns for table TABLE-NAME in schema SCHEMA, and
must be run with an already established PostgreSQL connection."
(pomo:query (format nil "
select attname, t.oid::regtype
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
left join pg_attribute a on c.oid = a.attrelid
join pg_type t on t.oid = a.atttypid
where c.oid = '~:[~*~a~;~a.~a~]'::regclass and attnum > 0
order by attnum" schema schema table-name)))
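The `~:[~*~a~;~a.~a~]` FORMAT directive above qualifies the table name with its schema only when one is given. A rough Python equivalent, with a hypothetical helper name, for readers unfamiliar with FORMAT conditionals:

```python
def qualified_name(table_name, schema=None):
    # ~:[...~;...~] picks the second branch when the argument is
    # non-nil: emit schema.table with a schema, bare name without.
    return "%s.%s" % (schema, table_name) if schema else table_name
```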


@@ -58,7 +58,7 @@
(:documentation
"Return the list of column names for the data sent in the queue."))
(defgeneric copy-from (source &key truncate)
(defgeneric copy-from (source &key)
(:documentation
"Load data from SOURCE into its target as defined by the SOURCE object."))


@@ -11,6 +11,8 @@
(defmethod prepare-pgsql-database ((copy db-copy)
(catalog catalog)
&key
truncate
create-tables
create-schemas
set-table-oids
materialize-views
@@ -25,34 +27,58 @@
(with-stats-collection ("create, drop" :use-result-as-rows t :section :pre)
(with-pgsql-transaction (:pgconn (target-db copy))
;; we need to first drop the Foreign Key Constraints, so that we
;; can DROP TABLE when asked
;; (when (and foreign-keys include-drop)
;; (drop-pgsql-fkeys catalog))
(when create-schemas
(log-message :debug "Create schemas")
(log-message :notice "Create schemas")
(create-schemas catalog :include-drop include-drop))
;; create new SQL types (ENUMs, SETs) if needed and before we get to
;; the table definitions that will use them
(log-message :debug "Create SQL types (enums, sets)")
(create-sqltypes catalog :include-drop include-drop)
(if create-tables
(progn
;; create new SQL types (ENUMs, SETs) if needed and before we
;; get to the table definitions that will use them
(log-message :notice "Create SQL types (enums, sets)")
(create-sqltypes catalog
:include-drop include-drop
:client-min-messages :error)
;; now the tables
(log-message :debug "Create tables")
(create-tables catalog :include-drop include-drop)
;; now the tables
(log-message :notice "Create tables")
(create-tables catalog
:include-drop include-drop
:client-min-messages :error))
(progn
;; if we're not going to create the tables, now is the time to
;; remove the constraints: indexes, primary keys, foreign keys
;;
;; to be able to do that properly, get the constraints from
;; the pre-existing target database catalog
(let ((pgsql-catalog
(fetch-pgsql-catalog (db-name (target-db copy))
:source-catalog catalog)))
(merge-catalogs catalog pgsql-catalog))
;; now the foreign keys and only then the indexes, because a
;; drop constraint on a primary key cascades to the drop of
;; any foreign key that targets the primary key
(when foreign-keys
(drop-pgsql-fkeys catalog))
(loop :for table :in (table-list catalog)
:do (drop-indexes :pre table))
(when truncate
(truncate-tables catalog))))
;; Some database sources allow the same index name to be used
;; against several tables, so we add the PostgreSQL table OID in the
;; index name, to differentiate. Set the table oids now.
(when set-table-oids
(log-message :debug "Set table OIDs")
(pgloader.pgsql::set-table-oids catalog))
(when (and create-tables set-table-oids)
(log-message :notice "Set table OIDs")
(set-table-oids catalog))
;; We might have to MATERIALIZE VIEWS
(when materialize-views
(log-message :debug "Create tables for matview support")
(log-message :notice "Create tables for matview support")
(create-views catalog :include-drop include-drop)))))
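In the `create no tables` branch above, drop order matters: foreign keys go first, because dropping a primary key would otherwise cascade into any foreign key that references it. A simplified Python sketch of that ordering (hypothetical names, illustration only; real primary keys are dropped via `ALTER TABLE ... DROP CONSTRAINT` rather than always `DROP INDEX`):

```python
def drop_statements(catalog):
    # Emit every foreign-key drop before any index/primary-key drop,
    # so no PK drop can cascade into a foreign key we still intend
    # to re-install once the data load is done.
    stmts = ["ALTER TABLE %s DROP CONSTRAINT %s;" % (table, fkey)
             for (table, fkey) in catalog["fkeys"]]
    stmts += ["DROP INDEX %s;" % idx for idx in catalog["indexes"]]
    return stmts

# toy catalog shaped like the fetched target-database metadata
catalog = {"fkeys": [("rental", "rental_customer_id_fkey")],
           "indexes": ["customer_pkey"]}
```

The same statements, reversed in sense and order, are replayed after the COPY phase to restore the pre-existing schema.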
(defmethod cleanup ((copy db-copy) (catalog catalog) &key materialize-views)
@@ -67,6 +93,7 @@
&key
data-only
foreign-keys
create-triggers
reset-sequences)
"After loading the data into PostgreSQL, we can now reset the sequences
and declare foreign keys."
@@ -77,32 +104,31 @@
;; while CREATE INDEX statements are in flight (avoid locking).
;;
(when reset-sequences
(reset-sequences catalog :pgconn (clone-connection (target-db copy))))
(reset-sequences (clone-connection (target-db copy)) catalog))
(with-pgsql-connection ((clone-connection (target-db copy)))
;;
;; Turn UNIQUE indexes into PRIMARY KEYS now
;;
(unless data-only
(loop :for sql :in pkeys
:when sql
:do (pgsql-execute-with-timing :post "Primary Keys" sql)))
(pgsql-execute-with-timing :post "Primary Keys" pkeys)
;;
;; Foreign Key Constraints
;;
;; We need to have finished loading both the reference and the referring
;; tables to be able to build the foreign keys, so wait until all tables
;; and indexes are imported before doing that.
;;
(when (and foreign-keys (not data-only))
(create-pgsql-fkeys catalog))
;;
;; Foreign Key Constraints
;;
;; We need to have finished loading both the reference and the referring
;; tables to be able to build the foreign keys, so wait until all tables
;; and indexes are imported before doing that.
;;
(when foreign-keys
(create-pgsql-fkeys catalog))
;;
;; Triggers and stored procedures -- includes special default values
;;
(with-pgsql-transaction (:pgconn (target-db copy))
(create-triggers catalog))
;;
;; Triggers and stored procedures -- includes special default values
;;
(when create-triggers
(with-pgsql-transaction (:pgconn (target-db copy))
(create-triggers catalog))))
;;
;; And now, comments on tables and columns.
@@ -205,17 +231,21 @@
;; if asked, first drop/create the tables on the PostgreSQL side
(handler-case
(cond ((and (or create-tables schema-only) (not data-only))
(prepare-pgsql-database copy
catalog
:set-table-oids set-table-oids
:materialize-views materialize-views
:create-schemas create-schemas
:foreign-keys foreign-keys
:include-drop include-drop))
(truncate
(truncate-tables (target-db copy) catalog)))
(prepare-pgsql-database copy
catalog
:truncate (and truncate (not create-tables))
:create-tables (and create-tables
(or schema-only
(not data-only)))
:create-schemas (and create-schemas
(or schema-only
(not data-only)))
:include-drop include-drop
:foreign-keys (and foreign-keys
(or schema-only
(not data-only)))
:set-table-oids set-table-oids
:materialize-views materialize-views)
;;
;; In case some error happens in the preparatory transaction, we
;; need to stop now and refrain from trying to load the data into
@@ -282,7 +312,11 @@
(not data-only)
(zerop (gethash table writers-count)))
(let* ((*preserve-index-names*
(eq :preserve index-names)))
(or (eq :preserve index-names)
;; if we didn't create the tables, we
;; are re-installing the pre-existing
;; indexes
(not create-tables))))
(alexandria:appendf
pkeys
(create-indexes-in-kernel (target-db copy)
@@ -313,6 +347,11 @@
pkeys
:data-only data-only
:foreign-keys foreign-keys
;; only create triggers (for default values)
;; when we've been responsible for creating the
;; tables -- otherwise assume the schema is
;; good as it is
:create-triggers create-tables
:reset-sequences reset-sequences)
;;


@@ -64,7 +64,8 @@
:target (target copy)
:fields (fields copy)
:columns (columns copy)
:transforms (transforms copy)
:transforms (or (transforms copy)
(make-list (length (columns copy))))
:encoding (encoding copy)
:skip-lines (skip-lines copy)
:header (header copy)))
@@ -99,22 +100,38 @@
create-indexes reset-sequences materialize-views
set-table-oids including excluding))
(let* ((pgsql-catalog
(handler-case
(fetch-pgsql-catalog (target-db copy) :table (target copy))
(condition (e)
(log-message :fatal "Failed to fetch target PostgreSQL catalogs.")
(log-message :fatal "~a" e)
(return-from copy-database)))))
(let* ((pgconn (target-db copy))
pgsql-catalog)
;; this sets (table-index-list (target copy))
(maybe-drop-indexes (target-db copy)
pgsql-catalog
:drop-indexes drop-indexes)
(handler-case
(with-pgsql-connection (pgconn)
(setf pgsql-catalog
(fetch-pgsql-catalog (db-name pgconn) :table (target copy)))
;; ensure we truncate only once, do it before going parallel
(when truncate
(truncate-tables (target-db copy) pgsql-catalog))
;; if the user didn't tell us the column list of the table, now is
;; a proper time to set it in the copy object
(unless (and (slot-boundp copy 'columns)
(slot-value copy 'columns))
(setf (columns copy)
(mapcar (lambda (col)
;; we need to handle the md-copy format for the
;; column list, which allows for user-given
;; options: each column is a list whose car
;; is the column name.
(list (column-name col)))
(table-field-list (first (table-list pgsql-catalog))))))
;; this sets (table-index-list (target copy))
(maybe-drop-indexes pgsql-catalog :drop-indexes drop-indexes)
;; now is the proper time to truncate, before parallel operations
(when truncate
(truncate-tables pgsql-catalog)))
(condition (e)
(log-message :fatal "Failed to prepare target PostgreSQL table.")
(log-message :fatal "~a" e)
(return-from copy-database)))
;; expand the specs of our source; we might actually have to care
;; about several files.
@@ -128,7 +145,6 @@
:kernel lp:*kernel*
:channel channel
:on-error-stop on-error-stop
:truncate nil
:disable-triggers disable-triggers)))
;; end kernel


@@ -124,7 +124,6 @@
(channel nil c-s-p)
(worker-count 8)
(concurrency 2)
truncate
(on-error-stop *on-error-stop*)
disable-triggers)
"Copy data from COPY source into PostgreSQL.
@@ -179,12 +178,6 @@
:for fmtq :in fmtqs
:do (lp:submit-task channel #'format-data-to-copy copy rawq fmtq))
;; Also, we need to do the TRUNCATE here before starting the
;; threads, so that it's done just once.
(when truncate
(truncate-tables (clone-connection (target-db copy))
(target copy)))
(loop :for fmtq :in fmtqs
:do (lp:submit-task channel
#'pgloader.pgsql:copy-from-queue


@@ -18,18 +18,6 @@
:initform "\\N"))
(:documentation "pgloader COPY Data Source"))
(defmethod initialize-instance :after ((copy copy-copy) &key)
"Compute the real source definition from the given source parameter, and
set the transforms function list as needed too."
(let ((transforms (when (slot-boundp copy 'transforms)
(slot-value copy 'transforms)))
(columns
(or (slot-value copy 'columns)
(pgloader.pgsql:list-columns (slot-value copy 'target-db)
(slot-value copy 'target)))))
(unless transforms
(setf (slot-value copy 'transforms) (make-list (length columns))))))
(defmethod clone-copy-for ((copy copy-copy) path-spec)
"Create a copy of FIXED for loading data from PATH-SPEC."
(let ((copy-for-path-spec


@@ -36,18 +36,6 @@
:initform t))
(:documentation "pgloader CSV Data Source"))
(defmethod initialize-instance :after ((csv copy-csv) &key)
"Compute the real source definition from the given source parameter, and
set the transforms function list as needed too."
(let ((transforms (when (slot-boundp csv 'transforms)
(slot-value csv 'transforms)))
(columns
(or (slot-value csv 'columns)
(pgloader.pgsql:list-columns (slot-value csv 'target-db)
(slot-value csv 'target)))))
(unless transforms
(setf (slot-value csv 'transforms) (make-list (length columns))))))
(defmethod clone-copy-for ((csv copy-csv) path-spec)
"Create a copy of CSV for loading data from PATH-SPEC."
(let ((csv-for-path-spec


@@ -18,18 +18,6 @@
:initform 0))
(:documentation "pgloader Fixed Columns Data Source"))
(defmethod initialize-instance :after ((fixed copy-fixed) &key)
"Compute the real source definition from the given source parameter, and
set the transforms function list as needed too."
(let ((transforms (when (slot-boundp fixed 'transforms)
(slot-value fixed 'transforms)))
(columns
(or (slot-value fixed 'columns)
(pgloader.pgsql:list-columns (slot-value fixed 'target-db)
(slot-value fixed 'target)))))
(unless transforms
(setf (slot-value fixed 'transforms) (make-list (length columns))))))
(defmethod clone-copy-for ((fixed copy-fixed) path-spec)
"Create a copy of FIXED for loading data from PATH-SPEC."
(let ((fixed-clone


@@ -10,6 +10,11 @@ load database
WITH concurrency = 1, workers = 6,
max parallel create index = 4
-- uncomment the following line to test loading into an already
-- existing schema, and make sure the schema actually is ready by
-- having done a first migration without those options:
--, create no tables, include drop, truncate
SET maintenance_work_mem to '128MB',
work_mem to '12MB',
search_path to 'sakila, public, "$user"'