From 8200ceda366185d15be79e51896b9d55aa1c12f5 Mon Sep 17 00:00:00 2001 From: dim Date: Wed, 13 Feb 2008 09:18:09 +0000 Subject: [PATCH] Round Robin Reader seems ok now --- pgloader/pgloader.py | 67 +++++++++++++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 8a2434f..04ac0da 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -716,8 +716,7 @@ class PGLoader(threading.Thread): self.sem.acquire() # Announce the beginning of the work - self.log.info("%s launched --- %d threads" % (self.name, - self.section_threads)) + self.log.info("%s launched" % self.name) if self.section_threads == 1: if self.reader.start is not None: @@ -815,12 +814,13 @@ class PGLoader(threading.Thread): n = running_threads(threads) log.info("Waiting for %d threads to terminate" % n) - # Try to acquire all semaphore entries + # Try to acquire all semaphore entries --- success means no + # more thread is running for i in range(self.section_threads): sem.acquire() - log.info("Acquired %d times, " % (i+1) + \ - "still waiting for %d threads to terminate" \ - % running_threads(threads)) + self.log.info("Acquired %d times, " % (i+1) + \ + "still waiting for %d threads to terminate" \ + % running_threads(threads)) self.finish_processing() self.duration = time.time() - self.init_time @@ -859,7 +859,7 @@ class PGLoader(threading.Thread): # acquire the lock before starting worker thread # and release it once its queue if full - self.log.info("locks[%d].acquire" % current) + self.log.debug("locks[%d].acquire" % current) locks[current].acquire() try: @@ -897,29 +897,59 @@ class PGLoader(threading.Thread): for line, columns in self.reader.readlines(): if p != c: - self.log.info("read %d lines, queue to thread %s" % (n, c)) + self.log.debug("read %d lines, queue to thread %s" % (n, c)) # release p'thread (which will empty its queue) and # lock c'thread --- waiting until it has emptied its # queue - self.log.info("locks[%d].release" % p) - self.log.info("locks[%d].acquire" % c) + self.log.debug("locks[%d].release" % p) locks[p].release() - locks[c].acquire() + + # only acquire next thread lock if it's not its first usage + # cause we acquire() the lock at init time + # + # c is 0..self.section_threads, each thread process a + # queue of rrqueue_size elements + if n > rrqueue_size * (c+1): + self.log.debug("locks[%d].acquire" % c) + locks[c].acquire() queues[c].append((line, columns)) n += 1 p = c c = (n / rrqueue_size) % self.section_threads - # don't forget to process last run - locks[c].release() + # we could have some locks to release here + self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%N=%s (n+1/rrqueue_size)%%N=%s" \ + % (p, c, n, + (n/rrqueue_size) % self.section_threads, + ((n+1)/rrqueue_size) % self.section_threads)) + + if p != c: + self.log.debug("locks[%d].release" % p) + locks[p].release() k = threads.keys() for c in range(self.section_threads): - threads[k[c]].acquire() + self.log.debug("locks[%d].acquire to set %s.done = True" % (c, k[c])) + locks[c].acquire() + threads[k[c]].done = True - threads[k[c]].release() + + self.log.debug("locks[%d].release (done set)" % c) + locks[c].release() + + from tools import running_threads + n = running_threads(threads) + self.log.info("Waiting for %d threads to terminate" % n) + + # Try to acquire all semaphore entries --- success means no + # more thread is running + for i in range(self.section_threads): + sem.acquire() + self.log.debug("Acquired %d times, " % (i+1) + \ + "still waiting for %d threads to terminate" \ + % running_threads(threads)) self.finish_processing() self.duration = time.time() - self.init_time @@ -946,8 +976,11 @@ class PGLoader(threading.Thread): while not self.done: self.lock.acquire() - for line, columns in self.queue.readlines(): - yield line, columns + + if len(self.queue) > 0: + self.log.debug("processing queue") + for line, columns in self.queue.readlines(): + yield line, columns self.lock.release()