mirror of
https://github.com/dimitri/pgloader.git
synced 2026-05-04 10:31:02 +02:00
Refactor code from previous commit.
The goal is to make it easy to add support for the 'drop indexes' option in other source types (fixed, ixf, db3, file-based sources).
This commit is contained in:
parent
49bf7e56f2
commit
4f2385fa4c
@ -201,6 +201,8 @@
|
||||
#:create-indexes-in-kernel
|
||||
#:set-table-oids
|
||||
#:drop-indexes
|
||||
#:maybe-drop-indexes
|
||||
#:create-indexes-again
|
||||
#:reset-sequences))
|
||||
|
||||
(defpackage #:pgloader.queue
|
||||
@ -247,11 +249,8 @@
|
||||
#:pgloader.params #:pgloader.utils #:pgloader.connection
|
||||
#:pgloader.sources #:pgloader.queue)
|
||||
(:import-from #:pgloader.pgsql
|
||||
#:list-indexes
|
||||
#:drop-indexes
|
||||
#:with-pgsql-connection
|
||||
#:pgsql-execute-with-timing
|
||||
#:create-indexes-in-kernel)
|
||||
#:maybe-drop-indexes
|
||||
#:create-indexes-again)
|
||||
(:export #:*csv-path-root*
|
||||
#:csv-connection
|
||||
#:specs
|
||||
|
||||
@ -380,6 +380,52 @@
|
||||
(log-message :notice "~a" sql)
|
||||
(pgsql-execute-with-timing "drop indexes" sql state))))
|
||||
|
||||
;;;
|
||||
;;; Higher level API to care about indexes
|
||||
;;;
|
||||
(defun maybe-drop-indexes (target table-name state &key drop-indexes)
|
||||
"Drop the indexes for TABLE-NAME on TARGET PostgreSQL connection, and
|
||||
returns a list of indexes to create again."
|
||||
(with-pgsql-connection (target)
|
||||
(let ((indexes (list-indexes table-name)))
|
||||
(cond ((and indexes (not drop-indexes))
|
||||
(log-message :warning
|
||||
"Target table ~s has ~d indexes defined against it."
|
||||
table-name (length indexes))
|
||||
(log-message :warning
|
||||
"That could impact loading performance badly.")
|
||||
(log-message :warning
|
||||
"Consider the option 'drop indexes'."))
|
||||
|
||||
(indexes
|
||||
;; drop the indexes now
|
||||
(with-stats-collection ("drop indexes" :state state)
|
||||
(drop-indexes state indexes))))
|
||||
|
||||
;; and return the indexes list
|
||||
indexes)))
|
||||
|
||||
(defun create-indexes-again (target indexes state &key drop-indexes)
|
||||
"Create the indexes that we dropped previously."
|
||||
(when (and indexes drop-indexes)
|
||||
(let* ((idx-kernel (make-kernel (length indexes)))
|
||||
(idx-channel (let ((lp:*kernel* idx-kernel))
|
||||
(lp:make-channel))))
|
||||
(let ((pkeys
|
||||
(create-indexes-in-kernel target indexes idx-kernel idx-channel
|
||||
:state state)))
|
||||
|
||||
(with-stats-collection ("Index Build Completion" :state *state*)
|
||||
(loop :for idx :in indexes :do (lp:receive-result idx-channel)))
|
||||
|
||||
;; turn unique indexes into pkeys now
|
||||
(with-pgsql-connection (target)
|
||||
(with-stats-collection ("Primary Keys" :state state)
|
||||
(loop :for sql :in pkeys
|
||||
:when sql
|
||||
:do (progn
|
||||
(log-message :notice "~a" sql)
|
||||
(pgsql-execute-with-timing "Primary Keys" sql state)))))))))
|
||||
|
||||
;;;
|
||||
;;; Sequences
|
||||
|
||||
@ -186,23 +186,10 @@
|
||||
(lp:*kernel* (make-kernel 2))
|
||||
(channel (lp:make-channel))
|
||||
(queue (lq:make-queue :fixed-capacity *concurrent-batches*))
|
||||
indexes)
|
||||
|
||||
;; issue a performance warning against pre-existing indexes
|
||||
(with-pgsql-connection ((target-db csv))
|
||||
(setf indexes (list-indexes (target csv)))
|
||||
(cond ((and indexes (not drop-indexes))
|
||||
(log-message :warning
|
||||
"Target table ~s has ~d indexes defined against it."
|
||||
(target csv) (length indexes))
|
||||
(log-message :warning "That could impact loading performance badly"))
|
||||
|
||||
(indexes
|
||||
|
||||
;; drop the indexes now
|
||||
(with-stats-collection ("drop indexes" :state state-before
|
||||
:summary summary)
|
||||
(drop-indexes state-before indexes)))))
|
||||
(indexes (maybe-drop-indexes (target-db csv)
|
||||
(target csv)
|
||||
state-before
|
||||
:drop-indexes drop-indexes)))
|
||||
|
||||
(with-stats-collection ((target csv)
|
||||
:dbname (db-name (target-db csv))
|
||||
@ -229,26 +216,5 @@
|
||||
finally (lp:end-kernel))))
|
||||
|
||||
;; re-create the indexes
|
||||
(when (and indexes drop-indexes)
|
||||
(let* ((idx-kernel (make-kernel (length indexes)))
|
||||
(idx-channel (let ((lp:*kernel* idx-kernel))
|
||||
(lp:make-channel))))
|
||||
(let ((pkeys
|
||||
(create-indexes-in-kernel (target-db csv)
|
||||
indexes
|
||||
idx-kernel
|
||||
idx-channel
|
||||
:state state-after)))
|
||||
(with-stats-collection ("Index Build Completion" :state *state*)
|
||||
(loop :for idx :in indexes :do (lp:receive-result idx-channel)))
|
||||
|
||||
;; turn unique indexes into pkeys now
|
||||
(with-pgsql-connection ((target-db csv))
|
||||
(with-stats-collection ("Primary Keys" :state state-after)
|
||||
(loop :for sql :in pkeys
|
||||
:when sql
|
||||
:do (progn
|
||||
(log-message :notice "~a" sql)
|
||||
(pgsql-execute-with-timing "Primary Keys"
|
||||
sql
|
||||
state-after))))))))))
|
||||
(create-indexes-again (target-db csv) indexes state-after
|
||||
:drop-indexes drop-indexes)))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user