From a8b0f91f377e2f98bcfac27e6fbe5e533bac319f Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Sun, 26 Jan 2014 23:22:07 +0100 Subject: [PATCH] Allow optional control of batch memory footprint, see #16 and #22. With the new internal setting *copy-batch-size* it's now possible to instruct pgloader to close batches early (before *copy-batch-rows* limit) when crossing the byte count threshold. When set to 20 MB it allows the new test case (exhausted) to pass under SBCL and CCL, and there's no measurable cost when *copy-batch-size* is set to nil (its default value) in the testing done. This patch is published without any way to tune the values from the command language yet, that's the next step once it's been proven effective. --- src/package.lisp | 2 ++ src/params.lisp | 10 +++++-- src/pgsql/pgsql.lisp | 8 +++-- src/queue.lisp | 34 ++++++++++++++++------ src/utils.lisp | 25 +++++++++------- test/data/exhausted.lisp | 63 ++++++++++++++++++++++++++++++++++++++++ test/data/exhausted.load | 15 ++++++++++ 7 files changed, 132 insertions(+), 25 deletions(-) create mode 100644 test/data/exhausted.lisp create mode 100644 test/data/exhausted.load diff --git a/src/package.lisp b/src/package.lisp index a023ed9..770b301 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -115,6 +115,8 @@ (defpackage #:pgloader.queue (:use #:cl #:pgloader.params) + (:import-from #:pgloader.monitor + #:log-message) (:import-from #:pgloader.pgsql #:format-vector-row) (:import-from #:pgloader.sources diff --git a/src/params.lisp b/src/params.lisp index d7abee7..8954d98 100644 --- a/src/params.lisp +++ b/src/params.lisp @@ -12,7 +12,8 @@ #:*client-min-messages* #:*log-min-messages* #:*copy-batch-rows* - #:*concurrent-batches* + #:*copy-batch-size* + #:*concurrent-batches* #:*pgconn-host* #:*pgconn-port* #:*pgconn-user* @@ -57,10 +58,13 @@ ;;; How to split batches in case of data loading errors. 
;;; (defparameter *copy-batch-rows* 25000 - "How many rows to batch per COPY transaction") + "How many rows to batch per COPY transaction.") + +(defparameter *copy-batch-size* nil ;; (* 20 1024 1024) + "Maximum memory size allowed for a single batch.") (defparameter *concurrent-batches* 10 - "How many batches do we stack in the queue in advance") + "How many batches do we stack in the queue in advance.") ;;; ;;; We need that to setup our default connection parameters diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index 44af36c..076cd64 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -49,11 +49,15 @@ (with-pgsql-connection (dbname) (loop - for (mesg batch read) = (lq:pop-queue queue) + for (mesg batch read oversized?) = (lq:pop-queue queue) until (eq mesg :end-of-data) for rows = (copy-batch table-name columns batch read) do (progn - (log-message :debug "copy-batch ~a ~d row~:p" table-name rows) + ;; The SBCL implementation needs some Garbage Collection + ;; decision making help... and now is a pretty good time. + #+sbcl (when oversized? (sb-ext:gc :full t)) + (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" + table-name rows oversized?) (pgstate-incf *state* table-name :rows rows))))) ;;; diff --git a/src/queue.lisp b/src/queue.lisp index 2176518..ee75650 100644 --- a/src/queue.lisp +++ b/src/queue.lisp @@ -18,18 +18,28 @@ (defstruct batch (data (make-array *copy-batch-rows* :element-type 'simple-string) :type (vector simple-string *)) - (count 0 :type fixnum)) + (count 0 :type fixnum) + (bytes 0 :type fixnum)) (defvar *current-batch* nil) +(declaim (inline oversized?)) +(defun oversized? (&optional (batch *current-batch*)) + "Return a generalized boolean that is true only when BATCH is considered + over-sized when its size in BYTES is compared to *copy-batch-size*." + (and *copy-batch-size* ; defaults to nil + (<= *copy-batch-size* (batch-bytes batch)))) + (defun batch-row (row copy queue) "Add ROW to the reader batch. 
When the batch is full, provide it to the writer as the *writer-batch*." - (when (= (batch-count *current-batch*) *copy-batch-rows*) - ;; close current batch, prepare next one - (with-slots (data count) *current-batch* - (lq:push-queue (list :batch data count) queue)) - (setf *current-batch* (make-batch))) + (let ((oversized? (oversized? *current-batch*))) + (when (or (= (batch-count *current-batch*) *copy-batch-rows*) + oversized?) + ;; close current batch, prepare next one + (with-slots (data count bytes) *current-batch* + (lq:push-queue (list :batch data count oversized?) queue)) + (setf *current-batch* (make-batch)))) ;; Add ROW to the current BATCH. ;; @@ -37,8 +47,14 @@ ;; formed COPY TEXT string ready to go in the PostgreSQL stream. (let ((copy-string (with-output-to-string (s) (format-vector-row s row (transforms copy))))) - (with-slots (data count) *current-batch* + (with-slots (data count bytes) *current-batch* (setf (aref data count) copy-string) + (when *copy-batch-size* ; running under memory watch + (incf bytes + #+sbcl (length + (sb-ext:string-to-octets copy-string :external-format :utf-8)) + #+ccl (ccl:string-size-in-octets copy-string :external-format :utf-8) + #- (or sbcl ccl) (length copy-string))) (incf count)))) (defun map-push-queue (copy queue) @@ -50,7 +66,7 @@ (map-rows copy :process-row-fn (lambda (row) (batch-row row copy queue))) (with-slots (data count) *current-batch* (when (< 0 count) - (lq:push-queue (list :batch data count) queue))) + (lq:push-queue (list :batch data count nil) queue))) ;; signal we're done - (lq:push-queue (list :end-of-data nil nil) queue))) + (lq:push-queue (list :end-of-data nil nil nil) queue))) diff --git a/src/utils.lisp b/src/utils.lisp index cf75aac..7f35ec1 100644 --- a/src/utils.lisp +++ b/src/utils.lisp @@ -313,16 +313,19 @@ ;;; (defun make-kernel (worker-count &key (bindings - `((*monitoring-queue* . ,*monitoring-queue*) - (*pgconn-host* . ,*pgconn-host*) - (*pgconn-port* . 
,*pgconn-port*) - (*pgconn-user* . ,*pgconn-user*) - (*pgconn-pass* . ,*pgconn-pass*) - (*pg-settings* . ',*pg-settings*) - (*myconn-host* . ,*myconn-host*) - (*myconn-port* . ,*myconn-port*) - (*myconn-user* . ,*myconn-user*) - (*myconn-pass* . ,*myconn-pass*) - (*state* . ,*state*)))) + `((*monitoring-queue* . ,*monitoring-queue*) + (*copy-batch-rows* . ,*copy-batch-rows*) + (*copy-batch-size* . ,*copy-batch-size*) + (*concurrent-batches* . ,*concurrent-batches*) + (*pgconn-host* . ,*pgconn-host*) + (*pgconn-port* . ,*pgconn-port*) + (*pgconn-user* . ,*pgconn-user*) + (*pgconn-pass* . ,*pgconn-pass*) + (*pg-settings* . ',*pg-settings*) + (*myconn-host* . ,*myconn-host*) + (*myconn-port* . ,*myconn-port*) + (*myconn-user* . ,*myconn-user*) + (*myconn-pass* . ,*myconn-pass*) + (*state* . ,*state*)))) "Wrapper around lparallel:make-kernel that sets our usual bindings." (lp:make-kernel worker-count :bindings bindings)) diff --git a/test/data/exhausted.lisp b/test/data/exhausted.lisp new file mode 100644 index 0000000..92ba1df --- /dev/null +++ b/test/data/exhausted.lisp @@ -0,0 +1,63 @@ +;;; Test cases for issue https://github.com/dimitri/pgloader/issues/16 +;;; +;;; Table already created as: + +#| +CREATE TABLE IF NOT EXISTS `document` ( + `id` int(10) unsigned NOT NULL AUTO_INCREMENT, + `document_template_id` int(10) unsigned NOT NULL, + `brand_id` int(10) unsigned DEFAULT NULL, + `logo` char(1) NOT NULL DEFAULT '0', + `footer` char(1) NOT NULL DEFAULT '0', + `pages` char(1) NOT NULL DEFAULT '0', + `content` longtext NOT NULL, + `meta` text, + `status` char(1) NOT NULL DEFAULT '1', + `date_created` datetime NOT NULL, + `user_id` varchar(128) NOT NULL, + `region` varchar(32) DEFAULT NULL, + `foreign_id` int(10) unsigned NOT NULL, + `date_sent` datetime DEFAULT NULL, + PRIMARY KEY (`id`), + KEY `region` (`region`,`foreign_id`) USING BTREE, + KEY `document_template_id` (`document_template_id`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +|# + +(defpackage 
#:pgloader.test.exhausted + (:use #:cl #:pgloader.params #:pgloader.mysql) + (:export #:produce-data)) + +(in-package #:pgloader.test.exhausted) + +(defvar *string* (make-string 237563 :initial-element #\a) + "A long string to reproduce heap exhaustion.") + +(defun produce-data (&key + (*myconn-host* *myconn-host*) + (*myconn-port* *myconn-port*) + (*myconn-user* *myconn-user*) + (*myconn-pass* *myconn-pass*) + (dbname "exhausted") + (rows 5000)) + "Insert data to reproduce the test case." + (with-mysql-connection (dbname) + (loop repeat rows + do (pgloader.mysql::mysql-query (format nil " +INSERT INTO `document` (`document_template_id`, + `brand_id`, + `logo`, + `footer`, + `pages`, + `content`, + `meta`, + `status`, + `date_created`, + `user_id`, + `region`, + `foreign_id`, + `date_sent`) + VALUES (20, 21, '0', '0', '0', '~a', + 'a:2:{s:7:\"comment\";s:0:\"\";s:4:\"date\";s:10:\"1372975200\";}', + '1', '2013-06-21 13:04:46', 'cjumeaux', 'dossier', 104027, + '2013-06-21 13:04:46');" *string*))))) diff --git a/test/data/exhausted.load b/test/data/exhausted.load new file mode 100644 index 0000000..cda9f94 --- /dev/null +++ b/test/data/exhausted.load @@ -0,0 +1,15 @@ +LOAD DATABASE + FROM mysql://root@localhost:3306/exhausted + INTO postgresql:///exhausted + + WITH include drop, create tables, create indexes, reset sequences, truncate + + CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null, + type date drop not null drop default using zero-dates-to-null, + type timestamp to timestamptz drop not null using zero-dates-to-null, + + -- now the default for tinyint(1) + -- column bools.a to boolean drop typemod using tinyint-to-boolean, + + -- override char(1) to varchar(1), just use char(1) here. + type char when (= precision 1) to char keep typemod;