mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-08 23:37:00 +02:00
Add some preliminary logging capacity
This commit is contained in:
parent
c6c3f11ffe
commit
72c783f02f
16
package.lisp
16
package.lisp
@ -5,10 +5,21 @@
|
||||
|
||||
(defpackage #:pgloader.utils
|
||||
(:use #:cl)
|
||||
(:import-from #:cl-log
|
||||
#:defcategory
|
||||
#:log-manager
|
||||
#:start-messenger
|
||||
#:log-message
|
||||
#:ring-messenger
|
||||
#:text-file-messenger
|
||||
#:formatted-message)
|
||||
(:import-from #:pgloader.params
|
||||
#:*reject-path-root*
|
||||
#:*log-filename*
|
||||
#:*log-level*
|
||||
#:*state*)
|
||||
(:export #:report-header
|
||||
(:export #:log-message
|
||||
#:report-header
|
||||
#:report-table-name
|
||||
#:report-results
|
||||
#:report-footer
|
||||
@ -37,6 +48,7 @@
|
||||
#:*loader-kernel*
|
||||
#:*state*)
|
||||
(:import-from #:pgloader.utils
|
||||
#:log-message
|
||||
#:report-header
|
||||
#:report-table-name
|
||||
#:report-results
|
||||
@ -70,6 +82,7 @@
|
||||
#:*myconn-pass*
|
||||
#:*state*)
|
||||
(:import-from #:pgloader.utils
|
||||
#:log-message
|
||||
#:report-header
|
||||
#:report-table-name
|
||||
#:report-results
|
||||
@ -101,6 +114,7 @@
|
||||
#:*loader-kernel*
|
||||
#:*state*)
|
||||
(:import-from #:pgloader.utils
|
||||
#:log-message
|
||||
#:report-header
|
||||
#:report-table-name
|
||||
#:report-results
|
||||
|
@ -7,6 +7,7 @@
|
||||
(:use #:cl)
|
||||
(:export #:*csv-path-root*
|
||||
#:*reject-path-root*
|
||||
#:*log-filename*
|
||||
#:*loader-kernel*
|
||||
#:*myconn-host*
|
||||
#:*myconn-user*
|
||||
@ -26,6 +27,12 @@
|
||||
(defparameter *reject-path-root*
|
||||
(make-pathname :directory "/tmp/pgloader/"))
|
||||
|
||||
(defparameter *log-filename*
|
||||
(make-pathname :directory "/tmp/pgloader/" :name "pgloader" :type "log"))
|
||||
|
||||
(defparameter *log-level* :notice)
|
||||
(setq *log-level* :debug)
|
||||
|
||||
;;; package nicknames are only defined later, in package.lisp
|
||||
(defparameter *loader-kernel* (lparallel:make-kernel 2)
|
||||
"lparallel kernel to use for loading data in parallel")
|
||||
|
@ -5,7 +5,8 @@
|
||||
:description "Load data into PostgreSQL"
|
||||
:author "Dimitri Fontaine <dimitri@2ndQuadrant.fr>"
|
||||
:license "The PostgreSQL Licence"
|
||||
:depends-on (#:postmodern
|
||||
:depends-on (#:cl-log
|
||||
#:postmodern
|
||||
#:cl-postgres
|
||||
#:simple-date
|
||||
#:cl-mysql
|
||||
|
10
pgsql.lisp
10
pgsql.lisp
@ -281,12 +281,15 @@ Finally returns how many rows where read and processed."
|
||||
"Fetch data from the QUEUE until we see :end-of-data. Update *state*"
|
||||
(when truncate (truncate-table dbname table-name))
|
||||
|
||||
(log-message :debug "pgsql:copy-from-queue: ~a ~a" dbname table-name)
|
||||
|
||||
(let* ((conspec (remove :port (get-connection-string dbname))))
|
||||
(loop
|
||||
for retval =
|
||||
(let* ((stream (cl-postgres:open-db-writer conspec table-name nil))
|
||||
(*batch* nil)
|
||||
(*batch-size* 0))
|
||||
(log-message :debug "pgsql:copy-from-queue: starting new batch")
|
||||
(unwind-protect
|
||||
(let ((process-row-fn
|
||||
(make-copy-and-batch-fn stream :date-columns date-columns)))
|
||||
@ -295,6 +298,7 @@ Finally returns how many rows where read and processed."
|
||||
;; in case of data-exception, split the batch and try again
|
||||
(handler-case
|
||||
(progn
|
||||
(log-message :debug "pgsql:copy-from-queue: commit batch")
|
||||
(cl-postgres:close-db-writer stream))
|
||||
((or
|
||||
CL-POSTGRES-ERROR:UNIQUE-VIOLATION
|
||||
@ -328,6 +332,8 @@ Finally returns how many rows where read and processed."
|
||||
(dataq (lq:make-queue 4096))
|
||||
(*state* (if report (pgloader.utils:make-pgstate) *state*)))
|
||||
|
||||
(log-message :debug "pgsql:copy-from-file: ~a ~a ~a" dbname table-name filename)
|
||||
|
||||
(when report
|
||||
(pgstate-add-table *state* dbname table-name))
|
||||
|
||||
@ -422,12 +428,16 @@ Finally returns how many rows where read and processed."
|
||||
(loop
|
||||
while (< processed-rows batch-size)
|
||||
do
|
||||
(log-message :debug "pgsql:retry-batch: splitting current batch")
|
||||
(let* ((current-batch current-batch-pos)
|
||||
(current-batch-size (smaller-batch-size batch-size
|
||||
processed-rows))
|
||||
(stream
|
||||
(cl-postgres:open-db-writer conspec table-name nil)))
|
||||
|
||||
(log-message :debug "pgsql:retry-batch: current-batch-size = ~d"
|
||||
current-batch-size)
|
||||
|
||||
(unwind-protect
|
||||
(dotimes (i current-batch-size)
|
||||
;; rows in that batch have already been processed
|
||||
|
21
utils.lisp
21
utils.lisp
@ -3,6 +3,27 @@
|
||||
;;;
|
||||
(in-package :pgloader.utils)
|
||||
|
||||
;;;
|
||||
;;; Logs
|
||||
;;;
|
||||
;;; First define the log categories
|
||||
(defcategory :critical)
|
||||
(defcategory :error (or :error :critical))
|
||||
(defcategory :warning (or :warning :error))
|
||||
(defcategory :notice (or :notice :warning))
|
||||
(defcategory :info (or :info :notice))
|
||||
(defcategory :debug (or :debug :info))
|
||||
|
||||
;; Now define the Logger
|
||||
(setf (log-manager)
|
||||
(make-instance 'log-manager :message-class 'formatted-message))
|
||||
|
||||
;; And a messenger to store our message into
|
||||
(start-messenger 'text-file-messenger :filename *log-filename*)
|
||||
|
||||
;; Announce what just happened
|
||||
(log-message :notice "Starting pgloader, log system is ready.")
|
||||
|
||||
;;;
|
||||
;;; Timing Macro
|
||||
;;;
|
||||
|
Loading…
Reference in New Issue
Block a user