Simplify batch and transformation handling.

Make batches of raw data straight from the reader output (map-rows) and
have the transformation worker focus on changing the batch content from
raw rows to copy strings.

Also review the organisation of responsibilities in the code, allowing
queue.lisp to be moved into utils/batch.lisp and renamed, as its scope
has been reduced to only caring about preparing batches.

This came out of trying to have multiple workers concurrently processing
the batches from the reader and feeding the hardcoded 2 COPY workers,
but it failed for multiple reasons. All that is left as of now is this
cleanup, which seems to be on the faster side of things, which is always
good.
This commit is contained in:
Dimitri Fontaine 2015-11-29 17:35:25 +01:00
parent 2dd7f68a30
commit cca44c800f
6 changed files with 122 additions and 122 deletions

View File

@ -40,7 +40,6 @@
:components
((:file "params")
(:file "package" :depends-on ("params"))
(:file "queue" :depends-on ("params" "package"))
(:module "monkey"
:components
@ -51,6 +50,7 @@
:depends-on ("package" "params")
:components
((:file "charsets")
(:file "batch")
(:file "threads")
(:file "logs")
(:file "utils")
@ -121,8 +121,7 @@
"connection"
"pgsql"
"utils"
"parsers"
"queue")
"parsers")
:components
((:module "common"
:components

View File

@ -110,6 +110,13 @@
#:show-encodings
#:make-external-format))
(defpackage #:pgloader.batch
(:use #:cl #:pgloader.params #:pgloader.monitor)
(:export #:*current-batch*
#:make-batch
#:batch-row
#:finish-current-batch))
;;
;; Not really a source, more a util package to deal with http and zip
@ -226,7 +233,7 @@
(defpackage #:pgloader.sources
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.schema #:pgloader.pgsql)
#:pgloader.schema #:pgloader.pgsql #:pgloader.batch)
(:import-from #:pgloader.transforms
#:precision
#:scale
@ -286,17 +293,6 @@
#:*cast-rules*
#:cast))
(defpackage #:pgloader.queue
(:use #:cl #:pgloader.params #:pgloader.monitor)
(:import-from #:pgloader.pgsql
#:format-vector-row)
(:import-from #:pgloader.sources
#:map-rows
#:transforms
#:target)
(:export #:cook-batches))
;;;
;;; Other utilities
@ -325,7 +321,7 @@
(defpackage #:pgloader.csv
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
#:pgloader.sources)
(:import-from #:pgloader.pgsql
#:maybe-drop-indexes
#:create-indexes-again)
@ -342,7 +338,7 @@
(defpackage #:pgloader.fixed
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
#:pgloader.sources)
(:import-from #:pgloader.csv
#:csv-connection
#:specs
@ -357,7 +353,7 @@
(defpackage #:pgloader.copy
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
#:pgloader.sources)
(:import-from #:pgloader.csv
#:csv-connection
#:specs
@ -372,7 +368,7 @@
(defpackage #:pgloader.ixf
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
#:pgloader.sources)
(:import-from #:pgloader.pgsql
#:with-pgsql-transaction
#:pgsql-execute
@ -389,7 +385,7 @@
(defpackage #:pgloader.db3
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
#:pgloader.sources)
(:import-from #:pgloader.pgsql
#:with-pgsql-transaction
#:pgsql-execute
@ -407,7 +403,7 @@
(defpackage #:pgloader.mysql
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
#:pgloader.sources)
(:import-from #:pgloader.transforms #:precision #:scale)
(:import-from #:pgloader.pgsql
#:with-pgsql-connection
@ -448,7 +444,7 @@
(defpackage #:pgloader.sqlite
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
#:pgloader.sources)
(:import-from #:pgloader.transforms #:precision #:scale)
(:import-from #:pgloader.pgsql
#:with-pgsql-transaction
@ -476,7 +472,7 @@
(defpackage #:pgloader.mssql
(:use #:cl
#:pgloader.params #:pgloader.utils #:pgloader.connection
#:pgloader.sources #:pgloader.queue)
#:pgloader.sources)
(:import-from #:pgloader.transforms #:precision #:scale)
(:import-from #:pgloader.pgsql
#:with-pgsql-connection

View File

@ -18,13 +18,11 @@
;; of a data error being signaled, that's the BATCH here.
(let ((copier (cl-postgres:open-db-writer db table-name columns)))
(unwind-protect
(loop for i below batch-rows
for copy-string = (aref batch i)
do (when (or (eq :data *log-min-messages*)
(eq :data *client-min-messages*))
(log-message :data "> ~s" copy-string))
do (cl-postgres:db-write-row copier nil copy-string)
finally (return batch-rows))
(loop :for i :below batch-rows
:for copy-string := (aref batch i)
:do (when copy-string
(cl-postgres:db-write-row copier nil copy-string))
:finally (return batch-rows))
(cl-postgres:close-db-writer copier))))
;; If PostgreSQL signals a data error, process the batch by isolating
@ -62,7 +60,7 @@
(loop
:for (mesg batch read oversized?) := (lq:pop-queue queue)
:until (eq mesg :end-of-data)
:until (eq :end-of-data mesg)
:for rows := (copy-batch unqualified-table-name columns batch read)
:do (progn
;; The SBCL implementation needs some Garbage Collection

View File

@ -1,82 +0,0 @@
;;;
;;; Tools to handle internal queueing, using lparallel.queue
;;;
(in-package :pgloader.queue)
;;;
;;; The pgloader architectures uses a reader thread and a writer thread. The
;;; reader fills in batches of data from the source of data, and the writer
;;; pushes the data down to PostgreSQL using the COPY protocol.
;;;
(defstruct batch
(start (get-internal-real-time) :type fixnum)
(data (make-array *copy-batch-rows* :element-type 'simple-string)
:type (vector simple-string *))
(count 0 :type fixnum)
(bytes 0 :type fixnum))
(defvar *current-batch* nil)
(defun cook-batches (copy raw-queue processed-queue &optional pre-formatted)
"Cook data from raw-queue into batches sent into processed-queue."
(let ((*current-batch* (make-batch)))
(loop :for row := (lq:pop-queue raw-queue)
:until (eq :end-of-data row)
:do (batch-row row copy processed-queue pre-formatted))
;; finish current-batch
(finish-current-batch copy processed-queue)
;; and before calling it a day, push the end-of-data marker
(log-message :debug "End of data.")
;; we hardcode 2 parallel COPY writers, see copy-from implementation.
(lq:push-queue (list :end-of-data nil nil nil) processed-queue)
(lq:push-queue (list :end-of-data nil nil nil) processed-queue)))
(defun finish-current-batch (copy queue
&optional oversized? (batch *current-batch*))
(with-slots (start data count) batch
(when (< 0 count)
(update-stats :data (target copy)
:read count
:rs (elapsed-time-since start))
(lq:push-queue (list :batch data count oversized?) queue))))
(declaim (inline oversized?))
(defun oversized? (&optional (batch *current-batch*))
"Return a generalized boolean that is true only when BATCH is considered
over-sized when its size in BYTES is compared *copy-batch-size*."
(and *copy-batch-size* ; defaults to nil
(<= *copy-batch-size* (batch-bytes batch))))
(defun batch-row (row copy queue &optional pre-formatted)
"Add ROW to the reader batch. When the batch is full, provide it to the
writer."
(let ((oversized? (oversized? *current-batch*)))
(when (or (= (batch-count *current-batch*) *copy-batch-rows*)
oversized?)
;; close current batch, prepare next one
(finish-current-batch copy queue oversized?)
(setf *current-batch* (make-batch))))
;; Add ROW to the current BATCH.
;;
;; All the data transformation takes place here, so that we batch fully
;; formed COPY TEXT string ready to go in the PostgreSQL stream.
(handler-case
(with-slots (data count bytes) *current-batch*
(let ((copy-string
(with-output-to-string (s)
(let ((c-s-bytes (format-vector-row s row
(transforms copy)
pre-formatted)))
(when *copy-batch-size* ; running under memory watch
(incf bytes c-s-bytes))))))
(setf (aref data count) copy-string)
(incf count)))
(condition (e)
(log-message :error "~a" e)
(update-stats :data (target copy) :errs 1))))

View File

@ -10,13 +10,18 @@
"Stream data as read by the map-queue method on the COPY argument into QUEUE,
as given."
(log-message :debug "Reader started for ~a" (target copy))
(let ((start-time (get-internal-real-time)))
(map-rows copy :process-row-fn (lambda (data)
(let ((start-time (get-internal-real-time))
(*current-batch* (make-batch)))
(map-rows copy :process-row-fn (lambda (row)
(when (or (eq :data *log-min-messages*)
(eq :data *client-min-messages*))
(log-message :data "< ~s" data))
(lq:push-queue data queue)))
(lq:push-queue :end-of-data queue)
(log-message :data "< ~s" row))
(batch-row row (target copy) queue)))
;; last batch
(finish-current-batch (target copy) queue)
;; mark end of stream
(lq:push-queue (list :end-of-data nil nil nil) queue)
(let ((seconds (elapsed-time-since start-time)))
(log-message :info "Reader for ~a is done in ~6$s" (target copy) seconds)
@ -29,12 +34,44 @@
(log-message :debug "Transformer in action for ~a!" (target copy))
(let ((start-time (get-internal-real-time)))
(pgloader.queue:cook-batches copy raw-queue formatted-queue pre-formatted)
(loop :for (mesg batch count oversized?) := (lq:pop-queue raw-queue)
:until (eq :end-of-data mesg)
:do (let ((batch-start-time (get-internal-real-time)))
;; transform each row of the batch into a copy-string
(loop :for i :below count
:do (let ((copy-string
(handler-case
(with-output-to-string (s)
(format-vector-row s
(aref batch i)
(transforms copy)
pre-formatted))
(condition (e)
(log-message :error "~a" e)
(update-stats :data (target copy) :errs 1)
nil))))
(when (or (eq :data *log-min-messages*)
(eq :data *client-min-messages*))
(log-message :data "> ~s" copy-string))
(setf (aref batch i) copy-string)))
;; the batch is ready, log about it
(log-message :debug "format-data-to-copy[~a] ~d row in ~6$s"
(lp:kernel-worker-index)
count
(elapsed-time-since batch-start-time))
;; and send the formatted batch of copy-strings down to PostgreSQL
(lq:push-queue (list mesg batch count oversized?) formatted-queue)))
;; mark end of stream, twice because we hardcode 2 COPY processes, see below
(lq:push-queue (list :end-of-data nil nil nil) formatted-queue)
(lq:push-queue (list :end-of-data nil nil nil) formatted-queue)
;; and return
(let ((seconds (elapsed-time-since start-time)))
(log-message :info "Transformer for ~a is done in ~6$s" (target copy) seconds)
(list :worker (target copy) seconds))))
(log-message :info "Transformer for ~a is done in ~6$s" (target copy) seconds)
(list :worker (target copy) seconds))))
(defmethod copy-column-list ((copy copy))
"Default column list is an empty list."
@ -59,7 +96,7 @@
"Copy data from COPY source into PostgreSQL."
(let* ((lp:*kernel* (or kernel (make-kernel 4)))
(channel (or channel (lp:make-channel)))
(rawq (lq:make-queue))
(rawq (lq:make-queue :fixed-capacity *concurrent-batches*))
(fmtq (lq:make-queue :fixed-capacity *concurrent-batches*))
(table-name (format-table-name (target copy))))

52
src/utils/batch.lisp Normal file
View File

@ -0,0 +1,52 @@
;;;
;;; Tools to handle internal queueing, using lparallel.queue
;;;
(in-package :pgloader.batch)
;;;
;;; The pgloader architectures uses a reader thread and a writer thread. The
;;; reader fills in batches of data from the source of data, and the writer
;;; pushes the data down to PostgreSQL using the COPY protocol.
;;;
;; A BATCH accumulates rows read from the source before they are pushed
;; down a queue as a single unit:
;;
;;   START  internal-real-time timestamp taken at batch creation, used by
;;          FINISH-CURRENT-BATCH to compute the time spent filling it,
;;   DATA   a vector of *copy-batch-rows* slots holding the batch content,
;;   COUNT  how many slots of DATA are currently filled,
;;   BYTES  accumulated payload size, compared against *copy-batch-size*
;;          by OVERSIZED? (only meaningful when that accounting is kept
;;          up to date).
(defstruct batch
  (start (get-internal-real-time) :type fixnum)
  (data (make-array *copy-batch-rows* :element-type 'simple-string)
        :type (vector simple-string *))
  (count 0 :type fixnum)
  (bytes 0 :type fixnum))
(defvar *current-batch* nil
  "The batch currently being filled by BATCH-ROW.  Dynamically rebound to a
   fresh batch by the reader code that drives BATCH-ROW and
   FINISH-CURRENT-BATCH.")
(defun finish-current-batch (target queue
                             &optional oversized? (batch *current-batch*))
  "When BATCH holds at least one row, push its payload to QUEUE as a
   (:batch data count oversized?) message and credit TARGET's read stats
   with the row count and the time spent filling the batch."
  (let ((row-count (batch-count batch))
        (started   (batch-start batch)))
    (when (plusp row-count)
      (log-message :debug "finish-current-batch[~a] ~d row~:p in ~6$s"
                   (lp:kernel-worker-index) row-count
                   (elapsed-time-since started))
      (update-stats :data target
                    :read row-count
                    :rs (elapsed-time-since started))
      (lq:push-queue (list :batch (batch-data batch) row-count oversized?)
                     queue))))
(declaim (inline oversized?))
(defun oversized? (&optional (batch *current-batch*))
  "Generalized boolean: true when *copy-batch-size* is set (it defaults to
   nil) and BATCH has accumulated at least that many bytes."
  (when *copy-batch-size*
    (<= *copy-batch-size* (batch-bytes batch))))
(defun batch-row (row target queue)
  "Add ROW to the current batch (*CURRENT-BATCH*).  When the batch is
   already full --- *copy-batch-rows* rows collected, or over-sized per
   OVERSIZED? --- first hand it over to QUEUE via FINISH-CURRENT-BATCH,
   crediting TARGET's stats, then start filling a fresh batch."
  (let ((oversized? (oversized? *current-batch*)))
    (when (or (= (batch-count *current-batch*) *copy-batch-rows*)
              oversized?)
      ;; close current batch, prepare next one
      (finish-current-batch target queue oversized?)
      (setf *current-batch* (make-batch))))
  ;; NOTE(review): nothing increments the BYTES slot here anymore, so
  ;; OVERSIZED? can only fire if (batch-bytes) is maintained elsewhere ---
  ;; confirm, otherwise byte-based batch splitting is dead code.  The
  ;; unused BYTES binding was removed from the with-slots below.
  (with-slots (data count) *current-batch*
    (setf (aref data count) row)
    (incf count)))