From 9e17976ef8f7e6a77d8820ff6a559a06f7cb4dd6 Mon Sep 17 00:00:00 2001 From: dim Date: Tue, 5 Feb 2008 13:41:45 +0000 Subject: [PATCH] First attempt at a multi-threaded pgloader, seems to be ok though not configurable yet --- pgloader.py | 43 +++++++++++++++++++++++++++++++++++-------- pgloader/pgloader.py | 26 ++++++++++++++++++++------ 2 files changed, 55 insertions(+), 14 deletions(-) diff --git a/pgloader.py b/pgloader.py index 820a7b8..dc518c1 100644 --- a/pgloader.py +++ b/pgloader.py @@ -451,16 +451,25 @@ def load_data(): # we run through sorted section list sections.sort() + + threads = {} + running = 0 for s in sections: + summary[s] = [] + loader = PGLoader(s, config, summary[s]) try: - loader = PGLoader(s, config) - if not loader.template: - loader.run() - summary[s] = (loader.table,) + loader.summary() + filename = loader.filename + input_encoding = loader.input_encoding + + threads[s] = loader + + log.info("Starting thread %d for %s" % (running, s)) + threads[s].start() + running += 1 else: - log.info("Skipping section %s, which is a template" \ - % s) + log.info("Skipping section %s, which is a template" % s) + summary.pop(s) except PGLoader_Error, e: if e == '': @@ -469,15 +478,33 @@ def load_data(): log.error('%s' % e) if PEDANTIC: - pgloader.print_stats() + # was: threads[s].print_stats() + # but now thread[s] is no more alive + pass except UnicodeDecodeError, e: log.error("can't open '%s' with given input encoding '%s'" \ - % (loader.filename, loader.input_encoding)) + % (filename, input_encoding)) except KeyboardInterrupt: log.warning("Aborting on user demand (Interrupt)") + while running > 0: + for s in threads: + if not threads[s].isAlive(): + running -= 1 + + if running > 0: + log.info("%d thread(s) still running" % running) + + try: + log.info('waiting for %d threads, sleeping %gs' % (running, .1)) + time.sleep(.1) + except KeyboardInterrupt: + log.warning("Aborting %d threads still running at user demand"\ + % running) + break + # total duration td = time.time() - begin retcode = 0 diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index f4b9295..7282737 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -5,7 +5,7 @@ # handles configuration, parse data, then pass them to database module for # COPY preparation -import os, sys, os.path, time, codecs +import os, sys, os.path, time, codecs, threading from cStringIO import StringIO from logger import log, getLogger @@ -22,17 +22,20 @@ from options import NEWLINE_ESCAPES from options import UDC_PREFIX from options import REFORMAT_PATH -class PGLoader: +class PGLoader(threading.Thread): """ PGLoader reads some data file and depending on ts configuration, import data with COPY or update blob data with UPDATE. """ - def __init__(self, name, config): + def __init__(self, name, config, stats): """ Init with a configuration section """ + threading.Thread.__init__(self, name = name) + # Some settings - self.name = name - self.log = getLogger(name) + self.stats = stats + self.name = name + self.log = getLogger(name) self.__dbconnect__(config) @@ -635,7 +638,6 @@ class PGLoader: def print_stats(self): """ print out some statistics """ - if self.reject is not None: self.errors = self.reject.errors self.reject.print_stats(self.name) @@ -672,6 +674,18 @@ class PGLoader: # then show up some stats self.print_stats() + # update the main summary + self.duration = time.time() - self.init_time + + if self.reject is not None: + self.errors = self.reject.errors + + for x in [self.table, self.duration, self.db.commited_rows, self.errors]: + self.stats.append(x) + + self.log.info("loading done") + return + def data_import(self): """ import CSV or TEXT data, using COPY """