;;;
;;; Generic API for pgloader sources
;;; Methods for database source types (with introspection)
;;;

(in-package :pgloader.load)

;;;
;;; Prepare the PostgreSQL database before streaming the data into it.
;;;
(defmethod prepare-pgsql-database ((copy db-copy)
                                   (catalog catalog)
                                   &key
                                     truncate
                                     create-tables
                                     create-schemas
                                     drop-schema
                                     drop-indexes
                                     set-table-oids
                                     materialize-views
                                     foreign-keys
                                     include-drop
                                     distribute)
  "Prepare the target PostgreSQL database: create tables casting datatypes
   from the source database definitions, prepare index definitions and
   create target tables for materialized views.

   This function mutates the index definitions in CATALOG."
  (log-message :notice "Prepare PostgreSQL database.")

  (with-pgsql-transaction (:pgconn (target-db copy))
    (finalize-catalogs catalog (pgconn-variant (target-db copy)))

    (if create-tables
        (progn
          (when create-schemas
            (with-stats-collection ("Create Schemas" :section :pre
                                                     :use-result-as-read t
                                                     :use-result-as-rows t)
              (create-schemas catalog
                              :include-drop drop-schema
                              :client-min-messages :error)))

          ;; create new SQL types (ENUMs, SETs) if needed and before we
          ;; get to the table definitions that will use them
          (with-stats-collection ("Create SQL Types" :section :pre
                                                     :use-result-as-read t
                                                     :use-result-as-rows t)
            ;; some SQL types come from extensions (ip4r, hstore, etc)
            (create-extensions catalog
                               :include-drop include-drop
                               :if-not-exists t
                               :client-min-messages :error)
            (create-sqltypes catalog
                             :include-drop include-drop
                             :client-min-messages :error))

          ;; now the tables
          (with-stats-collection ("Create tables" :section :pre
                                                  :use-result-as-read t
                                                  :use-result-as-rows t)
            (create-tables catalog
                           :include-drop include-drop
                           :client-min-messages :error)))

        (progn
          ;; if we're not going to create the tables, now is the time to
          ;; remove the constraints: indexes, primary keys, foreign keys
          ;;
          ;; to be able to do that properly, get the constraints from
          ;; the pre-existing target database catalog
          (let ((pgsql-catalog
                 (fetch-pgsql-catalog (db-name (target-db copy))
                                      :source-catalog catalog)))
            (merge-catalogs catalog pgsql-catalog))

          ;; now the foreign keys and only then the indexes, because a
          ;; drop constraint on a primary key cascades to the drop of
          ;; any foreign key that targets the primary key
          (when foreign-keys
            (with-stats-collection ("Drop Foreign Keys" :section :pre
                                                        :use-result-as-read t
                                                        :use-result-as-rows t)
              (drop-pgsql-fkeys catalog :log-level :notice)))

          (when drop-indexes
            (with-stats-collection ("Drop Indexes" :section :pre
                                                   :use-result-as-read t
                                                   :use-result-as-rows t)
              ;; we want to error out early in case we can't DROP the
              ;; index, don't CASCADE
              (drop-indexes catalog :cascade nil :log-level :notice)))

          (when truncate
            (with-stats-collection ("Truncate" :section :pre
                                               :use-result-as-read t
                                               :use-result-as-rows t)
              (truncate-tables catalog)))))
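
    ;; Illustration only -- a hedged sketch, not part of this method's
    ;; control flow: with hypothetical COPY and CATALOG bindings, a
    ;; data-only run takes the second branch above, merging the
    ;; pre-existing target schema and dropping its constraints:
    ;;
    ;;   (prepare-pgsql-database copy catalog
    ;;                           :create-tables nil
    ;;                           :foreign-keys t
    ;;                           :drop-indexes t
    ;;                           :truncate t)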
    ;; Some database sources allow the same index name being used against
    ;; several tables, so we add the PostgreSQL table OID in the index
    ;; name, to differentiate. Set the table OIDs now.
    (when (and create-tables set-table-oids)
      (with-stats-collection ("Set Table OIDs" :section :pre
                                               :use-result-as-read t
                                               :use-result-as-rows t)
        (set-table-oids catalog :variant (pgconn-variant (target-db copy)))))

    ;; We might have to MATERIALIZE VIEWS
    (when (and create-tables materialize-views)
      (with-stats-collection ("Create MatViews Tables" :section :pre
                                                       :use-result-as-read t
                                                       :use-result-as-rows t)
        (create-views catalog
                      :include-drop include-drop
                      :client-min-messages :error)))

    ;; Citus Support
    (when distribute
      (with-stats-collection ("Citus Distribute Tables" :section :pre)
        (let ((citus-sql (loop :for rule :in distribute
                            :collect (format-create-sql rule))))
          (pgsql-execute citus-sql :client-min-messages :notice)))))

  ;; log the catalog we just fetched and (maybe) merged
  (log-message :data "CATALOG: ~s" catalog))


(defmethod complete-pgsql-database ((copy db-copy)
                                    (catalog catalog)
                                    pkeys
                                    &key
                                      foreign-keys
                                      create-indexes
                                      create-triggers
                                      reset-sequences)
  "After loading the data into PostgreSQL, we can now reset the sequences
   and declare foreign keys."
  ;;
  ;; Now Reset Sequences: the good time to do that is once the whole data
  ;; has been imported and once we have the indexes in place, as max() is
  ;; able to benefit from the indexes. In particular avoid doing that step
  ;; while CREATE INDEX statements are in flight (avoid locking).
  ;;
  (log-message :notice "Completing PostgreSQL database.")

  (when reset-sequences
    (reset-sequences (clone-connection (target-db copy)) catalog))

  (handler-case
      (with-pgsql-transaction (:pgconn (clone-connection (target-db copy)))
        ;;
        ;; Turn UNIQUE indexes into PRIMARY KEYS now
        ;;
        (when create-indexes
          (pgsql-execute-with-timing :post "Primary Keys" pkeys
                                     :log-level :notice))

        ;;
        ;; Foreign Key Constraints
        ;;
        ;; We need to have finished loading both the reference and the
        ;; referring tables to be able to build the foreign keys, so wait
        ;; until all tables and indexes are imported before doing that.
        ;;
        (when foreign-keys
          (create-pgsql-fkeys catalog
                              :section :post
                              :label "Create Foreign Keys"
                              :log-level :notice))

        ;;
        ;; Triggers and stored procedures -- includes special default values
        ;;
        (when create-triggers
          (create-triggers catalog :section :post :label "Create Triggers"))

        ;;
        ;; Add schemas that need to be in the search_path to the database
        ;; search_path, when using PostgreSQL. Redshift doesn't know how to
        ;; do that, unfortunately.
        ;;
        (unless (eq :redshift (pgconn-variant (target-db copy)))
          (add-to-search-path catalog :section :post :label "Set Search Path"))

        ;;
        ;; And now, comments on tables and columns.
        ;;
        (comment-on-tables-and-columns catalog
                                       :section :post
                                       :label "Install Comments"))

    (postgresql-unavailable (condition)
      (log-message :error "~a" condition)
      (log-message :error
                   "Complete PostgreSQL database: reconnecting to PostgreSQL.")

      ;; in order to avoid Socket error in "connect": ECONNREFUSED if we
      ;; try again too soon, wait a little
      (sleep 2)

      ;;
      ;; Reset Sequences can be done several times safely, and the rest of
      ;; the operations run in a single transaction, so if the connection
      ;; was lost, nothing has been done yet. Retry.
      ;;
      (complete-pgsql-database copy catalog pkeys
                               :foreign-keys foreign-keys
                               :create-indexes create-indexes
                               :create-triggers create-triggers
                               :reset-sequences reset-sequences))))
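
;; Illustration only: the post-load call made at the end of COPY-DATABASE
;; below looks like this (hypothetical COPY, CATALOG and PKEYS bindings);
;; a lost connection triggers the retry logic shown above:
;;
;;   (complete-pgsql-database copy catalog pkeys
;;                            :foreign-keys t
;;                            :create-indexes t
;;                            :create-triggers t
;;                            :reset-sequences t)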
(defun process-catalog (copy catalog &key alter-table alter-schema distribute)
  "Do all the PostgreSQL catalog tweaking here: casts, index WHERE clause
   rewriting, pgloader level alter schema and alter table commands."
  ;; cast the catalog into something PostgreSQL can work on
  (cast catalog)

  ;; support code for index filters (where clauses)
  (process-index-definitions catalog :sql-dialect (class-name (class-of copy)))

  ;; we may have to alter schemas
  (when alter-schema
    (alter-schema catalog alter-schema))

  ;; if asked, now alter the catalog with the given rules: the alter-table
  ;; keyword parameter actually contains a set of alter table rules.
  (when alter-table
    (alter-table catalog alter-table))

  ;; we also support schema changes necessary for Citus distribution
  (when distribute
    (pgloader.catalog::citus-distribute-schema catalog distribute)))
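
;; Illustration only: COPY-DATABASE below calls PROCESS-CATALOG once per
;; migration, before any DDL is emitted (hypothetical COPY, CATALOG and
;; rule-list bindings):
;;
;;   (process-catalog copy catalog
;;                    :alter-table alter-table-rules
;;                    :alter-schema alter-schema-rules
;;                    :distribute citus-rules)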
;;;
;;; Generic enough implementation of the copy-database method.
;;;
(defmethod copy-database ((copy db-copy)
                          &key
                            (on-error-stop *on-error-stop*)
                            (worker-count 4)
                            (concurrency 1)
                            (multiple-readers nil)
                            max-parallel-create-index
                            (truncate         nil)
                            (disable-triggers nil)
                            (data-only        nil)
                            (schema-only      nil)
                            (create-schemas   t)
                            (create-tables    t)
                            (include-drop     t)
                            (drop-schema      nil)
                            (create-indexes   t)
                            (index-names      :uniquify)
                            (reset-sequences  t)
                            (foreign-keys     t)
                            (reindex          nil)
                            (after-schema     nil)
                            distribute
                            only-tables
                            including
                            excluding
                            set-table-oids
                            alter-table
                            alter-schema
                            materialize-views)
  "Export database source data and import it into PostgreSQL."
  (log-message :log "Migrating from ~a" (source-db copy))
  (log-message :log "Migrating into ~a" (target-db copy))

  (let* ((*on-error-stop* on-error-stop)

         (copy-data  (or data-only (not schema-only)))
         (create-ddl (or schema-only (not data-only)))

         (create-tables  (and create-tables create-ddl))
         (create-schemas (and create-schemas create-ddl))

         ;; foreign keys have a special meaning in data-only mode
         (foreign-keys    (if (eq :redshift (pgconn-variant (target-db copy)))
                              nil
                              foreign-keys))

         (drop-indexes    (if (eq :redshift (pgconn-variant (target-db copy)))
                              nil
                              (or reindex (and include-drop create-ddl))))

         (create-indexes  (if (eq :redshift (pgconn-variant (target-db copy)))
                              nil
                              (or reindex
                                  (and create-indexes drop-indexes create-ddl))))

         (reset-sequences (if (eq :redshift (pgconn-variant (target-db copy)))
                              nil
                              reset-sequences))

         (*preserve-index-names*
          (or (eq :preserve index-names)
              ;; if we didn't create the tables, we are re-installing the
              ;; pre-existing indexes
              (not create-tables)))

         (copy-kernel  (make-kernel worker-count))
         (copy-channel (let ((lp:*kernel* copy-kernel))
                         (lp:make-channel)))

         (catalog (fetch-metadata
                   copy
                   (make-catalog
                    :name (typecase (source-db copy)
                            (db-connection (db-name (source-db copy)))
                            (fd-connection (pathname-name
                                            (fd-path (source-db copy))))))
                   :materialize-views materialize-views
                   :create-indexes create-indexes
                   :foreign-keys foreign-keys
                   :only-tables only-tables
                   :including including
                   :excluding excluding))

         pkeys
         (writers-count (make-hash-table :size (count-tables catalog)))
         (max-indexes   (when create-indexes
                          (max-indexes-per-table catalog)))
         (idx-kernel    (when (and max-indexes (< 0 max-indexes))
                          (make-kernel (or max-parallel-create-index
                                           max-indexes))))
         (idx-channel   (when idx-kernel
                          (let ((lp:*kernel* idx-kernel))
                            (lp:make-channel))))
         (task-count 0))

    ;; apply catalog level transformations to support the database migration:
    ;; that's CAST rules, index WHERE clause rewriting and ALTER commands
    (process-catalog copy catalog
                     :alter-table alter-table
                     :alter-schema alter-schema
                     :distribute distribute)

    ;; if asked, first drop/create the tables on the PostgreSQL side
    (handler-case
        (progn
          (prepare-pgsql-database copy
                                  catalog
                                  :truncate truncate
                                  :create-tables create-tables
                                  :create-schemas create-schemas
                                  :drop-indexes drop-indexes
                                  :drop-schema drop-schema
                                  :include-drop include-drop
                                  :foreign-keys foreign-keys
                                  :set-table-oids set-table-oids
                                  :materialize-views materialize-views
                                  :distribute distribute)

          ;; if there's an AFTER SCHEMA DO/EXECUTE command, now is the time
          ;; to run it.
          (when after-schema
            (pgloader.parser::execute-sql-code-block (target-db copy)
                                                     :pre
                                                     after-schema
                                                     "after schema")))

      ;;
      ;; In case some error happens in the preparatory transaction, we
      ;; need to stop now and refrain from trying to load the data into
      ;; an incomplete schema.
      ;;
      (cl-postgres:database-error (e)
        (declare (ignore e))            ; a log has already been printed
        (log-message :fatal "Failed to create the schema, see above.")

        ;; we might have some cleanup to do...
        (cleanup copy catalog :materialize-views materialize-views)

        (return-from copy-database)))

    (loop
       :for table :in (append (table-list catalog)
                              ;; when materialized views are not supported,
                              ;; view-list is empty here
                              (view-list catalog))

       :do (let ((table-source (instanciate-table-copy-object copy table)))
             ;; first COPY the data from source to PostgreSQL, using copy-kernel
             (if (not copy-data)
                 ;; start indexing straight away then
                 (when create-indexes
                   (alexandria:appendf
                    pkeys
                    (create-indexes-in-kernel (target-db copy)
                                              table idx-kernel idx-channel)))

                 ;; prepare the writers-count hash-table: as we start
                 ;; copy-from, we have CONCURRENCY tasks writing.
                 (progn                 ; when copy-data
                   (setf (gethash table writers-count) concurrency)
                   (incf task-count
                         (copy-from table-source
                                    :concurrency concurrency
                                    :multiple-readers multiple-readers
                                    :kernel copy-kernel
                                    :channel copy-channel
                                    :on-error-stop on-error-stop
                                    :disable-triggers disable-triggers))))))

    ;; now end the kernels, and each time a table is done, launch its
    ;; indexing
    (when copy-data
      (let ((lp:*kernel* copy-kernel))
        (with-stats-collection ("COPY Threads Completion" :section :post
                                                          :use-result-as-read t
                                                          :use-result-as-rows t)
          (loop
             :repeat task-count

             :do (destructuring-bind (task table seconds)
                     (lp:receive-result copy-channel)
                   (log-message :debug "Finished processing ~a for ~s ~50T~6$s"
                                task (format-table-name table) seconds)
                   (when (eq :writer task)
                     ;;
                     ;; Start the CREATE INDEX parallel tasks only when
                     ;; the data has been fully copied over to the
                     ;; corresponding table, that's when the writers
                     ;; count is down to zero.
                     ;;
                     (decf (gethash table writers-count))
                     (log-message :debug "writers-counts[~a] = ~a"
                                  (format-table-name table)
                                  (gethash table writers-count))

                     (when (and create-indexes
                                (zerop (gethash table writers-count)))
                       (log-message :notice "DONE copying ~a"
                                    (format-table-name table))
                       (alexandria:appendf
                        pkeys
                        (create-indexes-in-kernel (target-db copy)
                                                  table idx-kernel idx-channel)))))

             :finally (progn
                        (lp:end-kernel :wait nil)
                        (return worker-count))))))

    (log-message :info "Done with COPYing data, waiting for indexes")

    (when create-indexes
      (let ((lp:*kernel* idx-kernel))
        ;; wait until the indexes are done being built...
        ;; don't forget to account for that waiting time.
        (with-stats-collection ("Index Build Completion" :section :post
                                                         :use-result-as-read t
                                                         :use-result-as-rows t)
          (loop :for count :below (count-indexes catalog)
             :do (lp:receive-result idx-channel))
          (lp:end-kernel :wait t)
          (log-message :info "Done waiting for indexes")
          (count-indexes catalog))))
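
    ;; The kernel/channel scheduling above follows the usual lparallel
    ;; pattern; a minimal self-contained sketch of that pattern, for
    ;; illustration only (not part of this method):
    ;;
    ;;   (let* ((lp:*kernel* (lp:make-kernel 4))
    ;;          (channel     (lp:make-channel)))
    ;;     (lp:submit-task channel (lambda () (+ 1 2)))
    ;;     (prog1 (lp:receive-result channel)
    ;;       (lp:end-kernel :wait t)))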
    ;;
    ;; Complete the PostgreSQL database before handing over.
    ;;
    (complete-pgsql-database copy
                             catalog
                             pkeys
                             :foreign-keys foreign-keys
                             :create-indexes create-indexes
                             ;; only create triggers (for default values)
                             ;; when we've been responsible for creating the
                             ;; tables -- otherwise assume the schema is
                             ;; good as it is
                             :create-triggers create-tables
                             :reset-sequences reset-sequences)

    ;;
    ;; Time to cleanup!
    ;;
    (cleanup copy catalog :materialize-views materialize-views)))
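
;; Example entry point (illustrative only; assumes *COPY* is bound to a
;; concrete DB-COPY subclass instance with source and target connections
;; set up):
;;
;;   (copy-database *copy*
;;                  :worker-count 8
;;                  :concurrency 2
;;                  :create-tables t
;;                  :reset-sequences t)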