Add support for the per-section 'rrqueue_size' configuration parameter, which defaults to copy_every

This commit is contained in:
dim 2008-02-13 09:42:26 +00:00
parent 8200ceda36
commit e641b18fb3
3 changed files with 14 additions and 9 deletions

View File

@ -39,3 +39,4 @@ split_file_reading = True
use_template = parallel_template
section_threads = 2
split_file_reading = False
rrqueue_size = -1

View File

@ -517,6 +517,7 @@ def load_data():
n = running_threads(threads)
log.info("Waiting for %d threads to terminate" % n)
time.sleep(2)
# Try to acquire all semaphore entries
for i in range(max_running):

View File

@ -473,6 +473,13 @@ class PGLoader(threading.Thread):
self.split_file_reading = config.get(name, 'split_file_reading') == 'True'
else:
self.split_file_reading = SPLIT_FILE_READING
self.rrqueue_size = RRQUEUE_SIZE
if config.has_option(name, 'rrqueue_size'):
self.rrqueue_size = config.getint(name, 'rrqueue_size')
if self.rrqueue_size is None or self.rrqueue_size < 1:
self.rrqueue_size = self.db.copy_every
if not self.template:
for opt in ('section_threads', 'split_file_reading'):
@ -849,10 +856,6 @@ class PGLoader(threading.Thread):
summary = {}
threads = {}
rrqueue_size = RRQUEUE_SIZE
if rrqueue_size is None:
rrqueue_size = self.db.copy_every
for current in range(self.section_threads):
queues[current] = RRReader()
locks [current] = threading.Lock()
@ -909,21 +912,21 @@ class PGLoader(threading.Thread):
# cause we acquire() the lock at init time
#
# c is 0..self.section_threads, each thread processes a
# queue of rrqueue_size elements
if n > rrqueue_size * (c+1):
# queue of self.rrqueue_size elements
if n > self.rrqueue_size * (c+1):
self.log.debug("locks[%d].acquire" % c)
locks[c].acquire()
queues[c].append((line, columns))
n += 1
p = c
c = (n / rrqueue_size) % self.section_threads
c = (n / self.rrqueue_size) % self.section_threads
# we could have some locks to release here
self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%N=%s (n+1/rrqueue_size)%%N=%s" \
% (p, c, n,
(n/rrqueue_size) % self.section_threads,
((n+1)/rrqueue_size) % self.section_threads))
(n/self.rrqueue_size) % self.section_threads,
((n+1)/self.rrqueue_size) % self.section_threads))
if p != c:
self.log.debug("locks[%d].release" % p)