mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-08 15:27:00 +02:00
Add support for --max-parallel-sections and --section-threads from command line.
This commit is contained in:
parent
b7b0bbc62d
commit
ac0a16f9b2
3
.gitignore
vendored
3
.gitignore
vendored
@ -10,3 +10,6 @@
|
|||||||
/pgloader/textreader.pyc
|
/pgloader/textreader.pyc
|
||||||
/pgloader/tools.pyc
|
/pgloader/tools.pyc
|
||||||
/pgloader.1.xml
|
/pgloader.1.xml
|
||||||
|
/pgloader/RRRtools.pyc
|
||||||
|
/reformat/mysql.pyc
|
||||||
|
/reformat/pgtime.pyc
|
||||||
|
@ -166,6 +166,17 @@ You can't use both -F and -I at the same time.
|
|||||||
|
|
||||||
Input data files encoding. Defaults to 'latin9'.
|
Input data files encoding. Defaults to 'latin9'.
|
||||||
|
|
||||||
|
-t, --section-threads::
|
||||||
|
|
||||||
|
How many threads per section to use, defaults to 1. The command line
|
||||||
|
value override the configuration file one.
|
||||||
|
|
||||||
|
-m, --max-parallel-sections::
|
||||||
|
|
||||||
|
How many sections to load in parallel, defaults to 1. The command line
|
||||||
|
value override the configuration file one. That's a max value because
|
||||||
|
you will end up having less sections to load than this number.
|
||||||
|
|
||||||
-R, --reformat_path::
|
-R, --reformat_path::
|
||||||
|
|
||||||
PATH where to find reformat python modules, defaults to
|
PATH where to find reformat python modules, defaults to
|
||||||
|
21
pgloader.py
21
pgloader.py
@ -103,6 +103,17 @@ def parse_options():
|
|||||||
default = None,
|
default = None,
|
||||||
help = "input files encoding")
|
help = "input files encoding")
|
||||||
|
|
||||||
|
parser.add_option("-t", "--section-threads", dest = "section_threads",
|
||||||
|
default = pgloader.options.SECTION_THREADS,
|
||||||
|
type = "int",
|
||||||
|
help = "number of threads used per sections, default is 1")
|
||||||
|
|
||||||
|
parser.add_option("-m", "--max-parallel-sections", dest = "parallel",
|
||||||
|
default = pgloader.options.MAX_PARALLEL_SECTIONS,
|
||||||
|
type = "int",
|
||||||
|
help = "number of sections to load in parralel, " +\
|
||||||
|
"default is 1")
|
||||||
|
|
||||||
parser.add_option("-R", "--reformat_path", dest = "reformat_path",
|
parser.add_option("-R", "--reformat_path", dest = "reformat_path",
|
||||||
default = None,
|
default = None,
|
||||||
help = "PATH where to find reformat python modules")
|
help = "PATH where to find reformat python modules")
|
||||||
@ -189,6 +200,9 @@ def parse_options():
|
|||||||
pgloader.options.FROM_ID = opts.fromid
|
pgloader.options.FROM_ID = opts.fromid
|
||||||
pgloader.options.FIELD_SEP = opts.fsep
|
pgloader.options.FIELD_SEP = opts.fsep
|
||||||
|
|
||||||
|
pgloader.options.SECTION_THREADS = opts.section_threads
|
||||||
|
pgloader.options.MAX_PARALLEL_SECTIONS = opts.parallel
|
||||||
|
|
||||||
pgloader.options.INPUT_ENCODING = opts.encoding
|
pgloader.options.INPUT_ENCODING = opts.encoding
|
||||||
|
|
||||||
if opts.reformat_path:
|
if opts.reformat_path:
|
||||||
@ -320,8 +334,9 @@ def parse_config(conffile):
|
|||||||
pgloader.options.REFORMAT_PATH = rpath
|
pgloader.options.REFORMAT_PATH = rpath
|
||||||
|
|
||||||
if config.has_option(section, 'max_parallel_sections'):
|
if config.has_option(section, 'max_parallel_sections'):
|
||||||
mps = config.getint(section, 'max_parallel_sections')
|
if not pgloader.options.MAX_PARALLEL_SECTIONS:
|
||||||
pgloader.options.MAX_PARALLEL_SECTIONS = mps
|
mps = config.getint(section, 'max_parallel_sections')
|
||||||
|
pgloader.options.MAX_PARALLEL_SECTIONS = mps
|
||||||
|
|
||||||
return config
|
return config
|
||||||
|
|
||||||
@ -536,6 +551,8 @@ def load_data():
|
|||||||
if max_running == -1:
|
if max_running == -1:
|
||||||
max_running = len(sections)
|
max_running = len(sections)
|
||||||
|
|
||||||
|
log.info('Will load %d section at a time' % max_running)
|
||||||
|
|
||||||
sem = threading.BoundedSemaphore(max_running)
|
sem = threading.BoundedSemaphore(max_running)
|
||||||
|
|
||||||
while current < len(sections):
|
while current < len(sections):
|
||||||
|
@ -38,8 +38,10 @@ UDC_PREFIX = 'udc_'
|
|||||||
REFORMAT_PATH = None
|
REFORMAT_PATH = None
|
||||||
DEFAULT_REFORMAT_PATH = ['/usr/share/python-support/pgloader/reformat']
|
DEFAULT_REFORMAT_PATH = ['/usr/share/python-support/pgloader/reformat']
|
||||||
|
|
||||||
MAX_PARALLEL_SECTIONS = 1
|
DEFAULT_MAX_PARALLEL_SECTIONS = 1
|
||||||
SECTION_THREADS = 1
|
DEFAULT_SECTION_THREADS = 1
|
||||||
|
MAX_PARALLEL_SECTIONS = None
|
||||||
|
SECTION_THREADS = None
|
||||||
SPLIT_FILE_READING = False
|
SPLIT_FILE_READING = False
|
||||||
RRQUEUE_SIZE = None
|
RRQUEUE_SIZE = None
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ from options import NEWLINE_ESCAPES
|
|||||||
from options import UDC_PREFIX
|
from options import UDC_PREFIX
|
||||||
from options import REFORMAT_PATH
|
from options import REFORMAT_PATH
|
||||||
from options import MAX_PARALLEL_SECTIONS
|
from options import MAX_PARALLEL_SECTIONS
|
||||||
from options import SECTION_THREADS, SPLIT_FILE_READING
|
from options import DEFAULT_SECTION_THREADS, SECTION_THREADS, SPLIT_FILE_READING
|
||||||
from options import RRQUEUE_SIZE
|
from options import RRQUEUE_SIZE
|
||||||
|
|
||||||
class PGLoader(threading.Thread):
|
class PGLoader(threading.Thread):
|
||||||
@ -529,11 +529,17 @@ class PGLoader(threading.Thread):
|
|||||||
self.log.debug("self.newline_escapes = '%s'" % self.newline_escapes)
|
self.log.debug("self.newline_escapes = '%s'" % self.newline_escapes)
|
||||||
|
|
||||||
##
|
##
|
||||||
# Parallelism knobs
|
# Parallelism knobs, give preference to command line
|
||||||
if config.has_option(name, 'section_threads'):
|
if SECTION_THREADS:
|
||||||
|
self.section_threads = SECTION_THREADS
|
||||||
|
elif config.has_option(name, 'section_threads'):
|
||||||
self.section_threads = config.getint(name, 'section_threads')
|
self.section_threads = config.getint(name, 'section_threads')
|
||||||
else:
|
else:
|
||||||
self.section_threads = SECTION_THREADS
|
self.section_threads = DEFAULT_SECTION_THREADS
|
||||||
|
|
||||||
|
if not self.template:
|
||||||
|
# only log the definitive information
|
||||||
|
self.log.info("Loading threads: %d" % self.section_threads)
|
||||||
|
|
||||||
if config.has_option(name, 'split_file_reading'):
|
if config.has_option(name, 'split_file_reading'):
|
||||||
self.split_file_reading = config.get(name, 'split_file_reading') == 'True'
|
self.split_file_reading = config.get(name, 'split_file_reading') == 'True'
|
||||||
|
Loading…
Reference in New Issue
Block a user