Fix parsing of qualified target table names, see #186.

We used to parse qualified table names as a simple string, which then
breaks attempts to be smart about how to quote idenfifiers. Some sources
are known to accept dots in quoted table names and we need to be able to
process that properly without tripping on qualified table names too
late.

Current code might not be the best approach as it's just using either a
cons or a string for table names internally, rather than defining a
proper data structure with a schema and a name slot.

Well, that's for a later cleanup patch, I happen to be lazy tonight.
This commit is contained in:
Dimitri Fontaine 2015-04-17 23:22:30 +02:00
parent 5ac396799a
commit 0068a45e1c
9 changed files with 59 additions and 32 deletions

View File

@ -59,6 +59,7 @@
#:unquote #:unquote
;; state ;; state
#:format-table-name
#:make-pgstate #:make-pgstate
#:pgstate-get-table #:pgstate-get-table
#:pgstate-add-table #:pgstate-add-table

View File

@ -121,7 +121,7 @@
(make-instance 'pgloader.copy:copy-copy (make-instance 'pgloader.copy:copy-copy
:target-db ,pg-db-conn :target-db ,pg-db-conn
:source source-db :source source-db
:target ,(pgconn-table-name pg-db-conn) :target ',(pgconn-table-name pg-db-conn)
:encoding ,encoding :encoding ,encoding
:fields ',fields :fields ',fields
:columns ',columns :columns ',columns

View File

@ -440,7 +440,7 @@
(make-instance 'pgloader.csv:copy-csv (make-instance 'pgloader.csv:copy-csv
:target-db ,pg-db-conn :target-db ,pg-db-conn
:source source-db :source source-db
:target ,(pgconn-table-name pg-db-conn) :target ',(pgconn-table-name pg-db-conn)
:encoding ,encoding :encoding ,encoding
:fields ',fields :fields ',fields
:columns ',columns :columns ',columns

View File

@ -113,7 +113,7 @@
maybe-quoted-namestring) maybe-quoted-namestring)
(:destructure (schema dot table) (:destructure (schema dot table)
(declare (ignore dot)) (declare (ignore dot))
(format nil "~a.~a" (text schema) (text table)))) (cons (text schema) (text table))))
(defrule dsn-table-name (or qualified-table-name maybe-quoted-namestring) (defrule dsn-table-name (or qualified-table-name maybe-quoted-namestring)
(:lambda (name) (:lambda (name)

View File

@ -140,7 +140,7 @@
(make-instance 'pgloader.fixed:copy-fixed (make-instance 'pgloader.fixed:copy-fixed
:target-db ,pg-db-conn :target-db ,pg-db-conn
:source source-db :source source-db
:target ,(pgconn-table-name pg-db-conn) :target ',(pgconn-table-name pg-db-conn)
:encoding ,encoding :encoding ,encoding
:fields ',fields :fields ',fields
:columns ',columns :columns ',columns

View File

@ -54,20 +54,31 @@
(truncate-tables pgconn (list table-name))) (truncate-tables pgconn (list table-name)))
(with-pgsql-connection (pgconn) (with-pgsql-connection (pgconn)
(when disable-triggers (disable-triggers table-name)) (let ((unqualified-table-name
(log-message :info "pgsql:copy-from-queue: ~a ~a" table-name columns) (typecase table-name
(loop (cons (let ((sql (format nil "SET search_path TO ~a;"
for (mesg batch read oversized?) = (lq:pop-queue queue) (car table-name))))
until (eq mesg :end-of-data) (log-message :notice "~a" sql)
for rows = (copy-batch table-name columns batch read) (pgsql-execute sql)
do (progn (cdr table-name)))
;; The SBCL implementation needs some Garbage Collection (string table-name))))
;; decision making help... and now is a pretty good time.
#+sbcl (when oversized? (sb-ext:gc :full t)) (when disable-triggers (disable-triggers unqualified-table-name))
(log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" (log-message :info "pgsql:copy-from-queue: ~a ~a" table-name columns)
table-name rows oversized?)
(pgstate-incf *state* table-name :rows rows))) (loop
(when disable-triggers (enable-triggers table-name)))) for (mesg batch read oversized?) = (lq:pop-queue queue)
until (eq mesg :end-of-data)
for rows = (copy-batch unqualified-table-name columns batch read)
do (progn
;; The SBCL implementation needs some Garbage Collection
;; decision making help... and now is a pretty good time.
#+sbcl (when oversized? (sb-ext:gc :full t))
(log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]"
unqualified-table-name rows oversized?)
(pgstate-incf *state* table-name :rows rows)))
(when disable-triggers (enable-triggers unqualified-table-name)))))
;;; ;;;
;;; When a batch has been refused by PostgreSQL with a data-exception, that ;;; When a batch has been refused by PostgreSQL with a data-exception, that

View File

@ -230,11 +230,18 @@
(defun truncate-tables (pgconn table-name-list) (defun truncate-tables (pgconn table-name-list)
"Truncate given TABLE-NAME in database DBNAME" "Truncate given TABLE-NAME in database DBNAME"
(with-pgsql-transaction (:pgconn pgconn) (with-pgsql-transaction (:pgconn pgconn)
(let ((sql (format nil "TRUNCATE ~{~a~^,~};" (flet ((process-table-name (table-name)
(loop :for table-name :in table-name-list (typecase table-name
:collect (apply-identifier-case table-name))))) (cons
(log-message :notice "~a" sql) (format nil "~a.~a"
(pomo:execute sql)))) (apply-identifier-case (car table-name))
(apply-identifier-case (cdr table-name))))
(string
(apply-identifier-case table-name)))))
(let ((sql (format nil "TRUNCATE ~{~a~^,~};"
(mapcar #'process-table-name table-name-list))))
(log-message :notice "~a" sql)
(pomo:execute sql)))))
(defun disable-triggers (table-name) (defun disable-triggers (table-name)
"Disable triggers on TABLE-NAME. Needs to be called with a PostgreSQL "Disable triggers on TABLE-NAME. Needs to be called with a PostgreSQL

View File

@ -117,7 +117,7 @@
(format *report-stream* (format *report-stream*
*header-tname-format* *header-tname-format*
*max-length-table-name* *max-length-table-name*
table-name) (format-table-name table-name))
(report-results read rows errs (format-interval secs nil))) (report-results read rows errs (format-interval secs nil)))
finally (when footer finally (when footer
(report-pgstate-stats pgstate footer)))) (report-pgstate-stats pgstate footer))))
@ -171,11 +171,12 @@
(*max-length-table-name* (*max-length-table-name*
(reduce #'max (reduce #'max
(mapcar #'length (mapcar #'length
(append (pgstate-tabnames state) (mapcar #'format-table-name
(when before (pgstate-tabnames before)) (append (pgstate-tabnames state)
(when finally (pgstate-tabnames finally)) (when before (pgstate-tabnames before))
(when parallel (pgstate-tabnames parallel)) (when finally (pgstate-tabnames finally))
(list legend))))) (when parallel (pgstate-tabnames parallel))
(list legend))))))
(*header-tname-format* (get-format-for stype :header-tname-format)) (*header-tname-format* (get-format-for stype :header-tname-format))
(*header-stats-format* (get-format-for stype :header-stats-format)) (*header-stats-format* (get-format-for stype :header-stats-format))
(*header-cols-format* (get-format-for stype :header-cols-format)) (*header-cols-format* (get-format-for stype :header-cols-format))

View File

@ -24,6 +24,12 @@
(errs 0 :type fixnum) (errs 0 :type fixnum)
(secs 0.0 :type float)) (secs 0.0 :type float))
(defun format-table-name (table-name)
"TABLE-NAME might be a CONS of a schema name and a table name."
(typecase table-name
(cons (format nil "~a.~a" (car table-name) (cdr table-name)))
(string table-name)))
(defun pgstate-get-table (pgstate name) (defun pgstate-get-table (pgstate name)
(gethash name (pgstate-tables pgstate))) (gethash name (pgstate-tables pgstate)))
@ -32,11 +38,12 @@
(or (pgstate-get-table pgstate table-name) (or (pgstate-get-table pgstate table-name)
(let* ((table (setf (gethash table-name (pgstate-tables pgstate)) (let* ((table (setf (gethash table-name (pgstate-tables pgstate))
(make-pgtable :name table-name))) (make-pgtable :name table-name)))
(reject-dir (merge-pathnames (format nil "~a/" dbname) *root-dir*)) (reject-dir (merge-pathnames (format nil "~a/" dbname) *root-dir*))
(filename (format-table-name table-name))
(data-pathname (make-pathname :defaults reject-dir (data-pathname (make-pathname :defaults reject-dir
:name table-name :type "dat")) :name filename :type "dat"))
(logs-pathname (make-pathname :defaults reject-dir (logs-pathname (make-pathname :defaults reject-dir
:name table-name :type "log"))) :name filename :type "log")))
;; maintain the ordering ;; maintain the ordering
(push table-name (pgstate-tabnames pgstate)) (push table-name (pgstate-tabnames pgstate))