First implementation of parallel loading: each thread loads a part of the input file (stat, seek, etc.)

This commit is contained in:
dim 2008-02-11 17:38:52 +00:00
parent 165daa19fe
commit fc8adf1831
9 changed files with 278 additions and 20 deletions
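In outline, the split_file_reading mode added here stats the input file, carves it into one byte range per thread, then pushes each range's end forward to the next newline so that no thread starts reading mid-line. A minimal standalone sketch of that boundary computation (modern Python, illustrative names; not code from this commit):

    import os

    def split_boundaries(filename, nparts):
        """Cut filename into nparts (start, end) byte ranges, each ending
        exactly on a line boundary."""
        filesize = os.stat(filename).st_size
        # naive, equal-sized ranges first
        boundaries = [(part * filesize // nparts, (part + 1) * filesize // nparts)
                      for part in range(nparts)]
        # then push each end forward to the next real end of line
        with open(filename, 'rb') as fd:
            for i, (start, end) in enumerate(boundaries):
                fd.seek(end)
                fd.readline()                      # consume the partial line
                boundaries[i] = (start, fd.tell())
                if i + 1 < nparts:
                    boundaries[i + 1] = (fd.tell(), boundaries[i + 1][1])
        return boundaries

For instance, split_boundaries('parallel/parallel.data', 4) returns four (start, end) pairs that cover the file exactly, each ending on a newline, ready to hand to four reader threads.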

debian/changelog
View File

@@ -1,3 +1,11 @@
+pgloader (2.3.0-1) unstable; urgency=low
+
+  * Multi-threaded pgloader (see options max_threads, max_parallel_sections,
+    section_threads and split_file_reading)
+  * FIX for -C and -I options (replace sys.log with self.log), per user request
+
+ -- Dimitri Fontaine <dim@tapoueh.org>  Mon, 11 Feb 2008 15:04:40 +0100
+
 pgloader (2.2.6-1) unstable; urgency=low

   * pgloader -V now VACUUM each table separately, no more vacuumdb issued

View File

@@ -0,0 +1,11 @@
+CREATE TABLE parallel (
+ a integer primary key,
+ b text
+);
+
+-- create the .data file
+insert into parallel
+     select * from (select a, a::text
+                      from generate_series(0, 1000 * 1000 * 1000) as t(a)) x;
+
+\copy parallel to 'parallel/parallel.data' with delimiter ';' csv

View File

@@ -12,13 +12,16 @@ lc_messages = C
 ;client_encoding = 'utf-8'
 client_encoding = 'latin1'
-copy_every = 5
+copy_every = 5000
 commit_every = 5
 #copy_delimiter = %
 null = ""
 empty_string = "\ "

+max_parallel_sections = 1
+
 [simple_tmpl]
 template = True
 format = text
@@ -96,6 +99,26 @@ field_sep = |
 columns = id, timestamp
 reformat = timestamp:mysql:timestamp

+[parallel_template]
+template = True
+table = parallel
+format = csv
+filename = parallel/parallel.data
+field_sep = ;
+columns = a, b
+
+[parallel_greg]
+use_template = parallel_template
+max_threads = 4
+section_threads = -1
+split_file_reading = True
+
+[parallel_simon]
+use_template = parallel_template
+max_threads = 2
+section_threads = -1
+split_file_reading = False
+
 [csv]
 table = csv
 format = csv

View File

@@ -273,6 +273,10 @@ def parse_config(conffile):
             rpath = config.get(section, 'reformat_path')
             pgloader.options.REFORMAT_PATH = rpath

+        if config.has_option(section, 'max_parallel_sections'):
+            mps = config.getint(section, 'max_parallel_sections')
+            pgloader.options.MAX_PARALLEL_SECTIONS = mps
+
     return config

 def myprint(l, line_prefix = " ", cols = 78):
@@ -425,6 +429,7 @@ def load_data():
     # load some pgloader package modules
     from pgloader.options import VERBOSE, DEBUG, QUIET, SUMMARY
     from pgloader.options import DRY_RUN, PEDANTIC, VACUUM
+    from pgloader.options import MAX_PARALLEL_SECTIONS
     from pgloader.pgloader import PGLoader
     from pgloader.tools import PGLoader_Error
@@ -454,10 +459,32 @@ def load_data():
     threads = {}
     running = 0

-    for s in sections:
+    current = 0
+    while current < len(sections):
+        s = sections[current]
+
+        # update running (use t, not s: reusing s would clobber the
+        # current section name)
+        if running > 0:
+            for t in threads:
+                if not threads[t].isAlive():
+                    running -= 1
+
+        if MAX_PARALLEL_SECTIONS != -1:
+            # -1 means we can start as many parallel section
+            # processings as we want to
+            if running == MAX_PARALLEL_SECTIONS:
+                # we have to wait for one thread to terminate
+                # before considering the next one
+                log.info('%d/%d threads running, sleeping %gs' \
+                         % (running, MAX_PARALLEL_SECTIONS, .1))
+                time.sleep(.1)
+                continue
+
+        try:
             summary[s] = []
             loader = PGLoader(s, config, summary[s])
-        try:
+
             if not loader.template:
                 filename       = loader.filename
                 input_encoding = loader.input_encoding
@@ -489,17 +516,21 @@ def load_data():
         except KeyboardInterrupt:
             log.warning("Aborting on user demand (Interrupt)")

+        current += 1
+
     while running > 0:
         for s in threads:
             if not threads[s].isAlive():
                 running -= 1

         if running > 0:
-            log.info("%d thread(s) still running" % running)
+            if MAX_PARALLEL_SECTIONS != 1:
+                log.info("%d thread(s) still running" % running)

             try:
-                log.info('waiting for %d threads, sleeping %gs' % (running, .1))
-                time.sleep(.1)
+                if MAX_PARALLEL_SECTIONS != 1:
+                    log.info('waiting for %d threads, sleeping %gs' % (running, 1))
+                time.sleep(1)
             except KeyboardInterrupt:
                 log.warning("Aborting %d threads still running at user demand"\
                             % running)
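
The reworked load_data() loop above is a bounded-concurrency dispatcher: it keeps launching section threads until max_parallel_sections of them are alive, then polls isAlive() and sleeps until a slot frees up, with -1 meaning unbounded. The same pattern in isolation, as a sketch with illustrative names (not the pgloader API):

    import threading
    import time

    def run_throttled(workers, max_parallel):
        """Start every thread in workers, with at most max_parallel alive
        at once; max_parallel == -1 means no limit."""
        running = []
        for w in workers:
            # block until a slot is free, pruning finished threads as we poll
            while max_parallel != -1 and len(running) >= max_parallel:
                running = [t for t in running if t.is_alive()]
                time.sleep(0.1)
            w.start()
            running.append(w)
        for t in running:
            t.join()            # wait for the stragglers

Rebuilding the running list on each poll, rather than decrementing a counter, avoids counting the same finished thread twice.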

View File

@@ -76,7 +76,17 @@ class CSVReader(DataReader):
         except IOError, error:
             raise PGLoader_Error, error

+        if self.start is not None and self.start > 0:
+            self.log.info("CSV Reader starting at offset %d" % self.start)
+            fd.seek(self.start)
+
         # now read the lines
         for columns in csv.reader(fd, dialect = 'pgloader'):
+            if self.end is not None and fd.tell() >= self.end:
+                self.log.info("CSV Reader stopping, offset %d >= %d" % (fd.tell(), self.end))
+                fd.close()
+                break
+
             line = self.field_sep.join(columns)
             yield line, columns

View File

@@ -2,7 +2,7 @@
 #
 # Some common options, for each module to get them

-PGLOADER_VERSION = '2.2.6'
+PGLOADER_VERSION = '2.3.0~dev'

 INPUT_ENCODING     = None
 PG_CLIENT_ENCODING = 'latin9'
@@ -36,6 +36,11 @@ UDC_PREFIX = 'udc_'
 REFORMAT_PATH         = None
 DEFAULT_REFORMAT_PATH = ['/usr/share/python-support/pgloader/reformat']

+MAX_THREADS           = 1
+MAX_PARALLEL_SECTIONS = 1
+SECTION_THREADS       = -1
+SPLIT_FILE_READING    = False
+
 CLIENT_MIN_MESSAGES = None
 LOG_MIN_MESSAGES    = DEBUG
 DEFAULT_LOG_FILE    = "/tmp/pgloader.log"

View File

@@ -21,6 +21,8 @@ from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
 from options import NEWLINE_ESCAPES
 from options import UDC_PREFIX
 from options import REFORMAT_PATH
+from options import MAX_THREADS, MAX_PARALLEL_SECTIONS
+from options import SECTION_THREADS, SPLIT_FILE_READING

 class PGLoader(threading.Thread):
     """
@ -28,14 +30,18 @@ class PGLoader(threading.Thread):
import data with COPY or update blob data with UPDATE. import data with COPY or update blob data with UPDATE.
""" """
def __init__(self, name, config, stats): def __init__(self, name, config, stats, logname = None):
""" Init with a configuration section """ """ Init with a configuration section """
threading.Thread.__init__(self, name = name) threading.Thread.__init__(self, name = name)
# Some settings # Some settings
self.stats = stats self.stats = stats
self.name = name self.name = name
self.log = getLogger(name) self.config = config
if logname is None:
logname = name
self.log = getLogger(logname)
self.__dbconnect__(config) self.__dbconnect__(config)
@@ -439,6 +445,42 @@ class PGLoader(threading.Thread):
             self.newline_escapes = [(a, NEWLINE_ESCAPES)
                                     for (a, x) in self.columns]

+        ##
+        # Parallelism knobs
+        for opt, default in [('max_threads', MAX_THREADS),
+                             ('section_threads', SECTION_THREADS),
+                             ('split_file_reading', SPLIT_FILE_READING)]:
+            if config.has_option(name, opt):
+                if opt in ['max_threads', 'section_threads']:
+                    self.__dict__[opt] = config.getint(name, opt)
+                else:
+                    self.__dict__[opt] = config.get(name, opt) == 'True'
+            else:
+                if not self.template:
+                    self.__dict__[opt] = default
+
+            if not self.template:
+                self.log.info('%s.%s = %s' % (name, opt, str(self.__dict__[opt])))
+
+        if not self.template and self.split_file_reading:
+            global FROM_COUNT
+            if FROM_COUNT is not None and FROM_COUNT > 0:
+                raise PGLoader_Error, \
+                      "Conflict: can't use both 'split_file_reading' and '--from'"
+
+            global FROM_ID
+            if FROM_ID is not None:
+                raise PGLoader_Error, \
+                      "Conflict: can't use both 'split_file_reading' and '--from-id'"
+
+        if not self.template and self.section_threads > self.max_threads:
+            raise PGLoader_Error, \
+                  "%s.section_threads > %s.max_threads : %d > %d" \
+                  % (name, name, self.section_threads, self.max_threads)
+
+        ##
+        # Reader's init
         if config.has_option(name, 'format'):
             self.format = config.get(name, 'format')
@@ -648,11 +690,113 @@ class PGLoader(threading.Thread):
             return

     def run(self):
-        """ depending on configuration, do given job """
+        """ controlling thread which dispatches the job """

         # Announce the beginning of the work
         self.log.info("[%s]" % self.name)

+        if self.max_threads == 1:
+            if self.reader.start is not None:
+                self.log.info("Loading from offset %d to %d" \
+                              % (self.reader.start, self.reader.end))
+
+            self.prepare_processing()
+            self.process()
+            self.finish_processing()
+            return
+
+        # now we're going to need multi-threading
+        if self.section_threads == -1:
+            self.section_threads = self.max_threads
+
+        if self.split_file_reading:
+            # this option is not compatible with text mode when
+            # field_count is used (meaning end of line could be found
+            # in the data)
+            if self.format.lower() == 'text' and self.field_count is not None:
+                raise PGLoader_Error, \
+                      "Can't use split_file_reading with text " +\
+                      "format when 'field_count' is used"
+
+            # init boundaries to give to each thread
+            from stat import ST_SIZE
+            previous   = 0
+            filesize   = os.stat(self.filename)[ST_SIZE]
+            boundaries = []
+
+            for partn in range(self.section_threads):
+                start = previous
+                end   = (partn+1)*filesize / self.section_threads
+                boundaries.append((start, end))
+
+                previous = end + 1
+
+            self.log.info("Splitting input file of %d bytes %s" \
+                          % (filesize, str(boundaries)))
+
+            # Now check for real boundaries
+            fd = file(self.filename)
+            b  = 0
+            for b in range(len(boundaries)):
+                start, end = boundaries[b]
+                fd.seek(end)
+                dummy_str = fd.readline()
+
+                # update both current boundary end and next start
+                boundaries[b] = (start, fd.tell())
+                if (b+1) < len(boundaries):
+                    boundaries[b+1] = (fd.tell()+1, boundaries[b+1][1])
+
+            fd.close()
+
+            self.log.info("Splitting input file of %d bytes %s" \
+                          % (filesize, str(boundaries)))
+
+            self.prepare_processing()
+
+            # now create self.section_threads PGLoader threads
+            summary = {}
+            threads = {}
+            running = 0
+
+            for current in range(self.section_threads):
+                summary[current] = []
+                current_name     = "%s[%d]" % (self.name, current)
+
+                loader = PGLoader(self.name,
+                                  self.config,
+                                  summary[current],
+                                  current_name)
+
+                loader.max_threads = 1
+                loader.reader.set_boundaries(boundaries[current])
+                loader.dont_prepare_nor_finish = True
+
+                threads[current_name] = loader
+                threads[current_name].start()
+                running += 1
+
+            # wait for loaders completion
+            while running > 0:
+                for cn in threads:
+                    if not threads[cn].isAlive():
+                        running -= 1
+
+                if running > 0:
+                    self.log.info('waiting for %d threads, sleeping %gs' % (running, 1))
+                    time.sleep(1)
+
+            self.finish_processing()
+            self.log.info('No more threads are running, %s done' % self.name)
+            return
+
+        else:
+            # here we need a special thread reading the file
+            pass
+
+    def prepare_processing(self):
+        """ Things to do before processing data """
+        if 'dont_prepare_nor_finish' in self.__dict__:
+            return
+
         if not DRY_RUN:
             if TRUNCATE:
                 self.db.truncate(self.table)
@@ -660,13 +804,10 @@ class PGLoader(threading.Thread):
             if TRIGGERS:
                 self.db.disable_triggers(self.table)

-        if self.columns is not None:
-            self.log.info("COPY csv data")
-            self.data_import()
-
-        elif self.blob_cols is not None:
-            # elif: COPY process also blob data
-            self.log.info("UPDATE blob data")
+    def finish_processing(self):
+        """ Things to do after processing data """
+        if 'dont_prepare_nor_finish' in self.__dict__:
+            return

         if TRIGGERS and not DRY_RUN:
             self.db.enable_triggers(self.table)
@@ -686,6 +827,18 @@ class PGLoader(threading.Thread):
         self.log.info("loading done")
         return

+    def process(self):
+        """ depending on configuration, do given job """
+
+        if self.columns is not None:
+            self.log.info("COPY csv data")
+            self.data_import()
+
+        elif self.blob_cols is not None:
+            # elif: COPY process also blob data
+            self.log.info("UPDATE blob data")
+
     def data_import(self):
         """ import CSV or TEXT data, using COPY """

View File

@@ -37,6 +37,9 @@ class DataReader:
         if INPUT_ENCODING is not None:
             self.input_encoding = INPUT_ENCODING

+        self.start = None
+        self.end   = None
+
     def readconfig(self, name, config):
         """ read configuration section for common options
@@ -84,3 +87,8 @@ class DataReader:
         """ read data from configured file, and generate (yields) for
            each data line: line, columns and rowid """
         pass
+
+    def set_boundaries(self, (start, end)):
+        """ set the boundaries of this reader """
+        self.start = start
+        self.end   = end
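
On the consumer side, a reader handed (start, end) boundaries seeks to start and stops yielding lines once its file offset reaches end, which is what the CSVReader and TextReader changes in this commit implement. A sketch of that contract for a plain line reader (illustrative names, not the pgloader API):

    def read_range(filename, start, end):
        """Yield each line of filename whose first byte lies in [start, end)."""
        with open(filename, 'rb') as fd:
            fd.seek(start)
            for line in fd:
                yield line
                if fd.tell() >= end:
                    break   # the next line belongs to the neighbouring range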

View File

@@ -96,10 +96,19 @@ class TextReader(DataReader):
         except IOError, error:
             raise PGLoader_Error, error

+        if self.start is not None and self.start > 0:
+            self.log.info("Text Reader starting at offset %d" % self.start)
+            fd.seek(self.start)
+
         for line in fd:
             # we count real physical lines
             nb_plines += 1

+            if self.end is not None and fd.tell() >= self.end:
+                self.log.info("Text Reader stopping, offset %d >= %d" % (fd.tell(), self.end))
+                fd.close()
+                break
+
             if self.input_encoding is not None:
                 # this may not be necessary, after all
                 try: