Share Reject instance between every threads processing a given section, and protect the Reject.log() with a Lock

This commit is contained in:
dim 2008-02-13 11:30:18 +00:00
parent e641b18fb3
commit f20ac7116e
2 changed files with 78 additions and 38 deletions

View File

@ -32,20 +32,23 @@ class PGLoader(threading.Thread):
"""
def __init__(self, name, config, sem, stats,
logname = None, queue = None, lock = None):
logname = None,
reject = None, queue = None, lock = None):
""" Init with a configuration section """
threading.Thread.__init__(self, name = name)
# sem, stats and queue (if not None) are global objects:
# sem is shared by all threads at the same level
# stats is a private entry of a shared dict
# queue is given when reading in round robin mode
# lock is a threading.Lock for reading sync
# sem is shared by all threads at the same level
# stats is a private entry of a shared dict
# queue is given when reading in round robin mode
# lock is a threading.Lock for reading sync
# reject is a common reject object, protected by a parent-thread rlock
#
self.sem = sem
self.stats = stats
self.queue = queue
self.lock = lock
self.reject = reject
# thereafter parameters are local
self.name = name
@ -186,28 +189,33 @@ class PGLoader(threading.Thread):
##
# reject log and data files defaults to /tmp/<section>.rej[.log]
if config.has_option(name, 'reject_log'):
self.reject_log = config.get(name, 'reject_log')
if config.has_option(name, 'reject_data'):
self.reject_data = config.get(name, 'reject_data')
if not self.template and 'reject_log' not in self.__dict__:
self.reject_log = os.path.join('/tmp', '%s.rej.log' % name)
if self.reject is None:
# If we've been given a reject object, just use it, don't
# even try to init a new one from configuration
if not self.template and 'reject_data' not in self.__dict__:
self.reject_data = os.path.join('/tmp', '%s.rej' % name)
if config.has_option(name, 'reject_log'):
self.reject_log = config.get(name, 'reject_log')
# reject logging
if not self.template:
self.reject = Reject(self.log, self.reject_log, self.reject_data)
if config.has_option(name, 'reject_data'):
self.reject_data = config.get(name, 'reject_data')
self.log.info('reject log in %s', self.reject.reject_log)
self.log.info('rejected data in %s', self.reject.reject_data)
if not self.template and 'reject_log' not in self.__dict__:
self.reject_log = os.path.join('/tmp', '%s.rej.log' % name)
else:
# needed to instanciate self.reader while in template section
self.reject = None
if not self.template and 'reject_data' not in self.__dict__:
self.reject_data = os.path.join('/tmp', '%s.rej' % name)
# reject logging
if not self.template:
self.reject = Reject(self.log,
self.reject_log, self.reject_data)
self.log.info('reject log in %s', self.reject.reject_log)
self.log.info('rejected data in %s', self.reject.reject_data)
else:
# needed to instanciate self.reader while in template section
self.reject = None
# optionnal local option client_encoding
if config.has_option(name, 'client_encoding'):
@ -739,6 +747,13 @@ class PGLoader(threading.Thread):
return
# Mutli-Threaded processing of current section
# We want a common Lock() protected Reject object
#
# note: this will be done only once, children threads are
# started with self.section_threads == 1
self.reject.set_lock(threading.Lock())
if self.split_file_reading:
# start self.section_threads workers
self.split_file_read()
@ -801,7 +816,9 @@ class PGLoader(threading.Thread):
current_name = "%s[%d]" % (self.name, current)
loader = PGLoader(self.name, self.config, sem,
summary[current], current_name)
summary[current],
logname = current_name,
reject = self.reject)
loader.section_threads = 1
loader.reader.set_boundaries(boundaries[current])
@ -872,6 +889,7 @@ class PGLoader(threading.Thread):
loader = PGLoader(self.name, self.config, sem,
summary[current],
logname = current_name,
reject = self.reject,
queue = queues[current],
lock = locks [current])
@ -923,23 +941,25 @@ class PGLoader(threading.Thread):
c = (n / self.rrqueue_size) % self.section_threads
# we could have some locks to release here
self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%N=%s (n+1/rrqueue_size)%%N=%s" \
self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%%d=%d (n+1/rrqueue_size)%%%d=%d" \
% (p, c, n,
self.section_threads,
(n/self.rrqueue_size) % self.section_threads,
self.section_threads,
((n+1)/self.rrqueue_size) % self.section_threads))
if p != c:
if p != c or (n % self.rrqueue_size != 0):
self.log.debug("locks[%d].release" % p)
locks[p].release()
k = threads.keys()
for c in range(self.section_threads):
self.log.debug("locks[%d].acquire to set %s.done = True" % (c, k[c]))
self.log.info("locks[%d].acquire to set %s.done = True" % (c, k[c]))
locks[c].acquire()
threads[k[c]].done = True
self.log.debug("locks[%d].release (done set)" % c)
self.log.info("locks[%d].release (done set)" % c)
locks[c].release()
from tools import running_threads

View File

@ -23,10 +23,16 @@ class Reject:
self._log = log
self.reject_log = reject_log
self.reject_data = reject_data
self.lock = None
# we will open files on first error
self.errors = 0
def set_lock(self, lock):
""" when used in a multi-threaded way, you want a common
reject facility, protected from concurrent writes """
self.lock = lock
def print_stats(self, name):
""" give a summary """
if DRY_RUN:
@ -42,6 +48,26 @@ class Reject:
self.reject_data)
def log(self, messages, data = None):
""" Acquire the lock if needed, do_log() and check for IOError """
if self.lock:
self._log.debug("Reject acquire")
self.lock.acquire()
try:
self.do_log(messages, data)
except IOError, e:
raise PGLoader_Error, e
except PGLoader_Error, e:
raise
if self.lock:
self._log.debug("Reject release")
self.lock.release()
def do_log(self, messages, data = None):
""" log the messages into reject_log, and the data into reject_data
We open the file on each request, cause we supose errors to be
@ -49,11 +75,8 @@ class Reject:
"""
if self.errors == 0:
try:
fd_log = open(self.reject_log, 'wb+')
fd_data = open(self.reject_data, 'wb+')
except IOError, error:
raise PGLoader_Error, error
fd_log = open(self.reject_log, 'wb+')
fd_data = open(self.reject_data, 'wb+')
else:
fd_log = open(self.reject_log, 'ab+')
fd_data = open(self.reject_data, 'ab+')
@ -87,11 +110,8 @@ class Reject:
# now we close the two fds
for f in [fd_log, fd_data]:
try:
f.flush()
f.close()
except IOError, e:
raise PGLoader_Error, e
f.flush()
f.close()
self.errors += 1