mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-08 07:16:58 +02:00
576 lines
20 KiB
Common Lisp
576 lines
20 KiB
Common Lisp
;;;
|
||
;;; Tools to handle MySQL data fetching
|
||
;;;
|
||
|
||
(in-package :pgloader.mysql)
|
||
|
||
;;;
|
||
;;; MySQL tools connecting to a database
|
||
;;;
|
||
(defun list-databases (&key
|
||
(host *myconn-host*)
|
||
(user *myconn-user*)
|
||
(pass *myconn-pass*))
|
||
"Connect to a local database and get the database list"
|
||
(cl-mysql:connect :host host :user user :password pass)
|
||
(unwind-protect
|
||
(mapcan #'identity (caar (cl-mysql:query "show databases")))
|
||
(cl-mysql:disconnect)))
|
||
|
||
(defun list-tables (dbname
|
||
&key
|
||
(host *myconn-host*)
|
||
(user *myconn-user*)
|
||
(pass *myconn-pass*))
|
||
"Return a flat list of all the tables names known in given DATABASE"
|
||
(cl-mysql:connect :host host :user user :password pass)
|
||
|
||
(unwind-protect
|
||
(progn
|
||
(cl-mysql:use dbname)
|
||
;; that returns a pretty weird format, process it
|
||
(mapcan #'identity (caar (cl-mysql:list-tables))))
|
||
;; free resources
|
||
(cl-mysql:disconnect)))
|
||
|
||
;;;
|
||
;;; Tools to get MySQL table and columns definitions and transform them to
|
||
;;; PostgreSQL CREATE TABLE statements, and run those.
|
||
;;;
|
||
(defun list-all-columns (dbname
|
||
&key
|
||
(host *myconn-host*)
|
||
(user *myconn-user*)
|
||
(pass *myconn-pass*))
|
||
"Get the list of MySQL column names per table."
|
||
(cl-mysql:connect :host host :user user :password pass)
|
||
|
||
(unwind-protect
|
||
(progn
|
||
(loop
|
||
with schema = nil
|
||
for (table-name . coldef)
|
||
in
|
||
(caar (cl-mysql:query (format nil "
|
||
select table_name, column_name,
|
||
data_type, column_type, column_default,
|
||
is_nullable, extra
|
||
from information_schema.columns
|
||
where table_schema = '~a'
|
||
order by table_name, ordinal_position" dbname)))
|
||
do
|
||
(let ((entry (assoc table-name schema :test 'equal)))
|
||
(if entry
|
||
(push coldef (cdr entry))
|
||
(push (cons table-name (list coldef)) schema)))
|
||
finally
|
||
;; we did push, we need to reverse here
|
||
(return (loop
|
||
for (name . cols) in schema
|
||
collect (cons name (reverse cols))))))
|
||
|
||
;; free resources
|
||
(cl-mysql:disconnect)))
|
||
|
||
(defun get-create-type (table-name cols &key identifier-case include-drop)
|
||
"MySQL declares ENUM types inline, PostgreSQL wants explicit CREATE TYPE."
|
||
(loop
|
||
for (name dtype ctype default nullable extra) in cols
|
||
when (string-equal "enum" dtype)
|
||
collect (when include-drop
|
||
(let* ((type-name
|
||
(get-enum-type-name table-name name identifier-case)))
|
||
(format nil "DROP TYPE IF EXISTS ~a;" type-name)))
|
||
and collect (get-create-enum table-name name ctype
|
||
:identifier-case identifier-case)))
|
||
|
||
(defun get-create-table (table-name cols &key identifier-case)
|
||
"Return a PostgreSQL CREATE TABLE statement from MySQL columns"
|
||
(with-output-to-string (s)
|
||
(let ((table-name (apply-identifier-case table-name identifier-case)))
|
||
(format s "CREATE TABLE ~a ~%(~%" table-name))
|
||
(loop
|
||
for ((name dtype ctype default nullable extra) . last?) on cols
|
||
for pg-coldef = (cast table-name name dtype ctype default nullable extra)
|
||
for colname = (apply-identifier-case name identifier-case)
|
||
do (format s " ~a ~22t ~a~:[~;,~]~%" colname pg-coldef last?))
|
||
(format s ");~%")))
|
||
|
||
(defun get-drop-table-if-exists (table-name &key identifier-case)
|
||
"Return the PostgreSQL DROP TABLE IF EXISTS statement for TABLE-NAME."
|
||
(let ((table-name (apply-identifier-case table-name identifier-case)))
|
||
(format nil "DROP TABLE IF EXISTS ~a;~%" table-name)))
|
||
|
||
(defun get-pgsql-create-tables (all-columns
|
||
&key
|
||
include-drop
|
||
(identifier-case :downcase))
|
||
"Return the list of CREATE TABLE statements to run against PostgreSQL"
|
||
(loop
|
||
for (table-name . cols) in all-columns
|
||
for extra-types = (get-create-type table-name cols
|
||
:identifier-case identifier-case
|
||
:include-drop include-drop)
|
||
when include-drop
|
||
collect (get-drop-table-if-exists table-name
|
||
:identifier-case identifier-case)
|
||
|
||
when extra-types append extra-types
|
||
|
||
collect (get-create-table table-name cols
|
||
:identifier-case identifier-case)))
|
||
|
||
(defun pgsql-create-tables (dbname all-columns
|
||
&key
|
||
(identifier-case :downcase)
|
||
(pg-dbname dbname)
|
||
include-drop)
|
||
"Create all MySQL tables in database dbname in PostgreSQL"
|
||
(loop
|
||
for nb-tables from 0
|
||
for sql in (get-pgsql-create-tables all-columns
|
||
:identifier-case identifier-case
|
||
:include-drop include-drop)
|
||
do (pgloader.pgsql:execute pg-dbname sql :client-min-messages :warning)
|
||
finally (return nb-tables)))
|
||
|
||
|
||
;;;
|
||
;;; Tools to get MySQL indexes definitions and transform them to PostgreSQL
|
||
;;; indexes definitions, and run those statements.
|
||
;;;
|
||
(defun list-all-indexes (dbname
|
||
&key
|
||
(host *myconn-host*)
|
||
(user *myconn-user*)
|
||
(pass *myconn-pass*))
|
||
"Get the list of MySQL index definitions per table."
|
||
(cl-mysql:connect :host host :user user :password pass)
|
||
|
||
(unwind-protect
|
||
(progn
|
||
(loop
|
||
with schema = nil
|
||
for (table-name . index-def)
|
||
in (caar (cl-mysql:query (format nil "
|
||
SELECT table_name, index_name, non_unique,
|
||
GROUP_CONCAT(column_name order by seq_in_index)
|
||
FROM information_schema.statistics
|
||
WHERE table_schema = '~a'
|
||
GROUP BY table_name, index_name;" dbname)))
|
||
do (let ((entry (assoc table-name schema :test 'equal))
|
||
(index-def
|
||
(destructuring-bind (index-name non-unique cols)
|
||
index-def
|
||
(list index-name
|
||
(not (= 1 non-unique))
|
||
(sq:split-sequence #\, cols)))))
|
||
(if entry
|
||
(push index-def (cdr entry))
|
||
(push (cons table-name (list index-def)) schema)))
|
||
finally
|
||
;; we did push, we need to reverse here
|
||
(return (reverse (loop
|
||
for (name . indexes) in schema
|
||
collect (cons name (reverse indexes)))))))
|
||
|
||
;; free resources
|
||
(cl-mysql:disconnect)))
|
||
|
||
(defun get-pgsql-index-def (table-name index-name unique cols
|
||
&key identifier-case)
|
||
"Return a PostgreSQL CREATE INDEX statement as a string."
|
||
(let* ((index-name (format nil "~a_~a_idx" table-name index-name))
|
||
(table-name (apply-identifier-case table-name identifier-case))
|
||
(index-name (apply-identifier-case index-name identifier-case)))
|
||
(cond
|
||
((string= index-name "PRIMARY")
|
||
(format nil
|
||
"ALTER TABLE ~a ADD PRIMARY KEY (~{~a~^, ~});" table-name cols))
|
||
|
||
(t
|
||
(format nil
|
||
"CREATE~:[~; UNIQUE~] INDEX ~a ON ~a (~{~a~^, ~});"
|
||
unique index-name table-name cols)))))
|
||
|
||
(defun get-drop-index-if-exists (table-name index-name &key identifier-case)
|
||
"Return the DROP INDEX statement for PostgreSQL"
|
||
(cond
|
||
((string= index-name "PRIMARY")
|
||
(let* ((pkey-name (format nil "~a_pkey" table-name))
|
||
(pkey-name (apply-identifier-case pkey-name identifier-case))
|
||
(table-name (apply-identifier-case table-name identifier-case)))
|
||
(format nil "ALTER TABLE ~a DROP CONSTRAINT ~a;" table-name pkey-name)))
|
||
|
||
(t
|
||
(let* ((index-name (format nil "~a_idx" index-name))
|
||
(index-name (apply-identifier-case index-name identifier-case)))
|
||
(format nil "DROP INDEX IF EXISTS ~a;" index-name)))))
|
||
|
||
(defun get-pgsql-drop-indexes (table-name indexes &key identifier-case)
|
||
"Return the DROP INDEX statements for given INDEXES definitions."
|
||
(loop
|
||
for (index-name unique cols) in indexes
|
||
;; no need to alter table drop constraint, when include-drop
|
||
;; is true we just did drop the table and created it again
|
||
;; anyway
|
||
unless (string= index-name "PRIMARY")
|
||
collect (get-drop-index-if-exists table-name index-name
|
||
:identifier-case identifier-case)))
|
||
|
||
(defun get-pgsql-create-indexes (table-name indexes
|
||
&key identifier-case include-drop)
|
||
"Return the CREATE INDEX statements from given INDEXES definitions."
|
||
(loop
|
||
for (index-name unique cols) in indexes
|
||
collect (get-pgsql-index-def table-name index-name unique cols
|
||
:identifier-case identifier-case)))
|
||
|
||
|
||
;;;
|
||
;;; Map a function to each row extracted from MySQL
|
||
;;;
|
||
(defun get-column-list-with-is-nulls (dbname table-name)
|
||
"We can't just SELECT *, we need to cater for NULLs in text columns ourselves.
|
||
This function assumes a valid connection to the MySQL server has been
|
||
established already."
|
||
(loop
|
||
for (name type) in (caar (cl-mysql:query (format nil "
|
||
select column_name, data_type
|
||
from information_schema.columns
|
||
where table_schema = '~a' and table_name = '~a'
|
||
order by ordinal_position" dbname table-name)))
|
||
for is-null = (member type '("char" "varchar" "text"
|
||
"tinytext" "mediumtext" "longtext")
|
||
:test #'string-equal)
|
||
collect nil into nulls
|
||
collect (format nil "`~a`" name) into cols
|
||
when is-null
|
||
collect (format nil "`~a` is null" name) into cols and collect t into nulls
|
||
finally (return (values cols nulls))))
|
||
|
||
(defun fix-nulls (row nulls)
|
||
"Given a cl-mysql row result and a nulls list as from
|
||
get-column-list-with-is-nulls, replace NIL with empty strings with when
|
||
we know from the added 'foo is null' that the actual value IS NOT NULL.
|
||
|
||
See http://bugs.mysql.com/bug.php?id=19564 for context."
|
||
(loop
|
||
for (current-col next-col) on row
|
||
for (current-null next-null) on nulls
|
||
;; next-null tells us if next column is an "is-null" col
|
||
;; when next-null is true, next-col is true if current-col is actually null
|
||
for is-null = (and next-null (string= next-col "1"))
|
||
for is-empty = (and next-null (string= next-col "0") (null current-col))
|
||
;; don't collect columns we added, e.g. "column_name is not null"
|
||
when (not current-null)
|
||
collect (cond (is-null :null)
|
||
(is-empty "")
|
||
(t current-col))))
|
||
|
||
(defun map-rows (dbname table-name &key process-row-fn)
|
||
"Extract MySQL data and call PROCESS-ROW-FN function with a single
|
||
argument (a list of column values) for each row."
|
||
(cl-mysql:connect :host *myconn-host*
|
||
:port *myconn-port*
|
||
:user *myconn-user*
|
||
:password *myconn-pass*)
|
||
|
||
(unwind-protect
|
||
(progn
|
||
;; Ensure we're talking utf-8 and connect to DBNAME in MySQL
|
||
(cl-mysql:query "SET NAMES 'utf8'")
|
||
(cl-mysql:query "SET character_set_results = utf8;")
|
||
(cl-mysql:use dbname)
|
||
|
||
(multiple-value-bind (cols nulls)
|
||
(get-column-list-with-is-nulls dbname table-name)
|
||
(let* ((sql (format nil "SELECT ~{~a~^, ~} FROM ~a;" cols table-name))
|
||
(q (cl-mysql:query sql :store nil :type-map nil))
|
||
(rs (cl-mysql:next-result-set q)))
|
||
(declare (ignore rs))
|
||
|
||
;; Now fetch MySQL rows directly in the stream
|
||
(loop
|
||
for row = (cl-mysql:next-row q :type-map (make-hash-table))
|
||
while row
|
||
for row-with-proper-nulls = (fix-nulls row nulls)
|
||
counting row into count
|
||
do (funcall process-row-fn row-with-proper-nulls)
|
||
finally (return count)))))
|
||
|
||
;; free resources
|
||
(cl-mysql:disconnect)))
|
||
|
||
|
||
;;;
|
||
;;; Use map-rows and pgsql-text-copy-format to fill in a CSV file on disk
|
||
;;; with MySQL data in there.
|
||
;;;
|
||
(defun copy-to (dbname table-name filename &key transforms)
|
||
"Extrat data from MySQL in PostgreSQL COPY TEXT format"
|
||
(with-open-file (text-file filename
|
||
:direction :output
|
||
:if-exists :supersede
|
||
:external-format :utf-8)
|
||
(map-rows dbname table-name
|
||
:process-row-fn
|
||
(lambda (row)
|
||
(pgloader.pgsql:format-row text-file row
|
||
:transforms transforms)))))
|
||
|
||
;;;
|
||
;;; MySQL bulk export to file, in PostgreSQL COPY TEXT format
|
||
;;;
|
||
(defun export-database (dbname &key only-tables)
|
||
"Export MySQL tables into as many TEXT files, in the PostgreSQL COPY format"
|
||
(let ((all-columns (list-all-columns dbname)))
|
||
(setf *state* (pgloader.utils:make-pgstate))
|
||
(report-header)
|
||
(loop
|
||
for (table-name . cols) in all-columns
|
||
for filename = (pgloader.csv:get-pathname dbname table-name)
|
||
when (or (null only-tables)
|
||
(member table-name only-tables :test #'equal))
|
||
do
|
||
(pgstate-add-table *state* dbname table-name)
|
||
(report-table-name table-name)
|
||
(multiple-value-bind (rows secs)
|
||
(timing
|
||
;; load data
|
||
(copy-to dbname table-name filename :transforms (transforms cols)))
|
||
;; update and report stats
|
||
(pgstate-incf *state* table-name :read rows :secs secs)
|
||
(report-pgtable-stats *state* table-name))
|
||
finally
|
||
(report-pgstate-stats *state* "Total export time"))))
|
||
|
||
;;;
|
||
;;; Copy data from a target database into files in the PostgreSQL COPY TEXT
|
||
;;; format, then load those files. Useful mainly to compare timing with the
|
||
;;; direct streaming method. If you need to pre-process the files, use
|
||
;;; export-database, do the extra processing, then use
|
||
;;; pgloader.pgsql:copy-from-file on each file.
|
||
;;;
|
||
(defun export-import-database (dbname
|
||
&key
|
||
(pg-dbname dbname)
|
||
(truncate t)
|
||
only-tables)
|
||
"Export MySQL data and Import it into PostgreSQL"
|
||
;; get the list of tables and have at it
|
||
(let ((all-columns (list-all-columns dbname)))
|
||
(setf *state* (pgloader.utils:make-pgstate))
|
||
(report-header)
|
||
(loop
|
||
for (table-name . cols) in all-columns
|
||
when (or (null only-tables)
|
||
(member table-name only-tables :test #'equal))
|
||
do
|
||
(pgstate-add-table *state* dbname table-name)
|
||
(report-table-name table-name)
|
||
|
||
(multiple-value-bind (res secs)
|
||
(timing
|
||
(let* ((filename (pgloader.csv:get-pathname dbname table-name)))
|
||
;; export from MySQL to file
|
||
(copy-to dbname table-name filename :transforms (transforms cols))
|
||
;; import the file to PostgreSQL
|
||
(pgloader.pgsql:copy-from-file pg-dbname
|
||
table-name
|
||
filename
|
||
:truncate truncate)))
|
||
(declare (ignore res))
|
||
(pgstate-incf *state* table-name :secs secs)
|
||
(report-pgtable-stats *state* table-name))
|
||
finally
|
||
(report-pgstate-stats *state* "Total export+import time"))))
|
||
|
||
;;;
|
||
;;; Export MySQL data to our lparallel data queue. All the work is done in
|
||
;;; other basic layers, simple enough function.
|
||
;;;
|
||
(defun copy-to-queue (dbname table-name dataq)
|
||
"Copy data from MySQL table DBNAME.TABLE-NAME into queue DATAQ"
|
||
(let ((read
|
||
(pgloader.queue:map-push-queue dataq #'map-rows dbname table-name)))
|
||
(pgstate-incf *state* table-name :read read)))
|
||
|
||
|
||
;;;
|
||
;;; Direct "stream" in between mysql fetching of results and PostgreSQL COPY
|
||
;;; protocol
|
||
;;;
|
||
(defun stream-table (dbname table-name
|
||
&key
|
||
(kernel nil k-s-p)
|
||
(pg-dbname dbname)
|
||
truncate
|
||
transforms)
|
||
"Connect in parallel to MySQL and PostgreSQL and stream the data."
|
||
(let* ((summary (null *state*))
|
||
(*state* (or *state* (pgloader.utils:make-pgstate)))
|
||
(lp:*kernel*
|
||
(or kernel
|
||
(lp:make-kernel 2 :bindings
|
||
`((*pgconn-host* . ,*pgconn-host*)
|
||
(*pgconn-port* . ,*pgconn-port*)
|
||
(*pgconn-user* . ,*pgconn-user*)
|
||
(*pgconn-pass* . ,*pgconn-pass*)
|
||
(*pg-settings* . ',*pg-settings*)
|
||
(*myconn-host* . ,*myconn-host*)
|
||
(*myconn-port* . ,*myconn-port*)
|
||
(*myconn-user* . ,*myconn-user*)
|
||
(*myconn-pass* . ,*myconn-pass*)
|
||
(*state* . ,*state*)))))
|
||
(channel (lp:make-channel))
|
||
(dataq (lq:make-queue :fixed-capacity 4096)))
|
||
|
||
(with-stats-collection (dbname table-name :state *state* :summary summary)
|
||
(log-message :notice "COPY ~a.~a" dbname table-name)
|
||
;; read data from MySQL
|
||
(lp:submit-task channel (lambda ()
|
||
;; this function update :read stats
|
||
(copy-to-queue dbname table-name dataq)))
|
||
|
||
;; and start another task to push that data from the queue to PostgreSQL
|
||
(lp:submit-task
|
||
channel
|
||
(lambda ()
|
||
;; this function update :rows stats
|
||
(pgloader.pgsql:copy-from-queue pg-dbname table-name dataq
|
||
:truncate truncate
|
||
:transforms transforms)))
|
||
|
||
;; now wait until both the tasks are over
|
||
(loop for tasks below 2 do (lp:receive-result channel)
|
||
finally
|
||
(log-message :info "COPY ~a.~a done." dbname table-name)
|
||
(unless k-s-p (lp:end-kernel))))))
|
||
|
||
;;;
|
||
;;; Work on all tables for given database
|
||
;;;
|
||
(defun execute-with-timing (dbname label sql state &key (count 1))
|
||
"Execute given SQL and resgister its timing into STATE."
|
||
(multiple-value-bind (res secs)
|
||
(timing (pgloader.pgsql:execute dbname sql))
|
||
(declare (ignore res))
|
||
(pgstate-incf state label :rows count :secs secs)))
|
||
|
||
(defun create-indexes-in-kernel (dbname table-name indexes kernel channel
|
||
&key
|
||
identifier-case include-drop
|
||
state (label "create index"))
|
||
"Create indexes for given table in dbname, using given lparallel KERNEL
|
||
and CHANNEL so that the index build happen in concurrently with the data
|
||
copying."
|
||
(let* ((lp:*kernel* kernel))
|
||
(pgstate-add-table state dbname label)
|
||
|
||
(when include-drop
|
||
(let ((drop-channel (lp:make-channel))
|
||
(drop-indexes
|
||
(get-pgsql-drop-indexes table-name indexes
|
||
:identifier-case identifier-case)))
|
||
(loop
|
||
for sql in drop-indexes
|
||
do
|
||
(log-message :notice "~a" sql)
|
||
(lp:submit-task drop-channel #'pgloader.pgsql:execute dbname sql))
|
||
|
||
;; wait for the DROP INDEX to be done before issuing CREATE INDEX
|
||
(loop for idx in drop-indexes do (lp:receive-result drop-channel))))
|
||
|
||
(loop
|
||
for sql in (get-pgsql-create-indexes table-name indexes
|
||
:identifier-case identifier-case
|
||
:include-drop include-drop)
|
||
do
|
||
(log-message :notice "~a" sql)
|
||
(lp:submit-task channel #'execute-with-timing dbname label sql state))))
|
||
|
||
(defun stream-database (dbname
|
||
&key
|
||
(pg-dbname dbname)
|
||
(schema-only nil)
|
||
(create-tables nil)
|
||
(include-drop nil)
|
||
(create-indexes t)
|
||
(reset-sequences t)
|
||
(identifier-case :downcase) ; or :quote
|
||
(truncate t)
|
||
only-tables)
|
||
"Export MySQL data and Import it into PostgreSQL"
|
||
;; get the list of tables and have at it
|
||
(let* ((*state* (make-pgstate))
|
||
(idx-state (make-pgstate))
|
||
(copy-kernel (make-kernel 2))
|
||
(all-columns (list-all-columns dbname))
|
||
(all-indexes (list-all-indexes dbname))
|
||
(max-indexes (loop for (table . indexes) in all-indexes
|
||
maximizing (length indexes)))
|
||
(idx-kernel (make-kernel max-indexes))
|
||
(idx-channel (let ((lp:*kernel* idx-kernel))
|
||
(lp:make-channel))))
|
||
|
||
;; if asked, first drop/create the tables on the PostgreSQL side
|
||
(when create-tables
|
||
(log-message :notice "~:[~;DROP then ~]CREATE TABLES" include-drop)
|
||
(pgsql-create-tables dbname all-columns
|
||
:identifier-case identifier-case
|
||
:pg-dbname pg-dbname
|
||
:include-drop include-drop))
|
||
|
||
(loop
|
||
for (table-name . columns) in all-columns
|
||
when (or (null only-tables)
|
||
(member table-name only-tables :test #'equal))
|
||
do
|
||
(progn
|
||
;; first COPY the data from MySQL to PostgreSQL, using copy-kernel
|
||
(unless schema-only
|
||
(stream-table dbname table-name
|
||
:kernel copy-kernel
|
||
:pg-dbname pg-dbname
|
||
:truncate truncate
|
||
:transforms (transforms columns)))
|
||
|
||
;; Create the indexes for that table in parallel with the next
|
||
;; COPY, and all at once in concurrent threads to benefit from
|
||
;; PostgreSQL synchronous scan ability
|
||
;;
|
||
;; We just push new index build as they come along, if one
|
||
;; index build requires much more time than the others our
|
||
;; index build might get unsync: indexes for different tables
|
||
;; will get built in parallel --- not a big problem.
|
||
(when create-indexes
|
||
(let* ((indexes
|
||
(cdr (assoc table-name all-indexes :test #'string=))))
|
||
(create-indexes-in-kernel dbname table-name indexes
|
||
idx-kernel idx-channel
|
||
:state idx-state
|
||
:include-drop include-drop
|
||
:identifier-case identifier-case)))))
|
||
|
||
;; don't forget to reset sequences, but only when we did actually import
|
||
;; the data.
|
||
(when (and (not schema-only) reset-sequences)
|
||
(pgloader.pgsql:reset-all-sequences pg-dbname))
|
||
|
||
;; now end the kernels
|
||
(let ((lp:*kernel* idx-kernel)) (lp:end-kernel))
|
||
(let ((lp:*kernel* copy-kernel))
|
||
;; wait until the indexes are done being built...
|
||
;; don't forget accounting for that waiting time.
|
||
(with-stats-collection (dbname "index build completion" :state *state*)
|
||
(loop for idx in all-indexes do (lp:receive-result idx-channel)))
|
||
(lp:end-kernel))
|
||
|
||
;; and report the total time spent on the operation
|
||
(report-summary)
|
||
(format t pgloader.utils::*header-line*)
|
||
(report-summary :state idx-state :header nil :footer nil)
|
||
(report-pgstate-stats *state* "Total streaming time")))
|
||
|