From ac0a16f9b2526a29e7049f7a72bf79c06ad345db Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Sun, 4 Apr 2010 21:57:00 +0200 Subject: [PATCH] Add support for --max-parallel-sections and --section-threads from command line. --- .gitignore | 3 +++ pgloader.1.txt | 11 +++++++++++ pgloader.py | 21 +++++++++++++++++++-- pgloader/options.py | 6 ++++-- pgloader/pgloader.py | 14 ++++++++++---- 5 files changed, 47 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index d08adb9..10d5c27 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ /pgloader/textreader.pyc /pgloader/tools.pyc /pgloader.1.xml +/pgloader/RRRtools.pyc +/reformat/mysql.pyc +/reformat/pgtime.pyc diff --git a/pgloader.1.txt b/pgloader.1.txt index fabd54e..25140b2 100644 --- a/pgloader.1.txt +++ b/pgloader.1.txt @@ -166,6 +166,17 @@ You can't use both -F and -I at the same time. Input data files encoding. Defaults to 'latin9'. +-t, --section-threads:: + + How many threads per section to use, defaults to 1. The command line + value overrides the configuration file one. + +-m, --max-parallel-sections:: + + How many sections to load in parallel, defaults to 1. The command line + value overrides the configuration file one. That's a max value because + you may end up having fewer sections to load than this number. 
+ -R, --reformat_path:: PATH where to find reformat python modules, defaults to diff --git a/pgloader.py b/pgloader.py index cb4af3e..f62b9fd 100755 --- a/pgloader.py +++ b/pgloader.py @@ -103,6 +103,17 @@ def parse_options(): default = None, help = "input files encoding") + parser.add_option("-t", "--section-threads", dest = "section_threads", + default = pgloader.options.SECTION_THREADS, + type = "int", + help = "number of threads used per sections, default is 1") + + parser.add_option("-m", "--max-parallel-sections", dest = "parallel", + default = pgloader.options.MAX_PARALLEL_SECTIONS, + type = "int", + help = "number of sections to load in parralel, " +\ + "default is 1") + parser.add_option("-R", "--reformat_path", dest = "reformat_path", default = None, help = "PATH where to find reformat python modules") @@ -189,6 +200,9 @@ def parse_options(): pgloader.options.FROM_ID = opts.fromid pgloader.options.FIELD_SEP = opts.fsep + pgloader.options.SECTION_THREADS = opts.section_threads + pgloader.options.MAX_PARALLEL_SECTIONS = opts.parallel + pgloader.options.INPUT_ENCODING = opts.encoding if opts.reformat_path: @@ -320,8 +334,9 @@ def parse_config(conffile): pgloader.options.REFORMAT_PATH = rpath if config.has_option(section, 'max_parallel_sections'): - mps = config.getint(section, 'max_parallel_sections') - pgloader.options.MAX_PARALLEL_SECTIONS = mps + if not pgloader.options.MAX_PARALLEL_SECTIONS: + mps = config.getint(section, 'max_parallel_sections') + pgloader.options.MAX_PARALLEL_SECTIONS = mps return config @@ -536,6 +551,8 @@ def load_data(): if max_running == -1: max_running = len(sections) + log.info('Will load %d section at a time' % max_running) + sem = threading.BoundedSemaphore(max_running) while current < len(sections): diff --git a/pgloader/options.py b/pgloader/options.py index 62193a4..a1356f0 100644 --- a/pgloader/options.py +++ b/pgloader/options.py @@ -38,8 +38,10 @@ UDC_PREFIX = 'udc_' REFORMAT_PATH = None DEFAULT_REFORMAT_PATH = 
['/usr/share/python-support/pgloader/reformat'] -MAX_PARALLEL_SECTIONS = 1 -SECTION_THREADS = 1 +DEFAULT_MAX_PARALLEL_SECTIONS = 1 +DEFAULT_SECTION_THREADS = 1 +MAX_PARALLEL_SECTIONS = None +SECTION_THREADS = None SPLIT_FILE_READING = False RRQUEUE_SIZE = None diff --git a/pgloader/pgloader.py b/pgloader/pgloader.py index fb96918..1207c86 100644 --- a/pgloader/pgloader.py +++ b/pgloader/pgloader.py @@ -22,7 +22,7 @@ from options import NEWLINE_ESCAPES from options import UDC_PREFIX from options import REFORMAT_PATH from options import MAX_PARALLEL_SECTIONS -from options import SECTION_THREADS, SPLIT_FILE_READING +from options import DEFAULT_SECTION_THREADS, SECTION_THREADS, SPLIT_FILE_READING from options import RRQUEUE_SIZE class PGLoader(threading.Thread): @@ -529,11 +529,17 @@ class PGLoader(threading.Thread): self.log.debug("self.newline_escapes = '%s'" % self.newline_escapes) ## - # Parallelism knobs - if config.has_option(name, 'section_threads'): + # Parallelism knobs, give preference to command line + if SECTION_THREADS: + self.section_threads = SECTION_THREADS + elif config.has_option(name, 'section_threads'): self.section_threads = config.getint(name, 'section_threads') else: - self.section_threads = SECTION_THREADS + self.section_threads = DEFAULT_SECTION_THREADS + + if not self.template: + # only log the definitive information + self.log.info("Loading threads: %d" % self.section_threads) if config.has_option(name, 'split_file_reading'): self.split_file_reading = config.get(name, 'split_file_reading') == 'True'