From bac5d2b4821f72000792d50811f2e77e072ef373 Mon Sep 17 00:00:00 2001 From: dim Date: Tue, 12 Feb 2008 21:58:42 +0000 Subject: [PATCH] First implementation of Round Robin Reader threading method, Work In Progress --- does not work at all yet --- examples/parallel/parallel.sql | 2 +- pgloader/options.py | 1 + pgloader/pgloader.py | 350 +++++++++++++++++++++++---------- pgloader/reader.py | 3 + pgloader/tools.py | 15 +- 5 files changed, 264 insertions(+), 107 deletions(-) diff --git a/examples/parallel/parallel.sql b/examples/parallel/parallel.sql index 6726b1f..0528b62 100644 --- a/examples/parallel/parallel.sql +++ b/examples/parallel/parallel.sql @@ -6,6 +6,6 @@ CREATE TABLE parallel ( -- create the .data file insert into parallel select * from (select a, a::text - from generate_series(0, 1000 * 1000 * 1000) as t(a)) x; + from generate_series(0, 1000 * 1000) as t(a)) x; \copy parallel to 'parallel/parallel.data' with delimiter ';' csv diff --git a/pgloader/options.py b/pgloader/options.py index f8cbd16..6e0f645 100644 --- a/pgloader/options.py +++ b/pgloader/options.py @@ -39,6 +39,7 @@ DEFAULT_REFORMAT_PATH = ['/usr/share/python-support/pgloader/reformat'] MAX_PARALLEL_SECTIONS = 1 SECTION_THREADS = 1 SPLIT_FILE_READING = False +RRQUEUE_SIZE = None CLIENT_MIN_MESSAGES = None LOG_MIN_MESSAGES = DEBUG diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index ea50596..8a2434f 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -23,6 +23,7 @@ from options import UDC_PREFIX from options import REFORMAT_PATH from options import MAX_PARALLEL_SECTIONS from options import SECTION_THREADS, SPLIT_FILE_READING +from options import RRQUEUE_SIZE class PGLoader(threading.Thread): """ @@ -30,20 +31,28 @@ class PGLoader(threading.Thread): import data with COPY or update blob data with UPDATE. """ - def __init__(self, name, config, sem, stats, logname = None): + def __init__(self, name, config, sem, stats, + logname = None, queue = None, lock = None): """ Init with a configuration section """ threading.Thread.__init__(self, name = name) - # sem and stats are global objects: - # sem is shared by all threads at the same level, stats is a - # private entry of a shared dict + # sem, stats and queue (if not None) are global objects: + # sem is shared by all threads at the same level + # stats is a private entry of a shared dict + # queue is given when reading in round robin mode + # lock is a threading.Lock for reading sync + # self.sem = sem self.stats = stats + self.queue = queue + self.lock = lock # thereafter parameters are local self.name = name self.config = config + # logname is given when we use several Threads for reading + # input file if logname is None: logname = name self.log = getLogger(logname) @@ -470,13 +479,26 @@ class PGLoader(threading.Thread): self.log.debug('%s.%s = %s' % (name, opt, str(self.__dict__[opt]))) if not self.template and self.split_file_reading: + opt = 'split_file_reading' if FROM_COUNT is not None and FROM_COUNT > 0: raise PGLoader_Error, \ - "Conflict: can't use both 'split_file_reading' and '--from'" + "Conflict: can't use both '%s' and '--from'" % opt if FROM_ID is not None: raise PGLoader_Error, \ - "Conflict: can't use both 'split_file_reading' and '--from-id'" + "Conflict: can't use both '%s' and '--from-id'" % opt + + if not self.template \ + and self.format.lower() == 'text' \ + and self.field_count is not None: + + # this option is not compatible with text mode when + # field_count is used (meaning end of line could be found + # in the data) + + raise PGLoader_Error, \ + "Can't use split_file_reading with text " +\ + "format when 'field_count' is used" ## # Reader's init @@ -694,7 +716,8 @@ class PGLoader(threading.Thread): self.sem.acquire() # Announce the beginning of the work - self.log.info("%s launched" % self.name) + self.log.info("%s launched --- %d threads" % (self.name, + self.section_threads)) if self.section_threads == 1: if self.reader.start is not None: @@ -711,109 +734,225 @@ class PGLoader(threading.Thread): return if self.split_file_reading: - # this option is not compatible with text mode when - # field_count is used (meaning end of line could be found - # in the data) - if self.format.lower() == 'text' and self.field_count is not None: - raise PGLoader_Error, \ - "Can't use split_file_reading with text " +\ - "format when 'field_count' is used" - - # init boundaries to give to each thread - from stat import ST_SIZE - previous = 0 - filesize = os.stat(self.filename)[ST_SIZE] - boundaries = [] - for partn in range(self.section_threads): - start = previous - end = (partn+1)*filesize / self.section_threads - boundaries.append((start, end)) - - previous = end + 1 - - self.log.debug("Spliting input file of %d bytes %s" \ - % (filesize, str(boundaries))) - - # Now check for real boundaries - fd = file(self.filename) - b = 0 - for b in range(len(boundaries)): - start, end = boundaries[b] - fd.seek(end) - dummy_str = fd.readline() - - # update both current boundary end and next start - boundaries[b] = (start, fd.tell()-1) - if (b+1) < len(boundaries): - boundaries[b+1] = (fd.tell(), boundaries[b+1][1]) - - fd.close() - - self.log.info("Spliting input file of %d bytes %s" \ - % (filesize, str(boundaries))) - - self.prepare_processing() - - # now create self.section_threads PGLoader threads - sem = threading.BoundedSemaphore(self.section_threads) - summary = {} - threads = {} - running = 0 - - for current in range(self.section_threads): - try: - summary[current] = [] - current_name = "%s[%d]" % (self.name, current) - - loader = PGLoader(self.name, self.config, sem, - summary[current], current_name) - - loader.section_threads = 1 - loader.reader.set_boundaries(boundaries[current]) - loader.dont_prepare_nor_finish = True - - threads[current_name] = loader - threads[current_name].start() - running += 1 - - except Exception, e: - raise - - # wait for loaders completion, first let them some time to - # be started - time.sleep(2) - - from tools import running_threads - n = running_threads(threads) - log.info("Waiting for %d threads to terminate" % n) - - # Try to acquire all semaphore entries - for i in range(self.section_threads): - sem.acquire() - log.info("Acquired %d times, " % (i+1) + \ - "still waiting for %d threads to terminate" \ - % running_threads(threads)) - - self.finish_processing() - self.duration = time.time() - self.init_time - self.log.info('No more threads are running, %s done' % self.name) - - stats = [0, 0] - for s in summary: - for i in range(2, len(summary[s])): - stats[i-2] += summary[s][i] - - for x in [self.table, self.duration] + stats: - self.stats.append(x) + # start self.section_threads workers + self.split_file_read() else: # here we need a special thread reading the file - pass + self.round_robin_read() self.sem.release() self.log.info("%s released" % self.name) return + def split_file_read(self): + """ Current thread will start self.section_threads threads, + each one reading a part of the input file. """ + + # init boundaries to give to each thread + from stat import ST_SIZE + previous = 0 + filesize = os.stat(self.filename)[ST_SIZE] + boundaries = [] + for partn in range(self.section_threads): + start = previous + end = (partn+1)*filesize / self.section_threads + boundaries.append((start, end)) + + previous = end + 1 + + self.log.debug("Spliting input file of %d bytes %s" \ + % (filesize, str(boundaries))) + + # Now check for real boundaries + fd = file(self.filename) + b = 0 + for b in range(len(boundaries)): + start, end = boundaries[b] + fd.seek(end) + dummy_str = fd.readline() + + # update both current boundary end and next start + boundaries[b] = (start, fd.tell()-1) + if (b+1) < len(boundaries): + boundaries[b+1] = (fd.tell(), boundaries[b+1][1]) + + fd.close() + + self.log.info("Spliting input file of %d bytes %s" \ + % (filesize, str(boundaries))) + + self.prepare_processing() + + # now create self.section_threads PGLoader threads + sem = threading.BoundedSemaphore(self.section_threads) + summary = {} + threads = {} + + for current in range(self.section_threads): + try: + summary[current] = [] + current_name = "%s[%d]" % (self.name, current) + + loader = PGLoader(self.name, self.config, sem, + summary[current], current_name) + + loader.section_threads = 1 + loader.reader.set_boundaries(boundaries[current]) + loader.dont_prepare_nor_finish = True + + threads[current_name] = loader + threads[current_name].start() + + except Exception, e: + raise + + # wait for loaders completion, first let them some time to + # be started + time.sleep(2) + + from tools import running_threads + n = running_threads(threads) + log.info("Waiting for %d threads to terminate" % n) + + # Try to acquire all semaphore entries + for i in range(self.section_threads): + sem.acquire() + log.info("Acquired %d times, " % (i+1) + \ + "still waiting for %d threads to terminate" \ + % running_threads(threads)) + + self.finish_processing() + self.duration = time.time() - self.init_time + self.log.info('No more threads are running, %s done' % self.name) + + stats = [0, 0] + for s in summary: + for i in range(2, len(summary[s])): + stats[i-2] += summary[s][i] + + for x in [self.table, self.duration] + stats: + self.stats.append(x) + + return + + def round_robin_read(self): + """ Start self.section_threads threads to process data, this + thread will read the input file and distribute the processing + on a round-robin fashion""" + self.prepare_processing() + + from tools import RRReader + queues = {} + locks = {} + sem = threading.BoundedSemaphore(self.section_threads) + summary = {} + threads = {} + + rrqueue_size = RRQUEUE_SIZE + if rrqueue_size is None: + rrqueue_size = self.db.copy_every + + for current in range(self.section_threads): + queues[current] = RRReader() + locks [current] = threading.Lock() + + # acquire the lock before starting worker thread + # and release it once its queue if full + self.log.info("locks[%d].acquire" % current) + locks[current].acquire() + + try: + summary[current] = [] + current_name = "%s[%d]" % (self.name, current) + + loader = PGLoader(self.name, self.config, sem, + summary[current], + logname = current_name, + queue = queues[current], + lock = locks [current]) + + loader.section_threads = 1 + loader.dont_prepare_nor_finish = True + loader.done = False + + threads[current_name] = loader + threads[current_name].start() + + except Exception, e: + raise + + # wait for loaders completion, first let them some time to + # be started + time.sleep(2) + + # Now self.section_threads are started and we have a queue and + # a Condition for each of them. + # + # read the input file here, and give each worker Thread is + # share to process, in a round-robin fashion + n = 0 # line number + c = 0 # current + p = c # previous + + for line, columns in self.reader.readlines(): + if p != c: + self.log.info("read %d lines, queue to thread %s" % (n, c)) + + # release p'thread (which will empty its queue) and + # lock c'thread --- waiting until it has emptied its + # queue + self.log.info("locks[%d].release" % p) + self.log.info("locks[%d].acquire" % c) + locks[p].release() + locks[c].acquire() + + queues[c].append((line, columns)) + n += 1 + p = c + c = (n / rrqueue_size) % self.section_threads + + # don't forget to process last run + locks[c].release() + + k = threads.keys() + for c in range(self.section_threads): + threads[k[c]].acquire() + threads[k[c]].done = True + threads[k[c]].release() + + self.finish_processing() + self.duration = time.time() - self.init_time + self.log.info('%s done' % self.name) + + stats = [0, 0] + for s in summary: + for i in range(2, len(summary[s])): + stats[i-2] += summary[s][i] + + for x in [self.table, self.duration] + stats: + self.stats.append(x) + + return + + def readlines(self): + """ return next line from either self.queue or self.reader """ + + if self.queue is None: + for line, columns in self.reader.readlines(): + yield line, columns + + return + + while not self.done: + self.lock.acquire() + for line, columns in self.queue.readlines(): + yield line, columns + + self.lock.release() + + return + def prepare_processing(self): """ Things to do before processing data """ if 'dont_prepare_nor_finish' in self.__dict__: @@ -844,7 +983,8 @@ class PGLoader(threading.Thread): if self.reject is not None: self.errors = self.reject.errors - for x in [self.table, self.duration, self.db.commited_rows, self.errors]: + for x in [self.table, self.duration, + self.db.commited_rows, self.errors]: self.stats.append(x) # then show up some stats @@ -873,8 +1013,8 @@ class PGLoader(threading.Thread): if self.udcs: dudcs = dict(self.udcs) - - for line, columns in self.reader.readlines(): + + for line, columns in self.readlines(): if self.blob_cols is not None: columns, rowids = self.read_blob(line, columns) diff --git a/pgloader/reader.py b/pgloader/reader.py index 8dfe17c..270bb24 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -37,6 +37,8 @@ class DataReader: if INPUT_ENCODING is not None: self.input_encoding = INPUT_ENCODING + # (start, end) are used for split_file_reading mode + # queue when in round_robin_read mode self.start = None self.end = None @@ -92,3 +94,4 @@ class DataReader: """ set the boundaries of this reader """ self.start = start self.end = end + diff --git a/pgloader/tools.py b/pgloader/tools.py index 6d786d0..dde2736 100644 --- a/pgloader/tools.py +++ b/pgloader/tools.py @@ -2,7 +2,7 @@ # # pgloader librairies -import os, sys, os.path, time, codecs +import os, sys, os.path, time, codecs, collections from cStringIO import StringIO from options import DRY_RUN, PEDANTIC @@ -176,3 +176,16 @@ def running_threads(threads): running += 1 return running + + +class RRReader(collections.deque): + """ Round Robin reader, which are collections.deque with a + readlines() method""" + + def readlines(self): + """ return next line from queue """ + while 1: + try: + yield self.popleft() + except IndexError: + return