diff --git a/examples/parallel.conf b/examples/parallel.conf index faa6ebc..6f16c08 100644 --- a/examples/parallel.conf +++ b/examples/parallel.conf @@ -39,3 +39,4 @@ split_file_reading = True use_template = parallel_template section_threads = 2 split_file_reading = False +rrqueue_size = -1 diff --git a/pgloader.py b/pgloader.py index da311b7..939a7df 100644 --- a/pgloader.py +++ b/pgloader.py @@ -517,6 +517,7 @@ def load_data(): n = running_threads(threads) log.info("Waiting for %d threads to terminate" % n) + time.sleep(2) # Try to acquire all semaphore entries for i in range(max_running): diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 04ac0da..29e9dec 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -473,6 +473,13 @@ class PGLoader(threading.Thread): self.split_file_reading = config.get(name, 'split_file_reading') == 'True' else: self.split_file_reading = SPLIT_FILE_READING + + self.rrqueue_size = RRQUEUE_SIZE + if config.has_option(name, 'rrqueue_size'): + self.rrqueue_size = config.getint(name, 'rrqueue_size') + + if self.rrqueue_size is None or self.rrqueue_size < 1: + self.rrqueue_size = self.db.copy_every if not self.template: for opt in ('section_threads', 'split_file_reading'): @@ -849,10 +856,6 @@ class PGLoader(threading.Thread): summary = {} threads = {} - rrqueue_size = RRQUEUE_SIZE - if rrqueue_size is None: - rrqueue_size = self.db.copy_every - for current in range(self.section_threads): queues[current] = RRReader() locks [current] = threading.Lock() @@ -909,21 +912,21 @@ class PGLoader(threading.Thread): # cause we acquire() the lock at init time # # c is 0..self.section_threads, each thread process a - # queue of rrqueue_size elements - if n > rrqueue_size * (c+1): + # queue of self.rrqueue_size elements + if n > self.rrqueue_size * (c+1): self.log.debug("locks[%d].acquire" % c) locks[c].acquire() queues[c].append((line, columns)) n += 1 p = c - c = (n / rrqueue_size) % self.section_threads + c = (n / self.rrqueue_size) % self.section_threads # we could have some locks to release here self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%N=%s (n+1/rrqueue_size)%%N=%s" \ % (p, c, n, - (n/rrqueue_size) % self.section_threads, - ((n+1)/rrqueue_size) % self.section_threads)) + (n/self.rrqueue_size) % self.section_threads, + ((n+1)/self.rrqueue_size) % self.section_threads)) if p != c: self.log.debug("locks[%d].release" % p)