From b534962192a2788954ddee327a4a7b7314e02bec Mon Sep 17 00:00:00 2001 From: dim Date: Tue, 12 Feb 2008 11:27:18 +0000 Subject: [PATCH] Using semaphores to control how many max threads run at any given time --- examples/parallel.conf | 40 +++++++++++++ examples/pgloader.conf | 23 +------- pgloader.py | 92 +++++++++++++++--------------- pgloader/options.py | 3 +- pgloader/pgloader.py | 125 +++++++++++++++++++++++------------------ 5 files changed, 158 insertions(+), 125 deletions(-) create mode 100644 examples/parallel.conf diff --git a/examples/parallel.conf b/examples/parallel.conf new file mode 100644 index 0000000..b1a5072 --- /dev/null +++ b/examples/parallel.conf @@ -0,0 +1,40 @@ +[pgsql] +host = localhost +port = 5432 +base = pgloader +user = dim +pass = None + +log_file = /tmp/pgloader.log +log_min_messages = DEBUG +client_min_messages = WARNING +lc_messages = C + +;client_encoding = 'utf-8' +client_encoding = 'latin1' +copy_every = 5000 +commit_every = 5 +#copy_delimiter = % + +null = "" +empty_string = "\ " + +max_parallel_sections = 1 + +[parallel_template] +template = True +table = parallel +format = csv +filename = parallel/parallel.data +field_sep = ; +columns = a, b + +[parallel_greg] +use_template = parallel_template +section_threads = 4 +split_file_reading = True + +[parallel_simon] +use_template = parallel_template +section_threads = 2 +split_file_reading = False diff --git a/examples/pgloader.conf b/examples/pgloader.conf index 1beccfd..c2bfaee 100644 --- a/examples/pgloader.conf +++ b/examples/pgloader.conf @@ -19,8 +19,7 @@ commit_every = 5 null = "" empty_string = "\ " -max_parallel_sections = 1 - +max_parallel_sections = 4 [simple_tmpl] template = True @@ -99,26 +98,6 @@ field_sep = | columns = id, timestamp reformat = timestamp:mysql:timestamp -[parallel_template] -template = True -table = parallel -format = csv -filename = parallel/parallel.data -field_sep = ; -columns = a, b - -[parallel_greg] -use_template = parallel_template -max_threads 
= 4 -section_threads = -1 -split_file_reading = True - -[parallel_simon] -use_template = parallel_template -max_threads = 2 -section_threads = -1 -split_file_reading = False - [csv] table = csv format = csv diff --git a/pgloader.py b/pgloader.py index be7a48b..3892f3d 100644 --- a/pgloader.py +++ b/pgloader.py @@ -5,7 +5,7 @@ PostgreSQL data import tool, see included man page. """ -import os, sys, os.path, time, codecs, logging +import os, sys, os.path, time, codecs, logging, threading from cStringIO import StringIO import pgloader.options @@ -342,8 +342,13 @@ def print_summary(dbconn, sections, summary, td): print t print _ - t, d, u, e = summary[s] - d = duration_pprint(d) + if summary[s]: + t, d, u, e = summary[s] + d = duration_pprint(d) + else: + t = s + d = '%9s ' % '-' + u = e = 0 if False and not DRY_RUN: sql = "select pg_total_relation_size(%s), " + \ @@ -388,6 +393,16 @@ def print_summary(dbconn, sections, summary, td): return retcode +def running_threads(threads): + """ count running threads """ + running = 0 + for s in threads: + if threads[s].isAlive(): + running += 1 + + return running + + def load_data(): """ read option line and configuration file, then process data import of given section, or all sections if no section is given on @@ -458,46 +473,34 @@ def load_data(): sections.sort() threads = {} - running = 0 current = 0 + interrupted = False + + max_running = MAX_PARALLEL_SECTIONS + if max_running == -1: + max_running = len(sections) + + sem = threading.BoundedSemaphore(max_running) while current < len(sections): s = sections[current] - # update running - if running > 0: - for s in threads: - if not threads[s].isAlive(): - running -= 1 - - if MAX_PARALLEL_SECTIONS != -1: - # -1 means we can start as many parallel section - # processing as we want to - - if running == MAX_PARALLEL_SECTIONS: - # we have to wait for one thread to terminate - # before considering next one - log.info('%d/%d threads running, sleeping %gs' \ - % (running, 
MAX_PARALLEL_SECTIONS, .1)) - time.sleep(.1) - continue - try: summary[s] = [] - loader = PGLoader(s, config, summary[s]) + loader = PGLoader(s, config, sem, summary[s]) if not loader.template: filename = loader.filename input_encoding = loader.input_encoding - - threads[s] = loader - - log.info("Starting thread %d for %s" % (running, s)) + threads[s] = loader + + # .start() will sem.acquire(), so we won't have more + # than max_running threads running at any time. + log.info("Starting thread for %s" % s) threads[s].start() - running += 1 else: log.info("Skipping section %s, which is a template" % s) summary.pop(s) - + except PGLoader_Error, e: if e == '': log.error('[%s] Please correct previous errors' % s) @@ -514,39 +517,36 @@ % (filename, input_encoding)) except KeyboardInterrupt: + interrupted = True log.warning("Aborting on user demand (Interrupt)") current += 1 - while running > 0: - for s in threads: - if not threads[s].isAlive(): - running -= 1 + if not interrupted: + n = running_threads(threads) + log.info("Waiting for %d threads to terminate" % n) - if running > 0: - if MAX_PARALLEL_SECTIONS != 1: - log.info("%d thread(s) still running" % running) - - try: - if MAX_PARALLEL_SECTIONS != 1: - log.info('waiting for %d threads, sleeping %gs' % (running, 1)) - time.sleep(1) - except KeyboardInterrupt: - log.warning("Aborting %d threads still running at user demand"\ - % running) - break + # Try to acquire all semaphore entries + for i in range(max_running): + sem.acquire() + log.debug("Acquired %d times, " % i + \ + "still waiting for %d threads to terminate" \ + % running_threads(threads)) # total duration td = time.time() - begin retcode = 0 - if SUMMARY: + if SUMMARY and not interrupted: try: retcode = print_summary(None, sections, summary, td) print except PGLoader_Error, e: log.error("Can't print summary: %s" % e) + except KeyboardInterrupt: + pass + return retcode if __name__ == "__main__": diff --git a/pgloader/options.py 
b/pgloader/options.py index 514d93d..f8cbd16 100644 --- a/pgloader/options.py +++ b/pgloader/options.py @@ -36,9 +36,8 @@ UDC_PREFIX = 'udc_' REFORMAT_PATH = None DEFAULT_REFORMAT_PATH = ['/usr/share/python-support/pgloader/reformat'] -MAX_THREADS = 1 MAX_PARALLEL_SECTIONS = 1 -SECTION_THREADS = -1 +SECTION_THREADS = 1 SPLIT_FILE_READING = False CLIENT_MIN_MESSAGES = None diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index 4d0d30f..a5d7c10 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -21,7 +21,7 @@ from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING from options import NEWLINE_ESCAPES from options import UDC_PREFIX from options import REFORMAT_PATH -from options import MAX_THREADS, MAX_PARALLEL_SECTIONS +from options import MAX_PARALLEL_SECTIONS from options import SECTION_THREADS, SPLIT_FILE_READING class PGLoader(threading.Thread): @@ -30,12 +30,17 @@ class PGLoader(threading.Thread): import data with COPY or update blob data with UPDATE. 
""" - def __init__(self, name, config, stats, logname = None): + def __init__(self, name, config, sem, stats, logname = None): """ Init with a configuration section """ threading.Thread.__init__(self, name = name) - - # Some settings + + # sem and stats are global objects: + # sem is shared by all threads at the same level, stats is a + # private entry of a shared dict + self.sem = sem self.stats = stats + + # thereafter parameters are local self.name = name self.config = config @@ -163,6 +168,9 @@ class PGLoader(threading.Thread): def __read_conf__(self, name, config, db, want_template = False): """ init self from config section name """ + # we'll need both of them from the globals + global FROM_COUNT, FROM_ID + if want_template and not config.has_option(name, 'template'): e = 'Error: section %s is not a template' % name raise PGLoader_Error, e @@ -447,38 +455,29 @@ class PGLoader(threading.Thread): ## # Parallelism knobs - for opt, default in [('max_threads', MAX_THREADS), - ('section_threads', SECTION_THREADS), - ('split_file_reading', SPLIT_FILE_READING)]: - - if config.has_option(name, opt): - if opt in ['max_threads', 'section_threads']: - self.__dict__[opt] = config.getint(name, opt) - else: - self.__dict__[opt] = config.get(name, opt) == 'True' - else: - if not self.template: - self.__dict__[opt] = default + if config.has_option(name, 'section_threads'): + self.section_threads = config.getint(name, 'section_threads') + else: + self.section_threads = SECTION_THREADS - if not self.template: - self.log.info('%s.%s = %s' % (name, opt, str(self.__dict__[opt]))) + if config.has_option(name, 'split_file_reading'): + self.split_file_reading = config.get(name, 'split_file_reading') == 'True' + else: + self.split_file_reading = SPLIT_FILE_READING + + if not self.template: + for opt in ('section_threads', 'split_file_reading'): + self.log.debug('%s.%s = %s' % (name, opt, str(self.__dict__[opt]))) if not self.template and self.split_file_reading: - global FROM_COUNT if 
FROM_COUNT is not None and FROM_COUNT > 0: raise PGLoader_Error, \ "Conflict: can't use both 'split_file_reading' and '--from'" - global FROM_ID if FROM_ID is not None: raise PGLoader_Error, \ "Conflict: can't use both 'split_file_reading' and '--from-id'" - if not self.template and self.section_threads > self.max_threads: - raise PGLoader_Error, \ - "%s.section_threads > %s.max_threads : %d > %d" \ - % (name, name, self.section_threads, self.max_threads) - ## # Reader's init if config.has_option(name, 'format'): @@ -587,7 +586,6 @@ class PGLoader(threading.Thread): # if options.fromid is not None it has to be either a value, # when index is single key or a dict in a string, when index # is a multiple key - global FROM_ID if FROM_ID is not None: if len(self.index) > 1: # we have to evaluate given string and see if it is a @@ -691,12 +689,14 @@ class PGLoader(threading.Thread): def run(self): """ controling thread which dispatch the job """ + + # care about number of threads launched + self.sem.acquire() # Announce the beginning of the work - self.log.info("[%s]" % self.name) - - if self.max_threads == 1: + self.log.info("%s launched" % self.name) + if self.section_threads == 1: if self.reader.start is not None: self.log.info("Loading from offset %d to %d" \ % (self.reader.start, self.reader.end)) @@ -704,11 +704,11 @@ class PGLoader(threading.Thread): self.prepare_processing() self.process() self.finish_processing() - return - # now we're going to need mutli-threading - if self.section_threads == -1: - self.section_threads = self.max_threads + self.log.info("Releasing %s" % self.name) + self.sem.release() + + return if self.split_file_reading: # this option is not compatible with text mode when @@ -731,7 +731,7 @@ class PGLoader(threading.Thread): previous = end + 1 - self.log.info("Spliting input file of %d bytes %s" \ + self.log.debug("Spliting input file of %d bytes %s" \ % (filesize, str(boundaries))) # Now check for real boundaries @@ -755,43 +755,58 @@ class 
PGLoader(threading.Thread): self.prepare_processing() # now create self.section_threads PGLoader threads + # the semaphore here is not really useful, but is part of the API + sem = threading.BoundedSemaphore(self.section_threads) summary = {} threads = {} running = 0 for current in range(self.section_threads): - summary[current] = [] - current_name = "%s[%d]" % (self.name, current) - loader = PGLoader(self.name, - self.config, - summary[current], - current_name) - loader.max_threads = 1 - loader.reader.set_boundaries(boundaries[current]) - loader.dont_prepare_nor_finish = True - - threads[current_name] = loader - threads[current_name].start() - running += 1 + try: + summary[current] = [] + current_name = "%s[%d]" % (self.name, current) + + loader = PGLoader(self.name, self.config, sem, + summary[current], current_name) + + loader.section_threads = 1 + loader.reader.set_boundaries(boundaries[current]) + loader.dont_prepare_nor_finish = True + + threads[current_name] = loader + threads[current_name].start() + running += 1 + + except Exception, e: + raise # wait for loaders completion while running > 0: - for cn in threads: - if not threads[cn].isAlive(): - running -= 1 + try: + for cn in threads: + if not threads[cn].isAlive(): + running -= 1 - if running > 0: - log.info('waiting for %d threads, sleeping %gs' % (running, 1)) - time.sleep(1) + if running > 0: + self.log.info('waiting for %d threads, sleeping %gs' % (running, 1)) + time.sleep(1) + + except KeyboardInterrupt: + self.log.warning("Aborting %d threads in section %s "\ + % (running, self.name)) + break self.finish_processing() - log.info('No more threads are running, %s done' % self.name) - return + self.log.info('No more threads are running, %s done' % self.name) else: # here we need a special thread reading the file pass + self.sem.release() + self.log.info("%s released" % self.name) + return + def prepare_processing(self): """ Things to do before processing data """ if 'dont_prepare_nor_finish' in 
self.__dict__: