First implementation of the Round Robin Reader threading method. Work in progress --- does not work at all yet.

dim 2008-02-12 21:58:42 +00:00
parent f60cf4c8c6
commit bac5d2b482
5 changed files with 264 additions and 107 deletions
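
In a nutshell, the scheme this commit is working toward: one reader thread parses the input file and deals batches of lines out to N worker threads, each worker draining a private queue into COPY. Below is a minimal, self-contained sketch of that dispatch pattern. It is a hypothetical illustration only: it swaps the commit's deque-plus-Lock machinery for the stdlib Queue class (which handles the blocking for free), and it assumes the parallel/parallel.data file produced by the first hunk.

    import threading
    from Queue import Queue          # stdlib; named 'queue' on Python 3

    N, BATCH = 3, 1000               # workers, lines per round (rrqueue_size)
    queues = [Queue() for _ in range(N)]
    sinks = [[] for _ in range(N)]

    def worker(i):
        # consume my own queue only; None is the end-of-input sentinel
        while True:
            item = queues[i].get()
            if item is None:
                break
            sinks[i].append(item)    # stand-in for buffering rows into COPY

    def dispatch(lines):
        # deal lines out BATCH at a time, cycling over the workers
        for n, line in enumerate(lines):
            queues[(n // BATCH) % N].put(line)
        for q in queues:
            q.put(None)

    workers = [threading.Thread(target = worker, args = (i,)) for i in range(N)]
    for w in workers:
        w.start()
    dispatch(open('parallel/parallel.data'))
    for w in workers:
        w.join()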

View File

@@ -6,6 +6,6 @@ CREATE TABLE parallel (
 -- create the .data file
 insert into parallel
        select * from (select a, a::text
-                        from generate_series(0, 1000 * 1000 * 1000) as t(a)) x;
+                        from generate_series(0, 1000 * 1000) as t(a)) x;

 \copy parallel to 'parallel/parallel.data' with delimiter ';' csv

View File

@@ -39,6 +39,7 @@ DEFAULT_REFORMAT_PATH = ['/usr/share/python-support/pgloader/reformat']
 MAX_PARALLEL_SECTIONS = 1
 SECTION_THREADS = 1
 SPLIT_FILE_READING = False
+RRQUEUE_SIZE = None

 CLIENT_MIN_MESSAGES = None
 LOG_MIN_MESSAGES = DEBUG

View File

@@ -23,6 +23,7 @@ from options import UDC_PREFIX
 from options import REFORMAT_PATH
 from options import MAX_PARALLEL_SECTIONS
 from options import SECTION_THREADS, SPLIT_FILE_READING
+from options import RRQUEUE_SIZE

 class PGLoader(threading.Thread):
     """
@@ -30,20 +31,28 @@ class PGLoader(threading.Thread):
     import data with COPY or update blob data with UPDATE.
     """

-    def __init__(self, name, config, sem, stats, logname = None):
+    def __init__(self, name, config, sem, stats,
+                 logname = None, queue = None, lock = None):
         """ Init with a configuration section """
         threading.Thread.__init__(self, name = name)

-        # sem and stats are global objects:
-        # sem is shared by all threads at the same level, stats is a
-        # private entry of a shared dict
+        # sem, stats and queue (if not None) are global objects:
+        # sem is shared by all threads at the same level
+        # stats is a private entry of a shared dict
+        # queue is given when reading in round robin mode
+        # lock is a threading.Lock for reading sync
         #
         self.sem = sem
         self.stats = stats
+        self.queue = queue
+        self.lock = lock

         # thereafter parameters are local
         self.name = name
         self.config = config

+        # logname is given when we use several threads for reading
+        # the input file
         if logname is None:
             logname = name
         self.log = getLogger(logname)
@@ -470,13 +479,26 @@
             self.log.debug('%s.%s = %s' % (name, opt, str(self.__dict__[opt])))

         if not self.template and self.split_file_reading:
+            opt = 'split_file_reading'
             if FROM_COUNT is not None and FROM_COUNT > 0:
                 raise PGLoader_Error, \
-                      "Conflict: can't use both 'split_file_reading' and '--from'"
+                      "Conflict: can't use both '%s' and '--from'" % opt

             if FROM_ID is not None:
                 raise PGLoader_Error, \
-                      "Conflict: can't use both 'split_file_reading' and '--from-id'"
+                      "Conflict: can't use both '%s' and '--from-id'" % opt

+            if not self.template \
+               and self.format.lower() == 'text' \
+               and self.field_count is not None:
+                # this option is not compatible with text mode when
+                # field_count is used (meaning end of line could be found
+                # in the data)
+                raise PGLoader_Error, \
+                      "Can't use split_file_reading with text " +\
+                      "format when 'field_count' is used"

         ##
         # Reader's init
@@ -694,7 +716,8 @@
         self.sem.acquire()

         # Announce the beginning of the work
-        self.log.info("%s launched" % self.name)
+        self.log.info("%s launched --- %d threads" % (self.name,
+                                                      self.section_threads))

         if self.section_threads == 1:
             if self.reader.start is not None:
@@ -711,109 +734,225 @@
             return

         if self.split_file_reading:
-            # this option is not compatible with text mode when
-            # field_count is used (meaning end of line could be found
-            # in the data)
-            if self.format.lower() == 'text' and self.field_count is not None:
-                raise PGLoader_Error, \
-                      "Can't use split_file_reading with text " +\
-                      "format when 'field_count' is used"
-
-            # init boundaries to give to each thread
-            from stat import ST_SIZE
-            previous = 0
-            filesize = os.stat(self.filename)[ST_SIZE]
-            boundaries = []
-            for partn in range(self.section_threads):
-                start = previous
-                end = (partn+1)*filesize / self.section_threads
-                boundaries.append((start, end))
-                previous = end + 1
-
-            self.log.debug("Spliting input file of %d bytes %s" \
-                           % (filesize, str(boundaries)))
-
-            # Now check for real boundaries
-            fd = file(self.filename)
-            b = 0
-            for b in range(len(boundaries)):
-                start, end = boundaries[b]
-                fd.seek(end)
-                dummy_str = fd.readline()
-
-                # update both current boundary end and next start
-                boundaries[b] = (start, fd.tell()-1)
-
-                if (b+1) < len(boundaries):
-                    boundaries[b+1] = (fd.tell(), boundaries[b+1][1])
-
-            fd.close()
-
-            self.log.info("Spliting input file of %d bytes %s" \
-                          % (filesize, str(boundaries)))
-
-            self.prepare_processing()
-
-            # now create self.section_threads PGLoader threads
-            sem = threading.BoundedSemaphore(self.section_threads)
-            summary = {}
-            threads = {}
-            running = 0
-
-            for current in range(self.section_threads):
-                try:
-                    summary[current] = []
-                    current_name = "%s[%d]" % (self.name, current)
-
-                    loader = PGLoader(self.name, self.config, sem,
-                                      summary[current], current_name)
-
-                    loader.section_threads = 1
-                    loader.reader.set_boundaries(boundaries[current])
-                    loader.dont_prepare_nor_finish = True
-
-                    threads[current_name] = loader
-                    threads[current_name].start()
-                    running += 1
-
-                except Exception, e:
-                    raise
-
-            # wait for loaders completion, first let them some time to
-            # be started
-            time.sleep(2)
-
-            from tools import running_threads
-            n = running_threads(threads)
-            log.info("Waiting for %d threads to terminate" % n)
-
-            # Try to acquire all semaphore entries
-            for i in range(self.section_threads):
-                sem.acquire()
-                log.info("Acquired %d times, " % (i+1) + \
-                         "still waiting for %d threads to terminate" \
-                         % running_threads(threads))
-
-            self.finish_processing()
-            self.duration = time.time() - self.init_time
-            self.log.info('No more threads are running, %s done' % self.name)
-
-            stats = [0, 0]
-            for s in summary:
-                for i in range(2, len(summary[s])):
-                    stats[i-2] += summary[s][i]
-
-            for x in [self.table, self.duration] + stats:
-                self.stats.append(x)
+            # start self.section_threads workers
+            self.split_file_read()
         else:
-            # here we need a special thread reading the file
-            pass
+            self.round_robin_read()

         self.sem.release()
         self.log.info("%s released" % self.name)
         return
+    def split_file_read(self):
+        """ Current thread will start self.section_threads threads,
+        each one reading a part of the input file. """
+
+        # init boundaries to give to each thread
+        from stat import ST_SIZE
+        previous = 0
+        filesize = os.stat(self.filename)[ST_SIZE]
+        boundaries = []
+        for partn in range(self.section_threads):
+            start = previous
+            end = (partn+1)*filesize / self.section_threads
+            boundaries.append((start, end))
+            previous = end + 1
+
+        self.log.debug("Splitting input file of %d bytes %s" \
+                       % (filesize, str(boundaries)))
+
+        # Now check for real boundaries: push each byte boundary
+        # forward to the next end of line
+        fd = file(self.filename)
+        b = 0
+        for b in range(len(boundaries)):
+            start, end = boundaries[b]
+            fd.seek(end)
+            dummy_str = fd.readline()
+
+            # update both current boundary end and next start
+            boundaries[b] = (start, fd.tell()-1)
+
+            if (b+1) < len(boundaries):
+                boundaries[b+1] = (fd.tell(), boundaries[b+1][1])
+
+        fd.close()
+
+        self.log.info("Splitting input file of %d bytes %s" \
+                      % (filesize, str(boundaries)))
+
+        self.prepare_processing()
+
+        # now create self.section_threads PGLoader threads
+        sem = threading.BoundedSemaphore(self.section_threads)
+        summary = {}
+        threads = {}
+
+        for current in range(self.section_threads):
+            try:
+                summary[current] = []
+                current_name = "%s[%d]" % (self.name, current)
+
+                loader = PGLoader(self.name, self.config, sem,
+                                  summary[current], current_name)
+
+                loader.section_threads = 1
+                loader.reader.set_boundaries(boundaries[current])
+                loader.dont_prepare_nor_finish = True
+
+                threads[current_name] = loader
+                threads[current_name].start()
+
+            except Exception, e:
+                raise
+
+        # wait for loaders completion, first give them some time to
+        # get started
+        time.sleep(2)
+
+        from tools import running_threads
+        n = running_threads(threads)
+        log.info("Waiting for %d threads to terminate" % n)
+
+        # Try to acquire all semaphore entries
+        for i in range(self.section_threads):
+            sem.acquire()
+            log.info("Acquired %d times, " % (i+1) + \
+                     "still waiting for %d threads to terminate" \
+                     % running_threads(threads))
+
+        self.finish_processing()
+        self.duration = time.time() - self.init_time
+        self.log.info('No more threads are running, %s done' % self.name)
+
+        stats = [0, 0]
+        for s in summary:
+            for i in range(2, len(summary[s])):
+                stats[i-2] += summary[s][i]
+
+        for x in [self.table, self.duration] + stats:
+            self.stats.append(x)
+
+        return
+    def round_robin_read(self):
+        """ Start self.section_threads threads to process data; this
+        thread will read the input file and distribute the processing
+        in a round-robin fashion """
+
+        self.prepare_processing()
+
+        from tools import RRReader
+        queues = {}
+        locks = {}
+        sem = threading.BoundedSemaphore(self.section_threads)
+        summary = {}
+        threads = {}
+
+        rrqueue_size = RRQUEUE_SIZE
+        if rrqueue_size is None:
+            rrqueue_size = self.db.copy_every
+
+        for current in range(self.section_threads):
+            queues[current] = RRReader()
+            locks[current] = threading.Lock()
+
+            # acquire the lock before starting the worker thread,
+            # and release it only once its queue is full
+            self.log.info("locks[%d].acquire" % current)
+            locks[current].acquire()
+
+            try:
+                summary[current] = []
+                current_name = "%s[%d]" % (self.name, current)
+
+                loader = PGLoader(self.name, self.config, sem,
+                                  summary[current],
+                                  logname = current_name,
+                                  queue = queues[current],
+                                  lock = locks[current])
+
+                loader.section_threads = 1
+                loader.dont_prepare_nor_finish = True
+                loader.done = False
+
+                threads[current_name] = loader
+                threads[current_name].start()
+
+            except Exception, e:
+                raise
+
+        # wait for loaders completion, first give them some time to
+        # get started
+        time.sleep(2)
+
+        # Now self.section_threads workers are started and we have a
+        # queue and a lock for each of them.
+        #
+        # Read the input file here, and give each worker thread its
+        # share to process, in a round-robin fashion
+        n = 0 # line number
+        c = 0 # current thread
+        p = c # previous thread
+        for line, columns in self.reader.readlines():
+            if p != c:
+                self.log.info("read %d lines, queue to thread %s" % (n, c))
+                # release p's lock (its worker will then empty its
+                # queue) and take c's lock --- waiting until its
+                # worker has emptied its queue
+                self.log.info("locks[%d].release" % p)
+                self.log.info("locks[%d].acquire" % c)
+                locks[p].release()
+                locks[c].acquire()
+
+            queues[c].append((line, columns))
+
+            n += 1
+            p = c
+            c = (n / rrqueue_size) % self.section_threads
+
+        # don't forget to process the last run
+        locks[c].release()
+
+        # tell each worker it is done, holding its lock while we
+        # flip the flag so we don't race with its drain loop
+        for current in range(self.section_threads):
+            locks[current].acquire()
+            threads["%s[%d]" % (self.name, current)].done = True
+            locks[current].release()
+
+        self.finish_processing()
+        self.duration = time.time() - self.init_time
+        self.log.info('%s done' % self.name)
+
+        stats = [0, 0]
+        for s in summary:
+            for i in range(2, len(summary[s])):
+                stats[i-2] += summary[s][i]
+
+        for x in [self.table, self.duration] + stats:
+            self.stats.append(x)
+
+        return
+    def readlines(self):
+        """ return next line from either self.queue or self.reader """
+
+        if self.queue is None:
+            for line, columns in self.reader.readlines():
+                yield line, columns
+            return
+
+        while not self.done:
+            self.lock.acquire()
+            for line, columns in self.queue.readlines():
+                yield line, columns
+            self.lock.release()
+
+        return
+
     def prepare_processing(self):
         """ Things to do before processing data """
         if 'dont_prepare_nor_finish' in self.__dict__:
@@ -844,7 +983,8 @@ class PGLoader(threading.Thread):
         if self.reject is not None:
             self.errors = self.reject.errors

-        for x in [self.table, self.duration, self.db.commited_rows, self.errors]:
+        for x in [self.table, self.duration,
+                  self.db.commited_rows, self.errors]:
             self.stats.append(x)

         # then show up some stats
@@ -873,8 +1013,8 @@
         if self.udcs:
             dudcs = dict(self.udcs)

-        for line, columns in self.reader.readlines():
+        for line, columns in self.readlines():
             if self.blob_cols is not None:
                 columns, rowids = self.read_blob(line, columns)
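
A note on the boundary logic in split_file_read above: the file is first cut into equal byte ranges, then each cut is pushed forward to the next newline (seek plus readline) so that no data line is ever split across two workers. The same trick in isolation, as a hypothetical standalone sketch (line_boundaries is a name of my own choosing, not pgloader's):

    import os

    def line_boundaries(filename, parts):
        # equal byte cuts, each snapped forward to the next end of line
        size = os.stat(filename).st_size
        bounds, start = [], 0
        f = open(filename)
        for p in range(parts):
            f.seek((p + 1) * size // parts)
            f.readline()                  # finish the line we landed in
            bounds.append((start, f.tell() - 1))
            start = f.tell()
        f.close()
        return bounds

Each resulting (start, end) pair can then be handed to a reader via set_boundaries(), as the code above does.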

View File

@@ -37,6 +37,8 @@ class DataReader:
         if INPUT_ENCODING is not None:
             self.input_encoding = INPUT_ENCODING

         # (start, end) are used for split_file_reading mode
+        # queue when in round_robin_read mode
         self.start = None
         self.end = None
@@ -92,3 +94,4 @@ class DataReader:
         """ set the boundaries of this reader """
         self.start = start
         self.end = end
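
This hunk only stores the boundaries; how a reader could honour them is sketched below. This is hypothetical (bounded_lines is my own name, and the real logic lives in the DataReader subclasses), given that end points at the newline byte closing the range:

    def bounded_lines(filename, start, end):
        # yield only the lines whose bytes fall inside [start, end]
        f = open(filename)
        f.seek(start)
        while f.tell() <= end:
            line = f.readline()
            if not line:
                break
            yield line.rstrip('\r\n')
        f.close()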

View File

@@ -2,7 +2,7 @@
 #
 # pgloader libraries

-import os, sys, os.path, time, codecs
+import os, sys, os.path, time, codecs, collections
 from cStringIO import StringIO

 from options import DRY_RUN, PEDANTIC
+class RRReader(collections.deque):
+    """ Round Robin reader: a collections.deque with a
+    readlines() method """
+
+    def readlines(self):
+        """ return next line from the queue, until it runs empty """
+        while 1:
+            try:
+                yield self.popleft()
+            except IndexError:
+                return
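
Usage-wise, RRReader is filled by the main thread with append() and drained by a worker via readlines(). For instance (a hypothetical interactive session):

    >>> from tools import RRReader
    >>> rr = RRReader()
    >>> rr.append(('1;one', ['1', 'one']))
    >>> rr.append(('2;two', ['2', 'two']))
    >>> [cols for line, cols in rr.readlines()]
    [['1', 'one'], ['2', 'two']]
    >>> list(rr.readlines())
    []

Note that readlines() consumes the deque: a second call yields nothing until more items are appended, which is exactly what the round robin reader wants. Each release of a worker's lock lets it drain whatever the main thread queued since the last round.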