First attempt at a multi-threaded pgloader, seems to be ok though not configurable yet

This commit is contained in:
dim 2008-02-05 13:41:45 +00:00
parent 526836deca
commit 9e17976ef8
2 changed files with 55 additions and 14 deletions

View File

@ -451,16 +451,25 @@ def load_data():
# we run through sorted section list
sections.sort()
threads = {}
running = 0
for s in sections:
summary[s] = []
loader = PGLoader(s, config, summary[s])
try:
loader = PGLoader(s, config)
if not loader.template:
loader.run()
summary[s] = (loader.table,) + loader.summary()
filename = loader.filename
input_encoding = loader.input_encoding
threads[s] = loader
log.info("Starting thread %d for %s" % (running, s))
threads[s].start()
running += 1
else:
log.info("Skipping section %s, which is a template" \
% s)
log.info("Skipping section %s, which is a template" % s)
summary.pop(s)
except PGLoader_Error, e:
if e == '':
@ -469,15 +478,33 @@ def load_data():
log.error('%s' % e)
if PEDANTIC:
pgloader.print_stats()
# was: threads[s].print_stats()
# but now thread[s] is no more alive
pass
except UnicodeDecodeError, e:
log.error("can't open '%s' with given input encoding '%s'" \
% (loader.filename, loader.input_encoding))
% (filename, input_encoding))
except KeyboardInterrupt:
log.warning("Aborting on user demand (Interrupt)")
while running > 0:
for s in threads:
if not threads[s].isAlive():
running -= 1
if running > 0:
log.info("%d thread(s) still running" % running)
try:
log.info('waiting for %d threads, sleeping %gs' % (running, .1))
time.sleep(.1)
except KeyboardInterrupt:
log.warning("Aborting %d threads still running at user demand"\
% running)
break
# total duration
td = time.time() - begin
retcode = 0

View File

@ -5,7 +5,7 @@
# handles configuration, parse data, then pass them to database module for
# COPY preparation
import os, sys, os.path, time, codecs
import os, sys, os.path, time, codecs, threading
from cStringIO import StringIO
from logger import log, getLogger
@ -22,17 +22,20 @@ from options import NEWLINE_ESCAPES
from options import UDC_PREFIX
from options import REFORMAT_PATH
class PGLoader:
class PGLoader(threading.Thread):
"""
PGLoader reads some data file and depending on ts configuration,
import data with COPY or update blob data with UPDATE.
"""
def __init__(self, name, config):
def __init__(self, name, config, stats):
""" Init with a configuration section """
threading.Thread.__init__(self, name = name)
# Some settings
self.name = name
self.log = getLogger(name)
self.stats = stats
self.name = name
self.log = getLogger(name)
self.__dbconnect__(config)
@ -635,7 +638,6 @@ class PGLoader:
def print_stats(self):
""" print out some statistics """
if self.reject is not None:
self.errors = self.reject.errors
self.reject.print_stats(self.name)
@ -672,6 +674,18 @@ class PGLoader:
# then show up some stats
self.print_stats()
# update the main summary
self.duration = time.time() - self.init_time
if self.reject is not None:
self.errors = self.reject.errors
for x in [self.table, self.duration, self.db.commited_rows, self.errors]:
self.stats.append(x)
self.log.info("loading done")
return
def data_import(self):
""" import CSV or TEXT data, using COPY """