Round Robin Reader seems ok now

This commit is contained in:
dim 2008-02-13 09:18:09 +00:00
parent bac5d2b482
commit 8200ceda36

View File

@ -716,8 +716,7 @@ class PGLoader(threading.Thread):
self.sem.acquire()
# Announce the beginning of the work
self.log.info("%s launched --- %d threads" % (self.name,
self.section_threads))
self.log.info("%s launched" % self.name)
if self.section_threads == 1:
if self.reader.start is not None:
@ -815,12 +814,13 @@ class PGLoader(threading.Thread):
n = running_threads(threads)
log.info("Waiting for %d threads to terminate" % n)
# Try to acquire all semaphore entries
# Try to acquire all semaphore entries --- success means no
# more thread is running
for i in range(self.section_threads):
sem.acquire()
log.info("Acquired %d times, " % (i+1) + \
"still waiting for %d threads to terminate" \
% running_threads(threads))
self.log.info("Acquired %d times, " % (i+1) + \
"still waiting for %d threads to terminate" \
% running_threads(threads))
self.finish_processing()
self.duration = time.time() - self.init_time
@ -859,7 +859,7 @@ class PGLoader(threading.Thread):
# acquire the lock before starting worker thread
# and release it once its queue if full
self.log.info("locks[%d].acquire" % current)
self.log.debug("locks[%d].acquire" % current)
locks[current].acquire()
try:
@ -897,29 +897,59 @@ class PGLoader(threading.Thread):
for line, columns in self.reader.readlines():
if p != c:
self.log.info("read %d lines, queue to thread %s" % (n, c))
self.log.debug("read %d lines, queue to thread %s" % (n, c))
# release p'thread (which will empty its queue) and
# lock c'thread --- waiting until it has emptied its
# queue
self.log.info("locks[%d].release" % p)
self.log.info("locks[%d].acquire" % c)
self.log.debug("locks[%d].release" % p)
locks[p].release()
locks[c].acquire()
# only acquire next thread lock if it's not its first usage
# cause we acquire() the lock at init time
#
# c is 0..self.section_threads, each thread process a
# queue of rrqueue_size elements
if n > rrqueue_size * (c+1):
self.log.debug("locks[%d].acquire" % c)
locks[c].acquire()
queues[c].append((line, columns))
n += 1
p = c
c = (n / rrqueue_size) % self.section_threads
# don't forget to process last run
locks[c].release()
# we could have some locks to release here
self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%N=%s (n+1/rrqueue_size)%%N=%s" \
% (p, c, n,
(n/rrqueue_size) % self.section_threads,
((n+1)/rrqueue_size) % self.section_threads))
if p != c:
self.log.debug("locks[%d].release" % p)
locks[p].release()
k = threads.keys()
for c in range(self.section_threads):
threads[k[c]].acquire()
self.log.debug("locks[%d].acquire to set %s.done = True" % (c, k[c]))
locks[c].acquire()
threads[k[c]].done = True
threads[k[c]].release()
self.log.debug("locks[%d].release (done set)" % c)
locks[c].release()
from tools import running_threads
n = running_threads(threads)
self.log.info("Waiting for %d threads to terminate" % n)
# Try to acquire all semaphore entries --- success means no
# more thread is running
for i in range(self.section_threads):
sem.acquire()
self.log.debug("Acquired %d times, " % (i+1) + \
"still waiting for %d threads to terminate" \
% running_threads(threads))
self.finish_processing()
self.duration = time.time() - self.init_time
@ -946,8 +976,11 @@ class PGLoader(threading.Thread):
while not self.done:
self.lock.acquire()
for line, columns in self.queue.readlines():
yield line, columns
if len(self.queue) > 0:
self.log.debug("processing queue")
for line, columns in self.queue.readlines():
yield line, columns
self.lock.release()