From a27a801f62342d74067370b690488cdcea3d01ae Mon Sep 17 00:00:00 2001 From: dim Date: Fri, 15 Feb 2008 21:14:45 +0000 Subject: [PATCH] Synchronize threads starts and ends with an Event --- pgloader.py | 37 ++++++------ pgloader/pgloader.py | 134 +++++++++++++++++++++++-------------------- pgloader/tools.py | 17 +++--- 3 files changed, 99 insertions(+), 89 deletions(-) diff --git a/pgloader.py b/pgloader.py index 59866e8..002aaf0 100644 --- a/pgloader.py +++ b/pgloader.py @@ -462,8 +462,10 @@ def load_data(): # we run through sorted section list sections.sort() - threads = {} - current = 0 + threads = {} + started = {} + finished = {} + current = 0 interrupted = False max_running = MAX_PARALLEL_SECTIONS @@ -476,8 +478,12 @@ def load_data(): s = sections[current] try: - summary[s] = [] - loader = PGLoader(s, config, sem, summary[s]) + summary [s] = [] + started [s] = threading.Event() + finished[s] = threading.Event() + + loader = PGLoader(s, config, sem, + (started[s], finished[s]), summary[s]) if not loader.template: filename = loader.filename input_encoding = loader.input_encoding @@ -485,11 +491,13 @@ def load_data(): # .start() will sem.aquire(), so we won't have more # than max_running threads running at any time. 
- log.info("Starting thread for %s" % s) + log.debug("Starting thread for %s" % s) threads[s].start() else: log.info("Skipping section %s, which is a template" % s) - summary.pop(s) + + for d in (summary, started, finished): + d.pop(s) except PGLoader_Error, e: if e == '': @@ -512,19 +520,12 @@ def load_data(): current += 1 - if not interrupted: - from pgloader.tools import running_threads - - n = running_threads(threads) - log.info("Waiting for %d threads to terminate" % n) - time.sleep(2) + # get sure each thread is started, then each one is done + from pgloader.tools import check_events - # Try to acquire all semaphore entries - for i in range(max_running): - sem.acquire() - log.debug("Acquired %d times, " % (i+1) + \ - "still waiting for %d threads to terminate" \ - % running_threads(threads)) + check_events(started, log, "is running") + log.info("All threads are started, wait for them to terminate") + check_events(finished, log, "processing is over") # total duration td = time.time() - begin diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 0d4133b..4bca97b 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -9,7 +9,7 @@ import os, sys, os.path, time, codecs, threading from cStringIO import StringIO from logger import log, getLogger -from tools import PGLoader_Error, Reject, parse_config_string +from tools import PGLoader_Error, Reject, parse_config_string, check_events from db import db from lo import ifx_clob, ifx_blob @@ -31,7 +31,8 @@ class PGLoader(threading.Thread): import data with COPY or update blob data with UPDATE. 
""" - def __init__(self, name, config, sem, stats, + def __init__(self, name, config, sem, (started, finished), + stats, logname = None, reject = None, queue = None, lock = None, copy_sep = None): """ Init with a configuration section """ @@ -47,13 +48,17 @@ class PGLoader(threading.Thread): # sem, stats and queue (if not None) are global objects # sem is shared by all threads at the same level + # started is a threads.Event object to set() once in run() + # finished is a threads.Event object to set() once processing is over # stats is a private entry of a shared dict # queue is given when reading in round robin mode # lock is a threading.Lock for reading sync - # reject is a common reject object, protected by a parent-thread rlock + # reject is the common reject object # self.sem = sem + self.started = started self.stats = stats + self.finished = finished self.queue = queue self.lock = lock self.reject = reject @@ -86,10 +91,10 @@ class PGLoader(threading.Thread): if config.has_option(name, 'template'): self.template = True - self.log.info("[%s] is a template", self.name) + self.log.debug("[%s] is a template", self.name) if not self.template: - self.log.info("[%s] parse configuration", self.name) + self.log.debug("[%s] parse configuration", self.name) if not self.template: # check if the section wants to use a template @@ -108,8 +113,8 @@ class PGLoader(threading.Thread): raise PGLoader_Error, m # first load template configuration - self.log.info("Reading configuration from template " +\ - "section [%s]", self.template) + self.log.debug("Reading configuration from template " +\ + "section [%s]", self.template) self.real_log = self.log self.log = getLogger(self.template) @@ -129,7 +134,7 @@ class PGLoader(threading.Thread): if not self.template: # now load specific configuration - self.log.info("Reading configuration from section [%s]", name) + self.log.debug("Reading configuration from section [%s]", name) self._read_conf(name, config, db) # Now reset database 
connection @@ -237,8 +242,8 @@ class PGLoader(threading.Thread): self.reject = Reject(self.log, self.reject_log, self.reject_data) - self.log.info('reject log in %s', self.reject.reject_log) - self.log.info('rejected data in %s', self.reject.reject_data) + self.log.debug('reject log in %s', self.reject.reject_log) + self.log.debug('rejected data in %s', self.reject.reject_data) else: # needed to instanciate self.reject while in template section @@ -551,8 +556,8 @@ class PGLoader(threading.Thread): if 'format' not in self.__dict__: raise PGLoader_Error, "Please configure %s.format" % name - self.log.info("File '%s' will be read in %s format" \ - % (self.filename, self.format)) + self.log.debug("File '%s' will be read in %s format" \ + % (self.filename, self.format)) if self.format.lower() == 'csv': from csvreader import CSVReader @@ -772,9 +777,12 @@ class PGLoader(threading.Thread): # care about number of threads launched self.sem.acquire() + + # tell parent thread we are running now + self.started.set() # Announce the beginning of the work - self.log.info("%s launched" % self.logname) + self.log.debug("%s processing" % self.logname) if self.section_threads == 1: if 'reader' in self.__dict__ and self.reader.start is not None: @@ -789,9 +797,7 @@ class PGLoader(threading.Thread): except Exception, e: self.log.error(e) - self.log.info("Releasing %s" % self.logname) - self.sem.release() - + self.terminate() return # Mutli-Threaded processing of current section @@ -809,26 +815,19 @@ class PGLoader(threading.Thread): # here we need a special thread reading the file self.round_robin_read() - self.log.info("releasing %s" % self.logname) - self.sem.release() + self.terminate() return - def wait_for_workers(self, sem, workers): - """ - Try to acquire all semaphore entries --- success means no - more thread is running - """ - from tools import running_threads + def terminate(self): + """ Announce it's over and free the concurrency control semaphore """ - n = 
running_threads(workers) - self.log.info("Waiting for %d/%d threads to terminate" \ - % (n, len(workers))) + self.log.debug("releasing %s semaphore" % self.logname) + self.sem.release() - for i in range(n): - sem.acquire() - self.log.debug("Acquired %d times, " % (i+1) + \ - "still waiting for %d threads to terminate" \ - % running_threads(workers)) + # tell parent thread processing is now over, here + self.log.debug("Announce it's over") + self.finished.set() + return def split_file_read(self): @@ -871,16 +870,21 @@ class PGLoader(threading.Thread): self.prepare_processing() # now create self.section_threads PGLoader threads - sem = threading.BoundedSemaphore(self.section_threads) - summary = {} - threads = {} + sem = threading.BoundedSemaphore(self.section_threads) + summary = {} + threads = {} + started = {} + finished = {} for current in range(self.section_threads): try: - summary[current] = [] - current_name = "%s[%d]" % (self.name, current) + summary[current] = [] + started[current] = threading.Event() + finished[current] = threading.Event() + current_name = "%s.%d" % (self.name, current) loader = PGLoader(self.name, self.config, sem, + (started[current], finished[current]), summary[current], logname = current_name, reject = self.reject) @@ -895,14 +899,13 @@ class PGLoader(threading.Thread): except Exception, e: raise - # wait for loaders completion, first let them some time to - # be started - time.sleep(2) - - self.wait_for_workers(sem, threads) + # wait for workers to have started, then wait for them to terminate + check_events(started, self.log, "is running") + check_events(finished, self.log, "processing is over") + self.finish_processing() self.duration = time.time() - self.init_time - self.log.info('No more threads are running, %s done' % self.name) + self.log.debug('No more threads are running, %s done' % self.name) stats = [0, 0] for s in summary: @@ -921,11 +924,13 @@ class PGLoader(threading.Thread): self.prepare_processing() from tools import 
RRReader - queues = {} - locks = {} - sem = threading.BoundedSemaphore(self.section_threads) - summary = {} - threads = {} + queues = {} + locks = {} + sem = threading.BoundedSemaphore(self.section_threads) + summary = {} + threads = {} + started = {} + finished = {} for current in range(self.section_threads): queues[current] = RRReader() @@ -937,10 +942,13 @@ class PGLoader(threading.Thread): locks[current].acquire() try: - summary[current] = [] - current_name = "%s[%d]" % (self.name, current) + summary [current] = [] + started [current] = threading.Event() + finished[current] = threading.Event() + current_name = "%s.%d" % (self.name, current) loader = PGLoader(self.name, self.config, sem, + (started[current], finished[current]), summary[current], logname = current_name, reject = self.reject, @@ -961,13 +969,13 @@ class PGLoader(threading.Thread): if len(threads) != self.section_threads: self.log.error("Couldn't start all threads, check previous errors") - self.wait_for_workers(sem, threads) + + check_events([x for x in finished if threads[x].isAlive()], + self.log, "processing is over") return - # wait for loaders completion, first let them some time to - # be started - time.sleep(2) - + check_events(started, self.log, "is running") + # Now self.section_threads are started and we have a queue and # a Condition for each of them. 
# @@ -1002,17 +1010,19 @@ class PGLoader(threading.Thread): c = (n / self.rrqueue_size) % self.section_threads # we could have some locks to release here - self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%%d=%d (n+1/rrqueue_size)%%%d=%d" \ + self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%%d=%d " \ % (p, c, n, self.section_threads, - (n/self.rrqueue_size) % self.section_threads, - self.section_threads, + (n/self.rrqueue_size) % self.section_threads) + \ + "(n+1/rrqueue_size)%%%d=%d" \ + % (self.section_threads, ((n+1)/self.rrqueue_size) % self.section_threads)) if p != c or (n % self.rrqueue_size != 0): self.log.debug("locks[%d].release" % p) locks[p].release() + # mark all worker threads as done k = threads.keys() for c in range(self.section_threads): self.log.debug("locks[%d].acquire to set %s.done = True" \ % (c, threads[k[c]].name)) locks[c].acquire() threads[k[c]].done = True self.log.debug("locks[%d].release (done set)" % c) locks[c].release() - self.wait_for_workers(sem, threads) + # wait for workers to finish processing + check_events(finished, self.log, "processing is over") + + self.finish_processing() self.duration = time.time() - self.init_time - self.log.info('%s done' % self.name) + self.log.debug('%s done' % self.name) stats = [0, 0] for s in summary: @@ -1080,7 +1092,7 @@ class PGLoader(threading.Thread): if TRIGGERS and not DRY_RUN: self.db.enable_triggers(self.table) - self.log.info("loading done") + self.log.debug("loading done") return def update_summary(): diff --git a/pgloader/tools.py b/pgloader/tools.py index d1b6e2c..87d9c5e 100644 --- a/pgloader/tools.py +++ b/pgloader/tools.py @@ -186,17 +186,14 @@ def check_dirname(path): return True, None + +def check_events(events, log, context = "is running"): + """ wait until all events (list) are set """ + for t in events: + events[t].wait() + log.debug("thread %s %s" % (t, context)) - -def running_threads(threads): - """ count running threads """ - running = 0 - for s in threads: - if threads[s].isAlive(): - 
running += 1 - - return running - + return class RRReader(collections.deque): """ Round Robin reader, which are collections.deque with a