diff --git a/src/package.lisp b/src/package.lisp index a023ed9..770b301 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -115,6 +115,8 @@ (defpackage #:pgloader.queue (:use #:cl #:pgloader.params) + (:import-from #:pgloader.monitor + #:log-message) (:import-from #:pgloader.pgsql #:format-vector-row) (:import-from #:pgloader.sources diff --git a/src/params.lisp b/src/params.lisp index d7abee7..8954d98 100644 --- a/src/params.lisp +++ b/src/params.lisp @@ -12,7 +12,8 @@ #:*client-min-messages* #:*log-min-messages* #:*copy-batch-rows* - #:*concurrent-batches* + #:*copy-batch-size* + #:*concurrent-batches* #:*pgconn-host* #:*pgconn-port* #:*pgconn-user* @@ -57,10 +58,13 @@ ;;; How to split batches in case of data loading errors. ;;; (defparameter *copy-batch-rows* 25000 - "How many rows to batch per COPY transaction") + "How many rows to batch per COPY transaction.") + +(defparameter *copy-batch-size* nil ;; (* 20 1024 1024) + "Maximum memory size allowed for a single batch.") (defparameter *concurrent-batches* 10 - "How many batches do we stack in the queue in advance") + "How many batches do we stack in the queue in advance.") ;;; ;;; We need that to setup our default connection parameters diff --git a/src/pgsql/pgsql.lisp b/src/pgsql/pgsql.lisp index 44af36c..076cd64 100644 --- a/src/pgsql/pgsql.lisp +++ b/src/pgsql/pgsql.lisp @@ -49,11 +49,15 @@ (with-pgsql-connection (dbname) (loop - for (mesg batch read) = (lq:pop-queue queue) + for (mesg batch read oversized?) = (lq:pop-queue queue) until (eq mesg :end-of-data) for rows = (copy-batch table-name columns batch read) do (progn - (log-message :debug "copy-batch ~a ~d row~:p" table-name rows) + ;; The SBCL implementation needs some Garbage Collection + ;; decision making help... and now is a pretty good time. + #+sbcl (when oversized? (sb-ext:gc :full t)) + (log-message :debug "copy-batch ~a ~d row~:p~:[~; [oversized]~]" + table-name rows oversized?) 
(pgstate-incf *state* table-name :rows rows))))) ;;; diff --git a/src/queue.lisp b/src/queue.lisp index 2176518..ee75650 100644 --- a/src/queue.lisp +++ b/src/queue.lisp @@ -18,18 +18,28 @@ (defstruct batch (data (make-array *copy-batch-rows* :element-type 'simple-string) :type (vector simple-string *)) - (count 0 :type fixnum)) + (count 0 :type fixnum) + (bytes 0 :type fixnum)) (defvar *current-batch* nil) +(declaim (inline oversized?)) +(defun oversized? (&optional (batch *current-batch*)) + "Return a generalized boolean that is true only when BATCH is considered + over-sized when its size in BYTES is compared to *copy-batch-size*." + (and *copy-batch-size* ; defaults to nil + (<= *copy-batch-size* (batch-bytes batch)))) + (defun batch-row (row copy queue) "Add ROW to the reader batch. When the batch is full, provide it to the writer as the *writer-batch*." - (when (= (batch-count *current-batch*) *copy-batch-rows*) - ;; close current batch, prepare next one - (with-slots (data count) *current-batch* - (lq:push-queue (list :batch data count) queue)) - (setf *current-batch* (make-batch))) + (let ((oversized? (oversized? *current-batch*))) + (when (or (= (batch-count *current-batch*) *copy-batch-rows*) + oversized?) + ;; close current batch, prepare next one + (with-slots (data count bytes) *current-batch* + (lq:push-queue (list :batch data count oversized?) queue)) + (setf *current-batch* (make-batch)))) ;; Add ROW to the current BATCH. ;; @@ -37,8 +47,14 @@ ;; formed COPY TEXT string ready to go in the PostgreSQL stream. 
(let ((copy-string (with-output-to-string (s) (format-vector-row s row (transforms copy))))) - (with-slots (data count) *current-batch* + (with-slots (data count bytes) *current-batch* (setf (aref data count) copy-string) + (when *copy-batch-size* ; running under memory watch + (incf bytes + #+sbcl (length + (sb-ext:string-to-octets copy-string :external-format :utf-8)) + #+ccl (ccl:string-size-in-octets copy-string :external-format :utf-8) + #- (or sbcl ccl) (length copy-string))) (incf count)))) (defun map-push-queue (copy queue) @@ -50,7 +66,7 @@ (map-rows copy :process-row-fn (lambda (row) (batch-row row copy queue))) (with-slots (data count) *current-batch* (when (< 0 count) - (lq:push-queue (list :batch data count) queue))) + (lq:push-queue (list :batch data count nil) queue))) ;; signal we're done - (lq:push-queue (list :end-of-data nil nil) queue))) + (lq:push-queue (list :end-of-data nil nil nil) queue))) diff --git a/src/utils.lisp b/src/utils.lisp index cf75aac..7f35ec1 100644 --- a/src/utils.lisp +++ b/src/utils.lisp @@ -313,16 +313,19 @@ ;;; (defun make-kernel (worker-count &key (bindings - `((*monitoring-queue* . ,*monitoring-queue*) - (*pgconn-host* . ,*pgconn-host*) - (*pgconn-port* . ,*pgconn-port*) - (*pgconn-user* . ,*pgconn-user*) - (*pgconn-pass* . ,*pgconn-pass*) - (*pg-settings* . ',*pg-settings*) - (*myconn-host* . ,*myconn-host*) - (*myconn-port* . ,*myconn-port*) - (*myconn-user* . ,*myconn-user*) - (*myconn-pass* . ,*myconn-pass*) - (*state* . ,*state*)))) + `((*monitoring-queue* . ,*monitoring-queue*) + (*copy-batch-rows* . ,*copy-batch-rows*) + (*copy-batch-size* . ,*copy-batch-size*) + (*concurrent-batches* . ,*concurrent-batches*) + (*pgconn-host* . ,*pgconn-host*) + (*pgconn-port* . ,*pgconn-port*) + (*pgconn-user* . ,*pgconn-user*) + (*pgconn-pass* . ,*pgconn-pass*) + (*pg-settings* . ',*pg-settings*) + (*myconn-host* . ,*myconn-host*) + (*myconn-port* . ,*myconn-port*) + (*myconn-user* . ,*myconn-user*) + (*myconn-pass* . 
,*myconn-pass*) + (*state* . ,*state*)))) "Wrapper around lparallel:make-kernel that sets our usual bindings." (lp:make-kernel worker-count :bindings bindings)) diff --git a/test/data/exhausted.lisp b/test/data/exhausted.lisp new file mode 100644 index 0000000..92ba1df --- /dev/null +++ b/test/data/exhausted.lisp @@ -0,0 +1,63 @@ +;;; Test cases for issue https://github.com/dimitri/pgloader/issues/16 +;;; +;;; Table already created as: + +#| +CREATE TABLE IF NOT EXISTS `document` ( + `id` int(10) unsigned NOT NULL AUTO_INCREMENT, + `document_template_id` int(10) unsigned NOT NULL, + `brand_id` int(10) unsigned DEFAULT NULL, + `logo` char(1) NOT NULL DEFAULT '0', + `footer` char(1) NOT NULL DEFAULT '0', + `pages` char(1) NOT NULL DEFAULT '0', + `content` longtext NOT NULL, + `meta` text, + `status` char(1) NOT NULL DEFAULT '1', + `date_created` datetime NOT NULL, + `user_id` varchar(128) NOT NULL, + `region` varchar(32) DEFAULT NULL, + `foreign_id` int(10) unsigned NOT NULL, + `date_sent` datetime DEFAULT NULL, + PRIMARY KEY (`id`), + KEY `region` (`region`,`foreign_id`) USING BTREE, + KEY `document_template_id` (`document_template_id`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +|# + +(defpackage #:pgloader.test.exhausted + (:use #:cl #:pgloader.params #:pgloader.mysql) + (:export #:produce-data)) + +(in-package #:pgloader.test.exhausted) + +(defvar *string* (make-string 237563 :initial-element #\a) + "A long string to reproduce heap exhaustion.") + +(defun produce-data (&key + (*myconn-host* *myconn-host*) + (*myconn-port* *myconn-port*) + (*myconn-user* *myconn-user*) + (*myconn-pass* *myconn-pass*) + (dbname "exhausted") + (rows 5000)) + "Insert data to reproduce the test case." 
+ (with-mysql-connection (dbname) + (loop repeat rows + do (pgloader.mysql::mysql-query (format nil " +INSERT INTO `document` (`document_template_id`, + `brand_id`, + `logo`, + `footer`, + `pages`, + `content`, + `meta`, + `status`, + `date_created`, + `user_id`, + `region`, + `foreign_id`, + `date_sent`) + VALUES (20, 21, '0', '0', '0', '~a', + 'a:2:{s:7:\"comment\";s:0:\"\";s:4:\"date\";s:10:\"1372975200\";}', + '1', '2013-06-21 13:04:46', 'cjumeaux', 'dossier', 104027, + '2013-06-21 13:04:46');" *string*))))) diff --git a/test/data/exhausted.load b/test/data/exhausted.load new file mode 100644 index 0000000..cda9f94 --- /dev/null +++ b/test/data/exhausted.load @@ -0,0 +1,15 @@ +LOAD DATABASE + FROM mysql://root@localhost:3306/exhausted + INTO postgresql:///exhausted + + WITH include drop, create tables, create indexes, reset sequences, truncate + + CAST type datetime to timestamptz drop default drop not null using zero-dates-to-null, + type date drop not null drop default using zero-dates-to-null, + type timestamp to timestamptz drop not null using zero-dates-to-null, + + -- now the default for tinyint(1) + -- column bools.a to boolean drop typemod using tinyint-to-boolean, + + -- override char(1) to varchar(1), just use char(1) here. + type char when (= precision 1) to char keep typemod;