FIX just-introduced bug: no reader init meant self.db.copy_sep = None

This commit is contained in:
dim 2008-02-14 14:57:47 +00:00
parent 41f1a40e49
commit a068a5ea6a
5 changed files with 136 additions and 81 deletions

View File

@ -21,7 +21,7 @@ empty_string = "\ "
max_parallel_sections = 1
[parallel_template]
[ptmpl]
template = True
table = parallel
filename = parallel/parallel.data
@ -30,14 +30,14 @@ columns = a, b
[split]
format = csv
use_template = parallel_template
use_template = ptmpl
section_threads = 2
split_file_reading = True
; round robin reader (1 reader feed section_threads processing threads)
[rrr]
format = csv
use_template = parallel_template
use_template = ptmpl
section_threads = 2
split_file_reading = False
rrqueue_size = -1

View File

@ -465,6 +465,7 @@ def load_data():
threads = {}
current = 0
interrupted = False
got_errors = False
max_running = MAX_PARALLEL_SECTIONS
if max_running == -1:
@ -492,6 +493,8 @@ def load_data():
summary.pop(s)
except PGLoader_Error, e:
got_errors = True
if e == '':
log.error('[%s] Please correct previous errors' % s)
else:
@ -503,6 +506,7 @@ def load_data():
pass
except UnicodeDecodeError, e:
got_errors = True
log.error("can't open '%s' with given input encoding '%s'" \
% (filename, input_encoding))
@ -530,7 +534,7 @@ def load_data():
td = time.time() - begin
retcode = 0
if SUMMARY and not interrupted:
if SUMMARY and not interrupted and not got_errors:
try:
retcode = print_summary(None, sections, summary, td)
print

View File

@ -514,7 +514,13 @@ class db:
('\r', '\\r'),
('\t', '\\t'),
('\v', '\\v')]:
c = c.replace(orig, escaped)
try:
c = c.replace(orig, escaped)
except TypeError, e:
self.log.error("db.prepare_copy_data columns %s" % str(columns))
self.log.error("db.prepare_copy_data input_line %s" % str(input_line))
self.log.error("TypeError: '%s'.replace(%s, %s)" % (c, orig, escaped))
raise PGLoader_Error, e
self.buffer.write(c)

View File

@ -33,34 +33,39 @@ class PGLoader(threading.Thread):
def __init__(self, name, config, sem, stats,
logname = None,
reject = None, queue = None, lock = None):
reject = None, queue = None, lock = None, copy_sep = None):
""" Init with a configuration section """
threading.Thread.__init__(self, name = name)
# logname is given when we use several Threads for reading
# input file
self.logname = logname
if self.logname is None:
self.logname = name
self.log = getLogger(self.logname)
# sem, stats and queue (if not None) are global objects:
# sem is shared by all threads at the same level
# stats is a private entry of a shared dict
# queue is given when reading in round robin mode
# lock is a threading.Lock for reading sync
# reject is a common reject object, protected by a parent-thread rlock
threading.Thread.__init__(self, name = self.logname)
# sem, stats and queue (if not None) are global objects
# sem is shared by all threads at the same level
# stats is a private entry of a shared dict
# queue is given when reading in round robin mode
# lock is a threading.Lock for reading sync
# reject is a common reject object, protected by a parent-thread rlock
#
self.sem = sem
self.stats = stats
self.queue = queue
self.lock = lock
self.reject = reject
self.sem = sem
self.stats = stats
self.queue = queue
self.lock = lock
self.reject = reject
# thereafter parameters are local
self.name = name
self.config = config
# logname is given when we use several Threads for reading
# input file
if logname is None:
logname = name
self.log = getLogger(logname)
self._dbconnect(config)
# in the round-robin reader case, workers won't have
# initialized a reader, thus won't read the configuration of
# copy_sep.
self.copy_sep = copy_sep
self.template = None
self.use_template = None
@ -90,44 +95,54 @@ class PGLoader(threading.Thread):
# check if the section wants to use a template
if config.has_option(name, 'use_template'):
self.tsection = config.get(name, 'use_template')
self.template = config.get(name, 'use_template')
if not config.has_section(self.template):
m = '%s refers to unknown template section %s' \
% (name, self.template)
raise PGLoader_Error, m
self._dbconnect(config)
# first load template configuration
self.log.info("Reading configuration from template " +\
"section [%s]", self.template)
if self.tsection is not None:
self.template = config.get(name, 'use_template')
self.real_log = self.log
self.log = getLogger(self.template)
if not config.has_section(self.template):
m = '%s refers to unknown template section %s' \
% (name, self.template)
try:
self._read_conf(self.template, config, db,
want_template = True)
except PGLoader_Error, e:
self.log.error(e)
m = "%s.use_template does not refer to a template section"\
% name
raise PGLoader_Error, m
raise PGLoader_Error, m
# reinit self.template now its relative config section is read
self.template = None
self.log = self.real_log
# first load template configuration
self.log.info("Reading configuration from template " +\
"section [%s]", self.template)
self.real_log = self.log
self.log = getLogger(self.template)
try:
self._read_conf(self.template, config, db,
want_template = True)
except PGLoader_Error, e:
self.log.error(e)
m = "%s.use_template does not refer to a template section"\
% name
raise PGLoader_Error, m
# reinit self.template now its relative config section is read
self.template = None
self.log = self.real_log
if not self.template:
# now load specific configuration
self.log.info("Reading configuration from section [%s]", name)
self.log.info("Reading configuration from section [%s]", name)
self._read_conf(name, config, db)
# Now reset database connection
if not DRY_RUN:
self.db.log = self.log
self.db.log = self.log
self.db.reset()
# check we have properly configured the copy separator
if self.db.copy_sep is None:
self.log.debug("%s" % self.db)
msg = "BUG: pgloader couldn't configure its COPY separator"
raise PGLoader_Error, msg
self.log.debug('%s init done' % name)
def _dbconnect(self, config):
@ -166,8 +181,22 @@ class PGLoader(threading.Thread):
self.db.commit_every = config.getint(
section, 'commit_every')
if self.copy_sep is not None:
self.db.copy_sep = self.copy_sep
self.log.debug("got copy_sep '%s' from self.copy_sep" \
% self.copy_sep)
if config.has_option(section, 'copy_delimiter'):
self.db.copy_sep = config.get(section, 'copy_delimiter')
self.log.debug("got copy_sep '%s' from %s" \
% (self.db.copy_sep, section))
elif config.has_option(self.tsection, 'copy_delimiter'):
self.db.copy_sep = config.get(section, 'copy_delimiter')
self.log.debug("got copy_sep '%s' from self.copy_sep" \
% (self.db.copy_sep, self.tsection))
self.log.debug("_dbconnect copy_sep %s " % self.db.copy_sep)
except Exception, error:
log.error("Could not initialize PostgreSQL connection")
@ -508,11 +537,14 @@ class PGLoader(threading.Thread):
if config.has_option(name, 'format'):
self.format = config.get(name, 'format')
if not self.template:
if not self.template and self.queue is None:
# Only init self.reader in real section, not from
# template. self.reader.readconfig() will care about
# reading its configuration from template and current
# section.
#
# If self.queue is not None, we'll read data from it and
# not from self.reader, so we don't care about it.
if 'format' not in self.__dict__:
raise PGLoader_Error, "Please configure %s.format" % name
@ -535,7 +567,7 @@ class PGLoader(threading.Thread):
self.table, self.columns,
self.newline_escapes)
self.log.debug('reader.readconfig()')
self.log.info('reader.readconfig()')
self.reader.readconfig(config, name, self.tsection)
@ -740,18 +772,22 @@ class PGLoader(threading.Thread):
self.sem.acquire()
# Announce the beginning of the work
self.log.info("%s launched" % self.name)
self.log.info("%s launched" % self.logname)
if self.section_threads == 1:
if self.reader.start is not None:
if 'reader' in self.__dict__ and self.reader.start is not None:
self.log.info("Loading from offset %d to %d" \
% (self.reader.start, self.reader.end))
self.prepare_processing()
self.process()
self.finish_processing()
try:
# catch worker exception
self.prepare_processing()
self.process()
self.finish_processing()
except Exception, e:
self.log.error(e)
self.log.info("Releasing %s" % self.name)
self.log.info("Releasing %s" % self.logname)
self.sem.release()
return
@ -771,8 +807,22 @@ class PGLoader(threading.Thread):
# here we need a special thread reading the file
self.round_robin_read()
self.log.info("releasing %s" % self.logname)
self.sem.release()
self.log.info("%s released" % self.name)
return
def wait_for_workers(self, sem, workers):
"""
Try to acquire all semaphore entries --- success means no
more thread is running
"""
from tools import running_threads
for i in range(len(workers)):
sem.acquire()
self.log.debug("Acquired %d times, " % (i+1) + \
"still waiting for %d threads to terminate" \
% running_threads(workers))
return
def split_file_read(self):
@ -843,18 +893,10 @@ class PGLoader(threading.Thread):
# be started
time.sleep(2)
from tools import running_threads
n = running_threads(threads)
log.info("Waiting for %d threads to terminate" % n)
# Try to acquire all semaphore entries --- success means no
# more thread is running
for i in range(self.section_threads):
sem.acquire()
self.log.info("Acquired %d times, " % (i+1) + \
"still waiting for %d threads to terminate" \
% running_threads(threads))
self.wait_for_workers(sem, threads)
self.finish_processing()
self.duration = time.time() - self.init_time
self.log.info('No more threads are running, %s done' % self.name)
@ -897,10 +939,11 @@ class PGLoader(threading.Thread):
loader = PGLoader(self.name, self.config, sem,
summary[current],
logname = current_name,
reject = self.reject,
queue = queues[current],
lock = locks [current])
logname = current_name,
reject = self.reject,
queue = queues[current],
lock = locks [current],
copy_sep = self.db.copy_sep)
loader.section_threads = 1
loader.dont_prepare_nor_finish = True
@ -910,7 +953,13 @@ class PGLoader(threading.Thread):
threads[current_name].start()
except Exception, e:
raise
self.log.error("Couldn't start all workers thread")
self.log.error(e)
if len(threads) != self.section_threads:
self.log.error("Couldn't start all threads, check previous errors")
self.wait_for_workers(sem, threads)
return
# wait for loaders completion, first let them some time to
# be started
@ -975,14 +1024,7 @@ class PGLoader(threading.Thread):
n = running_threads(threads)
self.log.info("Waiting for %d threads to terminate" % n)
# Try to acquire all semaphore entries --- success means no
# more thread is running
for i in range(self.section_threads):
sem.acquire()
self.log.debug("Acquired %d times, " % (i+1) + \
"still waiting for %d threads to terminate" \
% running_threads(threads))
self.wait_for_workers(sem, threads)
self.finish_processing()
self.duration = time.time() - self.init_time
self.log.info('%s done' % self.name)

View File

@ -65,6 +65,7 @@ class DataReader:
self.db.empty_string = parse_config_string(self.empty_string)
self._getopt('field_sep', config, name, template, FIELD_SEP)
if not DRY_RUN:
if self.db.copy_sep is None:
self.db.copy_sep = self.field_sep
@ -73,6 +74,7 @@ class DataReader:
self.log.debug("reader.readconfig null: '%s'" % self.db.null)
self.log.debug("reader.readconfig empty_string: '%s'",
self.db.empty_string)
self.log.debug("reader.db %s copy_sep %s" % (self.db, self.db.copy_sep))
self.log.debug("reader.readconfig field_sep: '%s'", self.field_sep)
@ -115,6 +117,7 @@ class DataReader:
except IOError, error:
raise PGLoader_Error, error
self.log.info("Opened '%s' in %s" % (self.filename, self.fd))
return self.fd
def readlines(self):