With the new internal setting *copy-batch-size* it's now possible to instruct pgloader to close batches early (before the *copy-batch-rows* limit is reached) when crossing the byte-count threshold. When set to 20 MB it allows the new test case (exhausted) to pass under both SBCL and CCL, and in the testing done there's no measurable cost when *copy-batch-size* is set to nil (its default value). This patch is published without any way to tune the values from the command language yet; that's the next step, once it's been proven effective.
parent ceec4780f2
commit a8b0f91f37
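Since the commit message notes there is no command-language way to tune these values yet, here is a minimal sketch (not part of the patch) of how the threshold can be enabled in the meantime, assuming a Lisp image with pgloader loaded; the symbol is exported by this very commit, and 20 MB is the value used in the testing described above:

    ;; Hedged sketch, not part of the patch: turn the memory watch on by
    ;; setting the exported special variable directly.
    (setf pgloader.params:*copy-batch-size* (* 20 1024 1024)) ; 20 MB

    ;; Back to the default: nil disables byte counting entirely, so
    ;; batches close on *copy-batch-rows* alone.
    (setf pgloader.params:*copy-batch-size* nil)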
@@ -115,6 +115,8 @@
 (defpackage #:pgloader.queue
   (:use #:cl #:pgloader.params)
   (:import-from #:pgloader.monitor
                 #:log-message)
+  (:import-from #:pgloader.pgsql
+                #:format-vector-row)
   (:import-from #:pgloader.sources
@@ -12,7 +12,8 @@
            #:*client-min-messages*
            #:*log-min-messages*
            #:*copy-batch-rows*
-           #:*concurrent-batches*
+           #:*copy-batch-size*
+           #:*concurrent-batches*
            #:*pgconn-host*
            #:*pgconn-port*
            #:*pgconn-user*
@@ -57,10 +58,13 @@
 ;;; How to split batches in case of data loading errors.
 ;;;
 (defparameter *copy-batch-rows* 25000
-  "How many rows to batch per COPY transaction")
+  "How many rows to batch per COPY transaction.")
+
+(defparameter *copy-batch-size* nil ;; (* 20 1024 1024)
+  "Maximum memory size allowed for a single batch.")
 
 (defparameter *concurrent-batches* 10
-  "How many batches do we stack in the queue in advance")
+  "How many batches do we stack in the queue in advance.")
 
 ;;;
 ;;; We need that to setup our default connection parameters
@@ -49,11 +49,15 @@
 
   (with-pgsql-connection (dbname)
     (loop
-       for (mesg batch read) = (lq:pop-queue queue)
+       for (mesg batch read oversized?) = (lq:pop-queue queue)
        until (eq mesg :end-of-data)
        for rows = (copy-batch table-name columns batch read)
        do (progn
-            (log-message :debug "copy-batch ~a ~d row~:p" table-name rows)
+            ;; The SBCL implementation needs some Garbage Collection
+            ;; decision making help... and now is a pretty good time.
+            #+sbcl (when oversized? (sb-ext:gc :full t))
+            (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]"
+                         table-name rows oversized?)
             (pgstate-incf *state* table-name :rows rows)))))
 
 ;;;
@@ -18,18 +18,28 @@
 (defstruct batch
   (data (make-array *copy-batch-rows* :element-type 'simple-string)
         :type (vector simple-string *))
-  (count 0 :type fixnum))
+  (count 0 :type fixnum)
+  (bytes 0 :type fixnum))
 
 (defvar *current-batch* nil)
 
+(declaim (inline oversized?))
+(defun oversized? (&optional (batch *current-batch*))
+  "Return a generalized boolean that is true only when BATCH is considered
+   over-sized when its size in BYTES is compared *copy-batch-size*."
+  (and *copy-batch-size*            ; defaults to nil
+       (<= *copy-batch-size* (batch-bytes batch))))
+
 (defun batch-row (row copy queue)
   "Add ROW to the reader batch. When the batch is full, provide it to the
    writer as the *writer-batch*."
-  (when (= (batch-count *current-batch*) *copy-batch-rows*)
-    ;; close current batch, prepare next one
-    (with-slots (data count) *current-batch*
-      (lq:push-queue (list :batch data count) queue))
-    (setf *current-batch* (make-batch)))
+  (let ((oversized? (oversized? *current-batch*)))
+    (when (or (= (batch-count *current-batch*) *copy-batch-rows*)
+              oversized?)
+      ;; close current batch, prepare next one
+      (with-slots (data count bytes) *current-batch*
+        (lq:push-queue (list :batch data count oversized?) queue))
+      (setf *current-batch* (make-batch))))
 
   ;; Add ROW to the current BATCH.
   ;;
@@ -37,8 +47,14 @@
   ;; formed COPY TEXT string ready to go in the PostgreSQL stream.
   (let ((copy-string (with-output-to-string (s)
                        (format-vector-row s row (transforms copy)))))
-    (with-slots (data count) *current-batch*
+    (with-slots (data count bytes) *current-batch*
       (setf (aref data count) copy-string)
+      (when *copy-batch-size*       ; running under memory watch
+        (incf bytes
+              #+sbcl (length
+                      (sb-ext:string-to-octets copy-string :external-format :utf-8))
+              #+ccl (ccl:string-size-in-octets copy-string :external-format :utf-8)
+              #- (or sbcl ccl) (length copy-string)))
       (incf count))))
 
 (defun map-push-queue (copy queue)
@@ -50,7 +66,7 @@
   (map-rows copy :process-row-fn (lambda (row) (batch-row row copy queue)))
   (with-slots (data count) *current-batch*
     (when (< 0 count)
-      (lq:push-queue (list :batch data count) queue)))
+      (lq:push-queue (list :batch data count nil) queue)))
 
   ;; signal we're done
-  (lq:push-queue (list :end-of-data nil nil) queue)))
+  (lq:push-queue (list :end-of-data nil nil nil) queue)))
@@ -313,16 +313,19 @@
 ;;;
 (defun make-kernel (worker-count
                     &key (bindings
-                          `((*monitoring-queue* . ,*monitoring-queue*)
-                            (*pgconn-host* . ,*pgconn-host*)
-                            (*pgconn-port* . ,*pgconn-port*)
-                            (*pgconn-user* . ,*pgconn-user*)
-                            (*pgconn-pass* . ,*pgconn-pass*)
-                            (*pg-settings* . ',*pg-settings*)
-                            (*myconn-host* . ,*myconn-host*)
-                            (*myconn-port* . ,*myconn-port*)
-                            (*myconn-user* . ,*myconn-user*)
-                            (*myconn-pass* . ,*myconn-pass*)
-                            (*state* . ,*state*))))
+                          `((*monitoring-queue* . ,*monitoring-queue*)
+                            (*copy-batch-rows* . ,*copy-batch-rows*)
+                            (*copy-batch-size* . ,*copy-batch-size*)
+                            (*concurrent-batches* . ,*concurrent-batches*)
+                            (*pgconn-host* . ,*pgconn-host*)
+                            (*pgconn-port* . ,*pgconn-port*)
+                            (*pgconn-user* . ,*pgconn-user*)
+                            (*pgconn-pass* . ,*pgconn-pass*)
+                            (*pg-settings* . ',*pg-settings*)
+                            (*myconn-host* . ,*myconn-host*)
+                            (*myconn-port* . ,*myconn-port*)
+                            (*myconn-user* . ,*myconn-user*)
+                            (*myconn-pass* . ,*myconn-pass*)
+                            (*state* . ,*state*))))
   "Wrapper around lparallel:make-kernel that sets our usual bindings."
   (lp:make-kernel worker-count :bindings bindings))
new file: test/data/exhausted.lisp (63 lines)
@@ -0,0 +1,63 @@
;;; Test cases for issue https://github.com/dimitri/pgloader/issues/16
;;;
;;; Table already created as:

#|
CREATE TABLE IF NOT EXISTS `document` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `document_template_id` int(10) unsigned NOT NULL,
  `brand_id` int(10) unsigned DEFAULT NULL,
  `logo` char(1) NOT NULL DEFAULT '0',
  `footer` char(1) NOT NULL DEFAULT '0',
  `pages` char(1) NOT NULL DEFAULT '0',
  `content` longtext NOT NULL,
  `meta` text,
  `status` char(1) NOT NULL DEFAULT '1',
  `date_created` datetime NOT NULL,
  `user_id` varchar(128) NOT NULL,
  `region` varchar(32) DEFAULT NULL,
  `foreign_id` int(10) unsigned NOT NULL,
  `date_sent` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `region` (`region`,`foreign_id`) USING BTREE,
  KEY `document_template_id` (`document_template_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|#

(defpackage #:pgloader.test.exhausted
  (:use #:cl #:pgloader.params #:pgloader.mysql)
  (:export #:produce-data))

(in-package #:pgloader.test.exhausted)

(defvar *string* (make-string 237563 :initial-element #\a)
  "A long string to reproduce heap exhaustion.")

(defun produce-data (&key
                       (*myconn-host* *myconn-host*)
                       (*myconn-port* *myconn-port*)
                       (*myconn-user* *myconn-user*)
                       (*myconn-pass* *myconn-pass*)
                       (dbname "exhausted")
                       (rows 5000))
  "Insert data to reproduce the test case."
  (with-mysql-connection (dbname)
    (loop repeat rows
       do (pgloader.mysql::mysql-query (format nil "
INSERT INTO `document` (`document_template_id`,
                        `brand_id`,
                        `logo`,
                        `footer`,
                        `pages`,
                        `content`,
                        `meta`,
                        `status`,
                        `date_created`,
                        `user_id`,
                        `region`,
                        `foreign_id`,
                        `date_sent`)
     VALUES (20, 21, '0', '0', '0', '~a',
             'a:2:{s:7:\"comment\";s:0:\"\";s:4:\"date\";s:10:\"1372975200\";}',
             '1', '2013-06-21 13:04:46', 'cjumeaux', 'dossier', 104027,
             '2013-06-21 13:04:46');" *string*)))))
new file: test/data/exhausted.load (15 lines)
@@ -0,0 +1,15 @@
LOAD DATABASE
     FROM mysql://root@localhost:3306/exhausted
     INTO postgresql:///exhausted

 WITH include drop, create tables, create indexes, reset sequences, truncate

 CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null,
      type date drop not null drop default using zero-dates-to-null,
      type timestamp to timestamptz drop not null using zero-dates-to-null,

      -- now the default for tinyint(1)
      -- column bools.a to boolean drop typemod using tinyint-to-boolean,

      -- override char(1) to varchar(1), just use char(1) here.
      type char when (= precision 1) to char keep typemod;
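A hedged sketch of how this test case might be driven end to end, assuming a MySQL server at localhost:3306 with an `exhausted` database whose `document` table was created as shown at the top of test/data/exhausted.lisp, plus a local PostgreSQL database of the same name:

    ;; Hedged sketch, not part of the patch: populate the MySQL source
    ;; table using the helper exported by test/data/exhausted.lisp.
    (pgloader.test.exhausted:produce-data :dbname "exhausted" :rows 5000)

Feeding the load file to pgloader afterwards (for instance `pgloader test/data/exhausted.load`) should then reproduce the heap-exhaustion scenario unless *copy-batch-size* is set as sketched earlier.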