From f20ac7116eb008fd5589753079ac21ca15a6713f Mon Sep 17 00:00:00 2001 From: dim Date: Wed, 13 Feb 2008 11:30:18 +0000 Subject: [PATCH] Share Reject instance between every threads processing a given section, and protect the Reject.log() with a Lock --- pgloader/pgloader.py | 76 ++++++++++++++++++++++++++++---------------- pgloader/tools.py | 40 +++++++++++++++++------ 2 files changed, 78 insertions(+), 38 deletions(-) diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 29e9dec..6300f9a 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -32,20 +32,23 @@ class PGLoader(threading.Thread): """ def __init__(self, name, config, sem, stats, - logname = None, queue = None, lock = None): + logname = None, + reject = None, queue = None, lock = None): """ Init with a configuration section """ threading.Thread.__init__(self, name = name) # sem, stats and queue (if not None) are global objects: - # sem is shared by all threads at the same level - # stats is a private entry of a shared dict - # queue is given when reading in round robin mode - # lock is a threading.Lock for reading sync + # sem is shared by all threads at the same level + # stats is a private entry of a shared dict + # queue is given when reading in round robin mode + # lock is a threading.Lock for reading sync + # reject is a common reject object, protected by a parent-thread rlock # self.sem = sem self.stats = stats self.queue = queue self.lock = lock + self.reject = reject # thereafter parameters are local self.name = name @@ -186,28 +189,33 @@ class PGLoader(threading.Thread): ## # reject log and data files defaults to /tmp/
.rej[.log] - if config.has_option(name, 'reject_log'): - self.reject_log = config.get(name, 'reject_log') - - if config.has_option(name, 'reject_data'): - self.reject_data = config.get(name, 'reject_data') - - if not self.template and 'reject_log' not in self.__dict__: - self.reject_log = os.path.join('/tmp', '%s.rej.log' % name) + if self.reject is None: + # If we've been given a reject object, just use it, don't + # even try to init a new one from configuration - if not self.template and 'reject_data' not in self.__dict__: - self.reject_data = os.path.join('/tmp', '%s.rej' % name) + if config.has_option(name, 'reject_log'): + self.reject_log = config.get(name, 'reject_log') - # reject logging - if not self.template: - self.reject = Reject(self.log, self.reject_log, self.reject_data) + if config.has_option(name, 'reject_data'): + self.reject_data = config.get(name, 'reject_data') - self.log.info('reject log in %s', self.reject.reject_log) - self.log.info('rejected data in %s', self.reject.reject_data) + if not self.template and 'reject_log' not in self.__dict__: + self.reject_log = os.path.join('/tmp', '%s.rej.log' % name) - else: - # needed to instanciate self.reader while in template section - self.reject = None + if not self.template and 'reject_data' not in self.__dict__: + self.reject_data = os.path.join('/tmp', '%s.rej' % name) + + # reject logging + if not self.template: + self.reject = Reject(self.log, + self.reject_log, self.reject_data) + + self.log.info('reject log in %s', self.reject.reject_log) + self.log.info('rejected data in %s', self.reject.reject_data) + + else: + # needed to instanciate self.reader while in template section + self.reject = None # optionnal local option client_encoding if config.has_option(name, 'client_encoding'): @@ -739,6 +747,13 @@ class PGLoader(threading.Thread): return + # Mutli-Threaded processing of current section + # We want a common Lock() protected Reject object + # + # note: this will be done only once, children threads are + # started with self.section_threads == 1 + self.reject.set_lock(threading.Lock()) + if self.split_file_reading: # start self.section_threads workers self.split_file_read() @@ -801,7 +816,9 @@ class PGLoader(threading.Thread): current_name = "%s[%d]" % (self.name, current) loader = PGLoader(self.name, self.config, sem, - summary[current], current_name) + summary[current], + logname = current_name, + reject = self.reject) loader.section_threads = 1 loader.reader.set_boundaries(boundaries[current]) @@ -872,6 +889,7 @@ class PGLoader(threading.Thread): loader = PGLoader(self.name, self.config, sem, summary[current], logname = current_name, + reject = self.reject, queue = queues[current], lock = locks [current]) @@ -923,23 +941,25 @@ class PGLoader(threading.Thread): c = (n / self.rrqueue_size) % self.section_threads # we could have some locks to release here - self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%N=%s (n+1/rrqueue_size)%%N=%s" \ + self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%%d=%d (n+1/rrqueue_size)%%%d=%d" \ % (p, c, n, + self.section_threads, (n/self.rrqueue_size) % self.section_threads, + self.section_threads, ((n+1)/self.rrqueue_size) % self.section_threads)) - if p != c: + if p != c or (n % self.rrqueue_size != 0): self.log.debug("locks[%d].release" % p) locks[p].release() k = threads.keys() for c in range(self.section_threads): - self.log.debug("locks[%d].acquire to set %s.done = True" % (c, k[c])) + self.log.info("locks[%d].acquire to set %s.done = True" % (c, k[c])) locks[c].acquire() threads[k[c]].done = True - self.log.debug("locks[%d].release (done set)" % c) + self.log.info("locks[%d].release (done set)" % c) locks[c].release() from tools import running_threads diff --git a/pgloader/tools.py b/pgloader/tools.py index dde2736..d1b6e2c 100644 --- a/pgloader/tools.py +++ b/pgloader/tools.py @@ -23,10 +23,16 @@ class Reject: self._log = log self.reject_log = reject_log self.reject_data = reject_data + self.lock = None # we will open files on first error self.errors = 0 + def set_lock(self, lock): + """ when used in a multi-threaded way, you want a common + reject facility, protected from concurrent writes """ + self.lock = lock + def print_stats(self, name): """ give a summary """ if DRY_RUN: @@ -42,6 +48,26 @@ class Reject: self.reject_data) def log(self, messages, data = None): + """ Acquire the lock if needed, do_log() and check for IOError """ + + if self.lock: + self._log.debug("Reject acquire") + self.lock.acquire() + + try: + self.do_log(messages, data) + + except IOError, e: + raise PGLoader_Error, e + + except PGLoader_Error, e: + raise + + if self.lock: + self._log.debug("Reject release") + self.lock.release() + + def do_log(self, messages, data = None): """ log the messages into reject_log, and the data into reject_data We open the file on each request, cause we supose errors to be @@ -49,11 +75,8 @@ class Reject: """ if self.errors == 0: - try: - fd_log = open(self.reject_log, 'wb+') - fd_data = open(self.reject_data, 'wb+') - except IOError, error: - raise PGLoader_Error, error + fd_log = open(self.reject_log, 'wb+') + fd_data = open(self.reject_data, 'wb+') else: fd_log = open(self.reject_log, 'ab+') fd_data = open(self.reject_data, 'ab+') @@ -87,11 +110,8 @@ class Reject: # now we close the two fds for f in [fd_log, fd_data]: - try: - f.flush() - f.close() - except IOError, e: - raise PGLoader_Error, e + f.flush() + f.close() self.errors += 1