From 84738febfd8d933a3ad5273cbdb27ffbf91fc5b4 Mon Sep 17 00:00:00 2001
From: dim
Date: Sat, 16 Feb 2008 19:17:03 +0000
Subject: [PATCH] FIX round-robin reader case when not all configured threads
 are needed to load the file

---
 examples/parallel.conf |  4 ++--
 pgloader/pgloader.py   | 18 ++++++++++++++++--
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/examples/parallel.conf b/examples/parallel.conf
index 8554c22..63ca5a4 100644
--- a/examples/parallel.conf
+++ b/examples/parallel.conf
@@ -12,7 +12,7 @@ lc_messages = C
 ;client_encoding = 'utf-8'
 client_encoding = 'latin1'
 
-copy_every = 50
+copy_every = 5000
 commit_every = 5
 #copy_delimiter = %
 
@@ -44,7 +44,7 @@ split_file_reading = True
 [rrr_csv]
 format = csv
 use_template = ptmpl
-section_threads = 2
+section_threads = 3
 split_file_reading = False
 rrqueue_size = -1
 
diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py
index 4bca97b..6ed50c4 100644
--- a/pgloader/pgloader.py
+++ b/pgloader/pgloader.py
@@ -1009,6 +1009,10 @@ class PGLoader(threading.Thread):
                     p = c
                     c = (n / self.rrqueue_size) % self.section_threads
 
+                    # save this for later reference
+                    last_p = p
+                    last_c = c
+
                     # we could have some locks to release here
                     self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%%d=%d " \
                                    % (p, c, n,
@@ -1022,12 +1026,22 @@ class PGLoader(threading.Thread):
                             self.log.debug("locks[%d].release" % p)
                             locks[p].release()
 
+        # we could have read all the data and not needed all workers,
+        # log it when it's the case, then set .done = True without
+        # taking again a lock which we already have.
+        if n < (self.section_threads * self.rrqueue_size):
+            self.log.info("processed all data with only %d workers" % (c+1))
+
         # mark all worker threads has done
         k = threads.keys()
         for c in range(self.section_threads):
-            self.log.debug("locks[%d].acquire to set %s.done = True" \
+            # we don't need any lock.acquire if we didn't use the worker
+            if n > (self.section_threads * self.rrqueue_size) \
+               or c <= last_c:
+                self.log.debug("locks[%d].acquire to set %s.done = True" \
                            % (c, k[c]))
-            locks[c].acquire()
+                locks[c].acquire()
+
             threads[k[c]].done = True
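
Note (illustration only, not part of the patch): the fix above hinges on the
round-robin arithmetic c = (n / rrqueue_size) % section_threads and on the
n < section_threads * rrqueue_size test to detect that the input ran out
before every configured worker received a chunk. The following is a minimal
standalone Python sketch of that bookkeeping, reusing the patch's parameter
names; the helper used_workers() is hypothetical and not pgloader code.

    # Sketch of the bookkeeping fixed above: rows are dispatched to workers
    # in chunks of rrqueue_size, cycling over section_threads queues, so the
    # worker for a given row is (row_index // rrqueue_size) % section_threads.
    # When the input ends before one full round, only the workers that
    # actually received data need the lock handshake before being marked done.

    def used_workers(n, section_threads, rrqueue_size):
        """Return how many workers actually received rows when n rows were read."""
        if n <= 0:
            return 0
        # worker index of the last chunk, as in the patch's
        # (n / rrqueue_size) % section_threads (integer division)
        last_c = ((n - 1) // rrqueue_size) % section_threads
        if n < section_threads * rrqueue_size:
            # input exhausted before a full round: only workers 0..last_c ran
            return last_c + 1
        # at least one full round: every configured worker saw data
        return section_threads

    if __name__ == '__main__':
        # e.g. 3 workers and 5000-row queue chunks: a 7500-row file never
        # reaches the third worker, a 20000-row file uses all three
        print(used_workers(7500, section_threads=3, rrqueue_size=5000))    # 2
        print(used_workers(20000, section_threads=3, rrqueue_size=5000))   # 3

The patch applies the same test in its final loop: for workers beyond last_c
(when fewer rows than section_threads * rrqueue_size were read) the
locks[c].acquire() call is skipped and .done = True is set directly, since
those workers never took part in the lock handshake.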