From fc8adf1831bd2b9e2ee1b75864d10351c62b959d Mon Sep 17 00:00:00 2001
From: dim
Date: Mon, 11 Feb 2008 17:38:52 +0000
Subject: [PATCH] First implementation of parallel loading, each thread loading a part of the input file (stat, seek, etc)

---
 debian/changelog               |    8 ++
 examples/parallel/parallel.sql |   11 ++
 examples/pgloader.conf         |   25 ++++-
 pgloader.py                    |   43 ++++++--
 pgloader/csvreader.py          |   10 ++
 pgloader/options.py            |    7 +-
 pgloader/pgloader.py           |  177 ++++++++++++++++++++++++++++++---
 pgloader/reader.py             |    8 ++
 pgloader/textreader.py         |    9 ++
 9 files changed, 278 insertions(+), 20 deletions(-)
 create mode 100644 examples/parallel/parallel.sql

diff --git a/debian/changelog b/debian/changelog
index 33daa69..5480556 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,11 @@
+pgloader (2.3.0-1) unstable; urgency=low
+
+  * Multi-threaded pgloader (see options max_threads, max_parallel_sections,
+    section_threads and split_file_reading)
+  * FIX for -C and -I options (replace sys.log with self.log), per user request
+
+ -- Dimitri Fontaine  Mon, 11 Feb 2008 15:04:40 +0100
+
 pgloader (2.2.6-1) unstable; urgency=low
 
   * pgloader -V now VACUUM each table separately, no more vacuumdb issued
diff --git a/examples/parallel/parallel.sql b/examples/parallel/parallel.sql
new file mode 100644
index 0000000..6726b1f
--- /dev/null
+++ b/examples/parallel/parallel.sql
@@ -0,0 +1,11 @@
+CREATE TABLE parallel (
+  a integer primary key,
+  b text
+);
+
+-- create the .data file
+insert into parallel
+  select * from (select a, a::text
+                   from generate_series(0, 1000 * 1000 * 1000) as t(a)) x;
+
+\copy parallel to 'parallel/parallel.data' with delimiter ';' csv
diff --git a/examples/pgloader.conf b/examples/pgloader.conf
index c8d50f5..1beccfd 100644
--- a/examples/pgloader.conf
+++ b/examples/pgloader.conf
@@ -12,13 +12,16 @@ lc_messages = C
 
 ;client_encoding = 'utf-8'
 client_encoding = 'latin1'
-copy_every = 5
+copy_every = 5000
 commit_every = 5
 #copy_delimiter = %
 
 null = ""
 empty_string = "\ "
 
+max_parallel_sections = 1
+
+
 [simple_tmpl]
 template = True
 format = text
@@ -96,6 +99,26 @@ field_sep = |
 columns = id, timestamp
 reformat = timestamp:mysql:timestamp
 
+[parallel_template]
+template = True
+table = parallel
+format = csv
+filename = parallel/parallel.data
+field_sep = ;
+columns = a, b
+
+[parallel_greg]
+use_template = parallel_template
+max_threads = 4
+section_threads = -1
+split_file_reading = True
+
+[parallel_simon]
+use_template = parallel_template
+max_threads = 2
+section_threads = -1
+split_file_reading = False
+
 [csv]
 table = csv
 format = csv
diff --git a/pgloader.py b/pgloader.py
index dc518c1..be7a48b 100644
--- a/pgloader.py
+++ b/pgloader.py
@@ -273,6 +273,10 @@ def parse_config(conffile):
         rpath = config.get(section, 'reformat_path')
         pgloader.options.REFORMAT_PATH = rpath
 
+    if config.has_option(section, 'max_parallel_sections'):
+        mps = config.getint(section, 'max_parallel_sections')
+        pgloader.options.MAX_PARALLEL_SECTIONS = mps
+
     return config
 
 def myprint(l, line_prefix = "  ", cols = 78):
@@ -425,6 +429,7 @@ def load_data():
     # load some pgloader package modules
     from pgloader.options import VERBOSE, DEBUG, QUIET, SUMMARY
     from pgloader.options import DRY_RUN, PEDANTIC, VACUUM
+    from pgloader.options import MAX_PARALLEL_SECTIONS
     from pgloader.pgloader import PGLoader
     from pgloader.tools import PGLoader_Error
@@ -454,10 +459,32 @@ def load_data():
     threads = {}
     running = 0
 
-    for s in sections:
-        summary[s] = []
-        loader = PGLoader(s, config, summary[s])
+    current = 0
+
+    while current < len(sections):
+        s = sections[current]
+
+        # update running
+        if running > 0:
+            for s in threads:
+                if not threads[s].isAlive():
+                    running -= 1
+
+        if MAX_PARALLEL_SECTIONS != -1:
+            # -1 means we can start as many parallel sections
+            # as we want to
+
+            if running == MAX_PARALLEL_SECTIONS:
+                # we have to wait for one thread to terminate
+                # before considering the next one
+                log.info('%d/%d threads running, sleeping %gs' \
+                         % (running, MAX_PARALLEL_SECTIONS, .1))
+                time.sleep(.1)
+                continue
+
         try:
+            summary[s] = []
+            loader = PGLoader(s, config, summary[s])
             if not loader.template:
                 filename = loader.filename
                 input_encoding = loader.input_encoding
@@ -489,17 +516,21 @@ def load_data():
         except KeyboardInterrupt:
             log.warning("Aborting on user demand (Interrupt)")
 
+        current += 1
+
     while running > 0:
         for s in threads:
             if not threads[s].isAlive():
                 running -= 1
 
         if running > 0:
-            log.info("%d thread(s) still running" % running)
+            if MAX_PARALLEL_SECTIONS != 1:
+                log.info("%d thread(s) still running" % running)
 
         try:
-            log.info('waiting for %d threads, sleeping %gs' % (running, .1))
-            time.sleep(.1)
+            if MAX_PARALLEL_SECTIONS != 1:
+                log.info('waiting for %d threads, sleeping %gs' % (running, 1))
+            time.sleep(1)
         except KeyboardInterrupt:
             log.warning("Aborting %d threads still running at user demand"\
                         % running)
diff --git a/pgloader/csvreader.py b/pgloader/csvreader.py
index c2fcd6c..07953f3 100644
--- a/pgloader/csvreader.py
+++ b/pgloader/csvreader.py
@@ -76,7 +76,17 @@ class CSVReader(DataReader):
         except IOError, error:
             raise PGLoader_Error, error
 
+        if self.start is not None and self.start > 0:
+            self.log.info("CSV Reader starting at offset %d" % self.start)
+            fd.seek(self.start)
+
         # now read the lines
         for columns in csv.reader(fd, dialect = 'pgloader'):
+
+            if self.end is not None and fd.tell() >= self.end:
+                self.log.info("CSV Reader stopping, offset %d >= %d" % (fd.tell(), self.end))
+                fd.close()
+                break
+
             line = self.field_sep.join(columns)
             yield line, columns
diff --git a/pgloader/options.py b/pgloader/options.py
index f2bd008..514d93d 100644
--- a/pgloader/options.py
+++ b/pgloader/options.py
@@ -2,7 +2,7 @@
 #
 # Some common options, for each module to get them
 
-PGLOADER_VERSION = '2.2.6'
+PGLOADER_VERSION = '2.3.0~dev'
 
 INPUT_ENCODING = None
 PG_CLIENT_ENCODING = 'latin9'
@@ -36,6 +36,11 @@ UDC_PREFIX = 'udc_'
 REFORMAT_PATH = None
 DEFAULT_REFORMAT_PATH = ['/usr/share/python-support/pgloader/reformat']
 
+MAX_THREADS = 1
+MAX_PARALLEL_SECTIONS = 1
+SECTION_THREADS = -1
+SPLIT_FILE_READING = False
+
 CLIENT_MIN_MESSAGES = None
 LOG_MIN_MESSAGES = DEBUG
 DEFAULT_LOG_FILE = "/tmp/pgloader.log"
diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py
index 7282737..4d0d30f 100644
--- a/pgloader/pgloader.py
+++ b/pgloader/pgloader.py
@@ -21,6 +21,8 @@ from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
 from options import NEWLINE_ESCAPES
 from options import UDC_PREFIX
 from options import REFORMAT_PATH
+from options import MAX_THREADS, MAX_PARALLEL_SECTIONS
+from options import SECTION_THREADS, SPLIT_FILE_READING
 
 class PGLoader(threading.Thread):
     """
@@ -28,14 +30,18 @@ class PGLoader(threading.Thread):
     import data with COPY or update blob data with UPDATE.
""" - def __init__(self, name, config, stats): + def __init__(self, name, config, stats, logname = None): """ Init with a configuration section """ threading.Thread.__init__(self, name = name) # Some settings self.stats = stats self.name = name - self.log = getLogger(name) + self.config = config + + if logname is None: + logname = name + self.log = getLogger(logname) self.__dbconnect__(config) @@ -437,8 +443,44 @@ class PGLoader(threading.Thread): if NEWLINE_ESCAPES is not None: # set NEWLINE_ESCAPES for each table column self.newline_escapes = [(a, NEWLINE_ESCAPES) - for (a, x) in self.columns] + for (a, x) in self.columns] + ## + # Parallelism knobs + for opt, default in [('max_threads', MAX_THREADS), + ('section_threads', SECTION_THREADS), + ('split_file_reading', SPLIT_FILE_READING)]: + + if config.has_option(name, opt): + if opt in ['max_threads', 'section_threads']: + self.__dict__[opt] = config.getint(name, opt) + else: + self.__dict__[opt] = config.get(name, opt) == 'True' + else: + if not self.template: + self.__dict__[opt] = default + + if not self.template: + self.log.info('%s.%s = %s' % (name, opt, str(self.__dict__[opt]))) + + if not self.template and self.split_file_reading: + global FROM_COUNT + if FROM_COUNT is not None and FROM_COUNT > 0: + raise PGLoader_Error, \ + "Conflict: can't use both 'split_file_reading' and '--from'" + + global FROM_ID + if FROM_ID is not None: + raise PGLoader_Error, \ + "Conflict: can't use both 'split_file_reading' and '--from-id'" + + if not self.template and self.section_threads > self.max_threads: + raise PGLoader_Error, \ + "%s.section_threads > %s.max_threads : %d > %d" \ + % (name, name, self.section_threads, self.max_threads) + + ## + # Reader's init if config.has_option(name, 'format'): self.format = config.get(name, 'format') @@ -648,11 +690,113 @@ class PGLoader(threading.Thread): return def run(self): - """ depending on configuration, do given job """ - + """ controling thread which dispatch the job """ + # Announce the beginning of the work self.log.info("[%s]" % self.name) + if self.max_threads == 1: + + if self.reader.start is not None: + self.log.info("Loading from offset %d to %d" \ + % (self.reader.start, self.reader.end)) + + self.prepare_processing() + self.process() + self.finish_processing() + return + + # now we're going to need mutli-threading + if self.section_threads == -1: + self.section_threads = self.max_threads + + if self.split_file_reading: + # this option is not compatible with text mode when + # field_count is used (meaning end of line could be found + # in the data) + if self.format.lower() == 'text' and self.field_count is not None: + raise PGLoader_Error, \ + "Can't use split_file_reading with text " +\ + "format when 'field_count' is used" + + # init boundaries to give to each thread + from stat import ST_SIZE + previous = 0 + filesize = os.stat(self.filename)[ST_SIZE] + boundaries = [] + for partn in range(self.section_threads): + start = previous + end = (partn+1)*filesize / self.section_threads + boundaries.append((start, end)) + + previous = end + 1 + + self.log.info("Spliting input file of %d bytes %s" \ + % (filesize, str(boundaries))) + + # Now check for real boundaries + fd = file(self.filename) + b = 0 + for b in range(len(boundaries)): + start, end = boundaries[b] + fd.seek(end) + dummy_str = fd.readline() + + # update both current boundary end and next start + boundaries[b] = (start, fd.tell()) + if (b+1) < len(boundaries): + boundaries[b+1] = (fd.tell()+1, boundaries[b+1][1]) + + fd.close() + + 
self.log.info("Spliting input file of %d bytes %s" \ + % (filesize, str(boundaries))) + + self.prepare_processing() + + # now create self.section_threads PGLoader threads + summary = {} + threads = {} + running = 0 + + for current in range(self.section_threads): + summary[current] = [] + current_name = "%s[%d]" % (self.name, current) + loader = PGLoader(self.name, + self.config, + summary[current], + current_name) + loader.max_threads = 1 + loader.reader.set_boundaries(boundaries[current]) + loader.dont_prepare_nor_finish = True + + threads[current_name] = loader + threads[current_name].start() + running += 1 + + # wait for loaders completion + while running > 0: + for cn in threads: + if not threads[cn].isAlive(): + running -= 1 + + if running > 0: + log.info('waiting for %d threads, sleeping %gs' % (running, 1)) + time.sleep(1) + + self.finish_processing() + log.info('No more threads are running, %s done' % self.name) + return + + else: + # here we need a special thread reading the file + pass + + def prepare_processing(self): + """ Things to do before processing data """ + if 'dont_prepare_nor_finish' in self.__dict__: + return + if not DRY_RUN: if TRUNCATE: self.db.truncate(self.table) @@ -660,13 +804,10 @@ class PGLoader(threading.Thread): if TRIGGERS: self.db.disable_triggers(self.table) - if self.columns is not None: - self.log.info("COPY csv data") - self.data_import() - - elif self.blob_cols is not None: - # elif: COPY process also blob data - self.log.info("UPDATE blob data") + def finish_processing(self): + """ Things to do after processing data """ + if 'dont_prepare_nor_finish' in self.__dict__: + return if TRIGGERS and not DRY_RUN: self.db.enable_triggers(self.table) @@ -686,6 +827,18 @@ class PGLoader(threading.Thread): self.log.info("loading done") return + + def process(self): + """ depending on configuration, do given job """ + + if self.columns is not None: + self.log.info("COPY csv data") + self.data_import() + + elif self.blob_cols is not None: + # elif: COPY process also blob data + self.log.info("UPDATE blob data") + def data_import(self): """ import CSV or TEXT data, using COPY """ diff --git a/pgloader/reader.py b/pgloader/reader.py index 6581c66..8dfe17c 100644 --- a/pgloader/reader.py +++ b/pgloader/reader.py @@ -37,6 +37,9 @@ class DataReader: if INPUT_ENCODING is not None: self.input_encoding = INPUT_ENCODING + self.start = None + self.end = None + def readconfig(self, name, config): """ read configuration section for common options @@ -84,3 +87,8 @@ class DataReader: """ read data from configured file, and generate (yields) for each data line: line, columns and rowid """ pass + + def set_boundaries(self, (start, end)): + """ set the boundaries of this reader """ + self.start = start + self.end = end diff --git a/pgloader/textreader.py b/pgloader/textreader.py index c685427..cf92778 100644 --- a/pgloader/textreader.py +++ b/pgloader/textreader.py @@ -96,10 +96,19 @@ class TextReader(DataReader): except IOError, error: raise PGLoader_Error, error + if self.start is not None and self.start > 0: + self.log.info("Text Reader starting at offset %d" % self.start) + fd.seek(self.start) + for line in fd: # we count real physical lines nb_plines += 1 + if self.end is not None and fd.tell() >= self.end: + self.log.info("Text Reader stoping, offset %d >= %d" % (fd.tell(), self.end())) + fd.close() + break + if self.input_encoding is not None: # this may not be necessary, after all try: