Using semaphores to control the maximum number of threads running at any given time

This commit is contained in:
dim 2008-02-12 11:27:18 +00:00
parent fc8adf1831
commit b534962192
5 changed files with 158 additions and 125 deletions

40
examples/parallel.conf Normal file
View File

@ -0,0 +1,40 @@
[pgsql]
host = localhost
port = 5432
base = pgloader
user = dim
pass = None
log_file = /tmp/pgloader.log
log_min_messages = DEBUG
client_min_messages = WARNING
lc_messages = C
;client_encoding = 'utf-8'
client_encoding = 'latin1'
copy_every = 5000
commit_every = 5
#copy_delimiter = %
null = ""
empty_string = "\ "
max_parallel_sections = 1
[parallel_template]
template = True
table = parallel
format = csv
filename = parallel/parallel.data
field_sep = ;
columns = a, b
[parallel_greg]
use_template = parallel_template
section_threads = 4
split_file_reading = True
[parallel_simon]
use_template = parallel_template
section_threads = 2
split_file_reading = False

View File

@ -19,8 +19,7 @@ commit_every = 5
null = ""
empty_string = "\ "
max_parallel_sections = 1
max_parallel_sections = 4
[simple_tmpl]
template = True
@ -99,26 +98,6 @@ field_sep = |
columns = id, timestamp
reformat = timestamp:mysql:timestamp
[parallel_template]
template = True
table = parallel
format = csv
filename = parallel/parallel.data
field_sep = ;
columns = a, b
[parallel_greg]
use_template = parallel_template
max_threads = 4
section_threads = -1
split_file_reading = True
[parallel_simon]
use_template = parallel_template
max_threads = 2
section_threads = -1
split_file_reading = False
[csv]
table = csv
format = csv

View File

@ -5,7 +5,7 @@
PostgreSQL data import tool, see included man page.
"""
import os, sys, os.path, time, codecs, logging
import os, sys, os.path, time, codecs, logging, threading
from cStringIO import StringIO
import pgloader.options
@ -342,8 +342,13 @@ def print_summary(dbconn, sections, summary, td):
print t
print _
t, d, u, e = summary[s]
d = duration_pprint(d)
if summary[s]:
t, d, u, e = summary[s]
d = duration_pprint(d)
else:
t = s
d = '%9s ' % '-'
u = e = 0
if False and not DRY_RUN:
sql = "select pg_total_relation_size(%s), " + \
@ -388,6 +393,16 @@ def print_summary(dbconn, sections, summary, td):
return retcode
def running_threads(threads):
""" count running threads """
running = 0
for s in threads:
if threads[s].isAlive():
running += 1
return running
def load_data():
""" read option line and configuration file, then process data
import of given section, or all sections if no section is given on
@ -458,46 +473,34 @@ def load_data():
sections.sort()
threads = {}
running = 0
current = 0
interrupted = False
max_running = MAX_PARALLEL_SECTIONS
if max_running == -1:
max_running = len(sections)
sem = threading.BoundedSemaphore(max_running)
while current < len(sections):
s = sections[current]
# update running
if running > 0:
for s in threads:
if not threads[s].isAlive():
running -= 1
if MAX_PARALLEL_SECTIONS != -1:
# -1 means we can start as many parallel sections
# processing as we want to
if running == MAX_PARALLEL_SECTIONS:
# we have to wait for one thread to terminate
# before considering next one
log.info('%d/%d threads running, sleeping %gs' \
% (running, MAX_PARALLEL_SECTIONS, .1))
time.sleep(.1)
continue
try:
summary[s] = []
loader = PGLoader(s, config, summary[s])
loader = PGLoader(s, config, sem, summary[s])
if not loader.template:
filename = loader.filename
input_encoding = loader.input_encoding
threads[s] = loader
log.info("Starting thread %d for %s" % (running, s))
threads[s] = loader
# .start() will sem.acquire(), so we won't have more
# than max_running threads running at any time.
log.info("Starting thread for %s" % s)
threads[s].start()
running += 1
else:
log.info("Skipping section %s, which is a template" % s)
summary.pop(s)
except PGLoader_Error, e:
if e == '':
log.error('[%s] Please correct previous errors' % s)
@ -514,39 +517,36 @@ def load_data():
% (filename, input_encoding))
except KeyboardInterrupt:
interrupted = True
log.warning("Aborting on user demand (Interrupt)")
current += 1
while running > 0:
for s in threads:
if not threads[s].isAlive():
running -= 1
if not interrupted:
n = running_threads(threads)
log.info("Waiting for %d threads to terminate" % n)
if running > 0:
if MAX_PARALLEL_SECTIONS != 1:
log.info("%d thread(s) still running" % running)
try:
if MAX_PARALLEL_SECTIONS != 1:
log.info('waiting for %d threads, sleeping %gs' % (running, 1))
time.sleep(1)
except KeyboardInterrupt:
log.warning("Aborting %d threads still running at user demand"\
% running)
break
# Try to acquire all semaphore entries
for i in range(max_running):
sem.acquire()
log.debug("Acquired %d times, " % i + \
"still waiting for %d threads to terminate" \
% running_threads(threads))
# total duration
td = time.time() - begin
retcode = 0
if SUMMARY:
if SUMMARY and not interrupted:
try:
retcode = print_summary(None, sections, summary, td)
print
except PGLoader_Error, e:
log.error("Can't print summary: %s" % e)
except KeyboardInterrupt:
pass
return retcode
if __name__ == "__main__":

View File

@ -36,9 +36,8 @@ UDC_PREFIX = 'udc_'
REFORMAT_PATH = None
DEFAULT_REFORMAT_PATH = ['/usr/share/python-support/pgloader/reformat']
MAX_THREADS = 1
MAX_PARALLEL_SECTIONS = 1
SECTION_THREADS = -1
SECTION_THREADS = 1
SPLIT_FILE_READING = False
CLIENT_MIN_MESSAGES = None

View File

@ -21,7 +21,7 @@ from options import COPY_SEP, FIELD_SEP, CLOB_SEP, NULL, EMPTY_STRING
from options import NEWLINE_ESCAPES
from options import UDC_PREFIX
from options import REFORMAT_PATH
from options import MAX_THREADS, MAX_PARALLEL_SECTIONS
from options import MAX_PARALLEL_SECTIONS
from options import SECTION_THREADS, SPLIT_FILE_READING
class PGLoader(threading.Thread):
@ -30,12 +30,17 @@ class PGLoader(threading.Thread):
import data with COPY or update blob data with UPDATE.
"""
def __init__(self, name, config, stats, logname = None):
def __init__(self, name, config, sem, stats, logname = None):
""" Init with a configuration section """
threading.Thread.__init__(self, name = name)
# Some settings
# sem and stats are global objects:
# sem is shared by all threads at the same level, stats is a
# private entry of a shared dict
self.sem = sem
self.stats = stats
# thereafter parameters are local
self.name = name
self.config = config
@ -163,6 +168,9 @@ class PGLoader(threading.Thread):
def __read_conf__(self, name, config, db, want_template = False):
""" init self from config section name """
# we'll need both of them from the globals
global FROM_COUNT, FROM_ID
if want_template and not config.has_option(name, 'template'):
e = 'Error: section %s is not a template' % name
raise PGLoader_Error, e
@ -447,38 +455,29 @@ class PGLoader(threading.Thread):
##
# Parallelism knobs
for opt, default in [('max_threads', MAX_THREADS),
('section_threads', SECTION_THREADS),
('split_file_reading', SPLIT_FILE_READING)]:
if config.has_option(name, opt):
if opt in ['max_threads', 'section_threads']:
self.__dict__[opt] = config.getint(name, opt)
else:
self.__dict__[opt] = config.get(name, opt) == 'True'
else:
if not self.template:
self.__dict__[opt] = default
if config.has_option(name, 'section_threads'):
self.section_threads = config.getint(name, 'section_threads')
else:
self.section_threads = SECTION_THREADS
if not self.template:
self.log.info('%s.%s = %s' % (name, opt, str(self.__dict__[opt])))
if config.has_option(name, 'split_file_reading'):
self.split_file_reading = config.get(name, 'split_file_reading') == 'True'
else:
self.split_file_reading = SPLIT_FILE_READING
if not self.template:
for opt in ('section_threads', 'split_file_reading'):
self.log.debug('%s.%s = %s' % (name, opt, str(self.__dict__[opt])))
if not self.template and self.split_file_reading:
global FROM_COUNT
if FROM_COUNT is not None and FROM_COUNT > 0:
raise PGLoader_Error, \
"Conflict: can't use both 'split_file_reading' and '--from'"
global FROM_ID
if FROM_ID is not None:
raise PGLoader_Error, \
"Conflict: can't use both 'split_file_reading' and '--from-id'"
if not self.template and self.section_threads > self.max_threads:
raise PGLoader_Error, \
"%s.section_threads > %s.max_threads : %d > %d" \
% (name, name, self.section_threads, self.max_threads)
##
# Reader's init
if config.has_option(name, 'format'):
@ -587,7 +586,6 @@ class PGLoader(threading.Thread):
# if options.fromid is not None it has to be either a value,
# when index is single key or a dict in a string, when index
# is a multiple key
global FROM_ID
if FROM_ID is not None:
if len(self.index) > 1:
# we have to evaluate given string and see if it is a
@ -691,12 +689,14 @@ class PGLoader(threading.Thread):
def run(self):
""" controling thread which dispatch the job """
# care about number of threads launched
self.sem.acquire()
# Announce the beginning of the work
self.log.info("[%s]" % self.name)
if self.max_threads == 1:
self.log.info("%s launched" % self.name)
if self.section_threads == 1:
if self.reader.start is not None:
self.log.info("Loading from offset %d to %d" \
% (self.reader.start, self.reader.end))
@ -704,11 +704,11 @@ class PGLoader(threading.Thread):
self.prepare_processing()
self.process()
self.finish_processing()
return
# now we're going to need multi-threading
if self.section_threads == -1:
self.section_threads = self.max_threads
self.log.info("Releasing %s" % self.name)
self.sem.release()
return
if self.split_file_reading:
# this option is not compatible with text mode when
@ -731,7 +731,7 @@ class PGLoader(threading.Thread):
previous = end + 1
self.log.info("Spliting input file of %d bytes %s" \
self.log.debug("Spliting input file of %d bytes %s" \
% (filesize, str(boundaries)))
# Now check for real boundaries
@ -755,43 +755,58 @@ class PGLoader(threading.Thread):
self.prepare_processing()
# now create self.section_threads PGLoader threads
# the semaphore here is not really useful, but is part of the API
sem = threading.BoundedSemaphore(self.section_threads)
summary = {}
threads = {}
running = 0
for current in range(self.section_threads):
summary[current] = []
current_name = "%s[%d]" % (self.name, current)
loader = PGLoader(self.name,
self.config,
summary[current],
current_name)
loader.max_threads = 1
loader.reader.set_boundaries(boundaries[current])
loader.dont_prepare_nor_finish = True
threads[current_name] = loader
threads[current_name].start()
running += 1
try:
summary[current] = []
current_name = "%s[%d]" % (self.name, current)
loader = PGLoader(self.name, self.config, sem,
summary[current], current_name)
loader.section_threads = 1
loader.reader.set_boundaries(boundaries[current])
loader.dont_prepare_nor_finish = True
threads[current_name] = loader
threads[current_name].start()
running += 1
except Exception, e:
raise
# wait for loaders completion
while running > 0:
for cn in threads:
if not threads[cn].isAlive():
running -= 1
try:
for cn in threads:
if not threads[cn].isAlive():
running -= 1
if running > 0:
log.info('waiting for %d threads, sleeping %gs' % (running, 1))
time.sleep(1)
if running > 0:
self.log.info('waiting for %d threads, sleeping %gs' % (running, 1))
time.sleep(1)
except KeyboardInterrupt:
self.log.warning("Aborting %d threads in section %s "\
% (running, self.name))
break
self.finish_processing()
log.info('No more threads are running, %s done' % self.name)
return
self.log.info('No more threads are running, %s done' % self.name)
else:
# here we need a special thread reading the file
pass
self.sem.release()
self.log.info("%s released" % self.name)
return
def prepare_processing(self):
""" Things to do before processing data """
if 'dont_prepare_nor_finish' in self.__dict__: