mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-08 15:27:00 +02:00
Synchronize threads starts and ends with an Event
This commit is contained in:
parent
45c670f576
commit
a27a801f62
37
pgloader.py
37
pgloader.py
@ -462,8 +462,10 @@ def load_data():
|
|||||||
# we run through sorted section list
|
# we run through sorted section list
|
||||||
sections.sort()
|
sections.sort()
|
||||||
|
|
||||||
threads = {}
|
threads = {}
|
||||||
current = 0
|
started = {}
|
||||||
|
finished = {}
|
||||||
|
current = 0
|
||||||
interrupted = False
|
interrupted = False
|
||||||
|
|
||||||
max_running = MAX_PARALLEL_SECTIONS
|
max_running = MAX_PARALLEL_SECTIONS
|
||||||
@ -476,8 +478,12 @@ def load_data():
|
|||||||
s = sections[current]
|
s = sections[current]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
summary[s] = []
|
summary [s] = []
|
||||||
loader = PGLoader(s, config, sem, summary[s])
|
started [s] = threading.Event()
|
||||||
|
finished[s] = threading.Event()
|
||||||
|
|
||||||
|
loader = PGLoader(s, config, sem,
|
||||||
|
(started[s], finished[s]), summary[s])
|
||||||
if not loader.template:
|
if not loader.template:
|
||||||
filename = loader.filename
|
filename = loader.filename
|
||||||
input_encoding = loader.input_encoding
|
input_encoding = loader.input_encoding
|
||||||
@ -485,11 +491,13 @@ def load_data():
|
|||||||
|
|
||||||
# .start() will sem.aquire(), so we won't have more
|
# .start() will sem.aquire(), so we won't have more
|
||||||
# than max_running threads running at any time.
|
# than max_running threads running at any time.
|
||||||
log.info("Starting thread for %s" % s)
|
log.debug("Starting thread for %s" % s)
|
||||||
threads[s].start()
|
threads[s].start()
|
||||||
else:
|
else:
|
||||||
log.info("Skipping section %s, which is a template" % s)
|
log.info("Skipping section %s, which is a template" % s)
|
||||||
summary.pop(s)
|
|
||||||
|
for d in (summary, started, finished):
|
||||||
|
d.pop(s)
|
||||||
|
|
||||||
except PGLoader_Error, e:
|
except PGLoader_Error, e:
|
||||||
if e == '':
|
if e == '':
|
||||||
@ -512,19 +520,12 @@ def load_data():
|
|||||||
|
|
||||||
current += 1
|
current += 1
|
||||||
|
|
||||||
if not interrupted:
|
# get sure each thread is started, then each one is done
|
||||||
from pgloader.tools import running_threads
|
from pgloader.tools import check_events
|
||||||
|
|
||||||
n = running_threads(threads)
|
|
||||||
log.info("Waiting for %d threads to terminate" % n)
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
# Try to acquire all semaphore entries
|
check_events(started, log, "is running")
|
||||||
for i in range(max_running):
|
log.info("All threads are started, wait for them to terminate")
|
||||||
sem.acquire()
|
check_events(finished, log, "processing is over")
|
||||||
log.debug("Acquired %d times, " % (i+1) + \
|
|
||||||
"still waiting for %d threads to terminate" \
|
|
||||||
% running_threads(threads))
|
|
||||||
|
|
||||||
# total duration
|
# total duration
|
||||||
td = time.time() - begin
|
td = time.time() - begin
|
||||||
|
@ -9,7 +9,7 @@ import os, sys, os.path, time, codecs, threading
|
|||||||
from cStringIO import StringIO
|
from cStringIO import StringIO
|
||||||
|
|
||||||
from logger import log, getLogger
|
from logger import log, getLogger
|
||||||
from tools import PGLoader_Error, Reject, parse_config_string
|
from tools import PGLoader_Error, Reject, parse_config_string, check_events
|
||||||
from db import db
|
from db import db
|
||||||
from lo import ifx_clob, ifx_blob
|
from lo import ifx_clob, ifx_blob
|
||||||
|
|
||||||
@ -31,7 +31,8 @@ class PGLoader(threading.Thread):
|
|||||||
import data with COPY or update blob data with UPDATE.
|
import data with COPY or update blob data with UPDATE.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, name, config, sem, stats,
|
def __init__(self, name, config, sem, (started, finished),
|
||||||
|
stats,
|
||||||
logname = None,
|
logname = None,
|
||||||
reject = None, queue = None, lock = None, copy_sep = None):
|
reject = None, queue = None, lock = None, copy_sep = None):
|
||||||
""" Init with a configuration section """
|
""" Init with a configuration section """
|
||||||
@ -47,13 +48,17 @@ class PGLoader(threading.Thread):
|
|||||||
|
|
||||||
# sem, stats and queue (if not None) are global objects
|
# sem, stats and queue (if not None) are global objects
|
||||||
# sem is shared by all threads at the same level
|
# sem is shared by all threads at the same level
|
||||||
|
# started is a threads.Event object to set() once in run()
|
||||||
|
# finished is a threads.Event object to set() once processing is over
|
||||||
# stats is a private entry of a shared dict
|
# stats is a private entry of a shared dict
|
||||||
# queue is given when reading in round robin mode
|
# queue is given when reading in round robin mode
|
||||||
# lock is a threading.Lock for reading sync
|
# lock is a threading.Lock for reading sync
|
||||||
# reject is a common reject object, protected by a parent-thread rlock
|
# reject is the common reject object
|
||||||
#
|
#
|
||||||
self.sem = sem
|
self.sem = sem
|
||||||
|
self.started = started
|
||||||
self.stats = stats
|
self.stats = stats
|
||||||
|
self.finished = finished
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.lock = lock
|
self.lock = lock
|
||||||
self.reject = reject
|
self.reject = reject
|
||||||
@ -86,10 +91,10 @@ class PGLoader(threading.Thread):
|
|||||||
|
|
||||||
if config.has_option(name, 'template'):
|
if config.has_option(name, 'template'):
|
||||||
self.template = True
|
self.template = True
|
||||||
self.log.info("[%s] is a template", self.name)
|
self.log.debug("[%s] is a template", self.name)
|
||||||
|
|
||||||
if not self.template:
|
if not self.template:
|
||||||
self.log.info("[%s] parse configuration", self.name)
|
self.log.debug("[%s] parse configuration", self.name)
|
||||||
|
|
||||||
if not self.template:
|
if not self.template:
|
||||||
# check if the section wants to use a template
|
# check if the section wants to use a template
|
||||||
@ -108,8 +113,8 @@ class PGLoader(threading.Thread):
|
|||||||
raise PGLoader_Error, m
|
raise PGLoader_Error, m
|
||||||
|
|
||||||
# first load template configuration
|
# first load template configuration
|
||||||
self.log.info("Reading configuration from template " +\
|
self.log.debug("Reading configuration from template " +\
|
||||||
"section [%s]", self.template)
|
"section [%s]", self.template)
|
||||||
|
|
||||||
self.real_log = self.log
|
self.real_log = self.log
|
||||||
self.log = getLogger(self.template)
|
self.log = getLogger(self.template)
|
||||||
@ -129,7 +134,7 @@ class PGLoader(threading.Thread):
|
|||||||
|
|
||||||
if not self.template:
|
if not self.template:
|
||||||
# now load specific configuration
|
# now load specific configuration
|
||||||
self.log.info("Reading configuration from section [%s]", name)
|
self.log.debug("Reading configuration from section [%s]", name)
|
||||||
self._read_conf(name, config, db)
|
self._read_conf(name, config, db)
|
||||||
|
|
||||||
# Now reset database connection
|
# Now reset database connection
|
||||||
@ -237,8 +242,8 @@ class PGLoader(threading.Thread):
|
|||||||
self.reject = Reject(self.log,
|
self.reject = Reject(self.log,
|
||||||
self.reject_log, self.reject_data)
|
self.reject_log, self.reject_data)
|
||||||
|
|
||||||
self.log.info('reject log in %s', self.reject.reject_log)
|
self.log.debug('reject log in %s', self.reject.reject_log)
|
||||||
self.log.info('rejected data in %s', self.reject.reject_data)
|
self.log.debug('rejected data in %s', self.reject.reject_data)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# needed to instanciate self.reject while in template section
|
# needed to instanciate self.reject while in template section
|
||||||
@ -551,8 +556,8 @@ class PGLoader(threading.Thread):
|
|||||||
if 'format' not in self.__dict__:
|
if 'format' not in self.__dict__:
|
||||||
raise PGLoader_Error, "Please configure %s.format" % name
|
raise PGLoader_Error, "Please configure %s.format" % name
|
||||||
|
|
||||||
self.log.info("File '%s' will be read in %s format" \
|
self.log.debug("File '%s' will be read in %s format" \
|
||||||
% (self.filename, self.format))
|
% (self.filename, self.format))
|
||||||
|
|
||||||
if self.format.lower() == 'csv':
|
if self.format.lower() == 'csv':
|
||||||
from csvreader import CSVReader
|
from csvreader import CSVReader
|
||||||
@ -772,9 +777,12 @@ class PGLoader(threading.Thread):
|
|||||||
|
|
||||||
# care about number of threads launched
|
# care about number of threads launched
|
||||||
self.sem.acquire()
|
self.sem.acquire()
|
||||||
|
|
||||||
|
# tell parent thread we are running now
|
||||||
|
self.started.set()
|
||||||
|
|
||||||
# Announce the beginning of the work
|
# Announce the beginning of the work
|
||||||
self.log.info("%s launched" % self.logname)
|
self.log.debug("%s processing" % self.logname)
|
||||||
|
|
||||||
if self.section_threads == 1:
|
if self.section_threads == 1:
|
||||||
if 'reader' in self.__dict__ and self.reader.start is not None:
|
if 'reader' in self.__dict__ and self.reader.start is not None:
|
||||||
@ -789,9 +797,7 @@ class PGLoader(threading.Thread):
|
|||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.log.error(e)
|
self.log.error(e)
|
||||||
|
|
||||||
self.log.info("Releasing %s" % self.logname)
|
self.terminate()
|
||||||
self.sem.release()
|
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# Mutli-Threaded processing of current section
|
# Mutli-Threaded processing of current section
|
||||||
@ -809,26 +815,19 @@ class PGLoader(threading.Thread):
|
|||||||
# here we need a special thread reading the file
|
# here we need a special thread reading the file
|
||||||
self.round_robin_read()
|
self.round_robin_read()
|
||||||
|
|
||||||
self.log.info("releasing %s" % self.logname)
|
self.terminate()
|
||||||
self.sem.release()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def wait_for_workers(self, sem, workers):
|
def terminate(self):
|
||||||
"""
|
""" Announce it's over and free the concurrency control semaphore """
|
||||||
Try to acquire all semaphore entries --- success means no
|
|
||||||
more thread is running
|
|
||||||
"""
|
|
||||||
from tools import running_threads
|
|
||||||
|
|
||||||
n = running_threads(workers)
|
self.log.debug("releasing %s semaphore" % self.logname)
|
||||||
self.log.info("Waiting for %d/%d threads to terminate" \
|
self.sem.release()
|
||||||
% (n, len(workers)))
|
|
||||||
|
|
||||||
for i in range(n):
|
# tell parent thread processing is now over, here
|
||||||
sem.acquire()
|
self.log.debug("Announce it's over")
|
||||||
self.log.debug("Acquired %d times, " % (i+1) + \
|
self.finished.set()
|
||||||
"still waiting for %d threads to terminate" \
|
|
||||||
% running_threads(workers))
|
|
||||||
return
|
return
|
||||||
|
|
||||||
def split_file_read(self):
|
def split_file_read(self):
|
||||||
@ -871,16 +870,21 @@ class PGLoader(threading.Thread):
|
|||||||
self.prepare_processing()
|
self.prepare_processing()
|
||||||
|
|
||||||
# now create self.section_threads PGLoader threads
|
# now create self.section_threads PGLoader threads
|
||||||
sem = threading.BoundedSemaphore(self.section_threads)
|
sem = threading.BoundedSemaphore(self.section_threads)
|
||||||
summary = {}
|
summary = {}
|
||||||
threads = {}
|
threads = {}
|
||||||
|
started = {}
|
||||||
|
finished = {}
|
||||||
|
|
||||||
for current in range(self.section_threads):
|
for current in range(self.section_threads):
|
||||||
try:
|
try:
|
||||||
summary[current] = []
|
summary[current] = []
|
||||||
current_name = "%s[%d]" % (self.name, current)
|
started[current] = threading.Event()
|
||||||
|
finished[current] = threading.Event()
|
||||||
|
current_name = "%s.%d" % (self.name, current)
|
||||||
|
|
||||||
loader = PGLoader(self.name, self.config, sem,
|
loader = PGLoader(self.name, self.config, sem,
|
||||||
|
(started[current], finished[current]),
|
||||||
summary[current],
|
summary[current],
|
||||||
logname = current_name,
|
logname = current_name,
|
||||||
reject = self.reject)
|
reject = self.reject)
|
||||||
@ -895,14 +899,13 @@ class PGLoader(threading.Thread):
|
|||||||
except Exception, e:
|
except Exception, e:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# wait for loaders completion, first let them some time to
|
# wait for workers to have started, then wait for them to terminate
|
||||||
# be started
|
check_events(started, self.log, "is running")
|
||||||
time.sleep(2)
|
check_events(finished, self.log, "processing is over")
|
||||||
|
|
||||||
self.wait_for_workers(sem, threads)
|
|
||||||
self.finish_processing()
|
self.finish_processing()
|
||||||
self.duration = time.time() - self.init_time
|
self.duration = time.time() - self.init_time
|
||||||
self.log.info('No more threads are running, %s done' % self.name)
|
self.log.debug('No more threads are running, %s done' % self.name)
|
||||||
|
|
||||||
stats = [0, 0]
|
stats = [0, 0]
|
||||||
for s in summary:
|
for s in summary:
|
||||||
@ -921,11 +924,13 @@ class PGLoader(threading.Thread):
|
|||||||
self.prepare_processing()
|
self.prepare_processing()
|
||||||
|
|
||||||
from tools import RRReader
|
from tools import RRReader
|
||||||
queues = {}
|
queues = {}
|
||||||
locks = {}
|
locks = {}
|
||||||
sem = threading.BoundedSemaphore(self.section_threads)
|
sem = threading.BoundedSemaphore(self.section_threads)
|
||||||
summary = {}
|
summary = {}
|
||||||
threads = {}
|
threads = {}
|
||||||
|
started = {}
|
||||||
|
finished = {}
|
||||||
|
|
||||||
for current in range(self.section_threads):
|
for current in range(self.section_threads):
|
||||||
queues[current] = RRReader()
|
queues[current] = RRReader()
|
||||||
@ -937,10 +942,13 @@ class PGLoader(threading.Thread):
|
|||||||
locks[current].acquire()
|
locks[current].acquire()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
summary[current] = []
|
summary [current] = []
|
||||||
current_name = "%s[%d]" % (self.name, current)
|
started [current] = threading.Event()
|
||||||
|
finished[current] = threading.Event()
|
||||||
|
current_name = "%s.%d" % (self.name, current)
|
||||||
|
|
||||||
loader = PGLoader(self.name, self.config, sem,
|
loader = PGLoader(self.name, self.config, sem,
|
||||||
|
(started[current], finished[current]),
|
||||||
summary[current],
|
summary[current],
|
||||||
logname = current_name,
|
logname = current_name,
|
||||||
reject = self.reject,
|
reject = self.reject,
|
||||||
@ -961,13 +969,13 @@ class PGLoader(threading.Thread):
|
|||||||
|
|
||||||
if len(threads) != self.section_threads:
|
if len(threads) != self.section_threads:
|
||||||
self.log.error("Couldn't start all threads, check previous errors")
|
self.log.error("Couldn't start all threads, check previous errors")
|
||||||
self.wait_for_workers(sem, threads)
|
|
||||||
|
check_events([x for x in finished if threads[x].isAlive()],
|
||||||
|
self.log, "processing is over")
|
||||||
return
|
return
|
||||||
|
|
||||||
# wait for loaders completion, first let them some time to
|
check_events(started, self.log, "is running")
|
||||||
# be started
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
# Now self.section_threads are started and we have a queue and
|
# Now self.section_threads are started and we have a queue and
|
||||||
# a Condition for each of them.
|
# a Condition for each of them.
|
||||||
#
|
#
|
||||||
@ -1002,17 +1010,19 @@ class PGLoader(threading.Thread):
|
|||||||
c = (n / self.rrqueue_size) % self.section_threads
|
c = (n / self.rrqueue_size) % self.section_threads
|
||||||
|
|
||||||
# we could have some locks to release here
|
# we could have some locks to release here
|
||||||
self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%%d=%d (n+1/rrqueue_size)%%%d=%d" \
|
self.log.debug("p=%d c=%d n=%d (n/rrqueue_size)%%%d=%d " \
|
||||||
% (p, c, n,
|
% (p, c, n,
|
||||||
self.section_threads,
|
self.section_threads,
|
||||||
(n/self.rrqueue_size) % self.section_threads,
|
(n/self.rrqueue_size) % self.section_threads) + \
|
||||||
self.section_threads,
|
"(n+1/rrqueue_size)%%%d=%d" \
|
||||||
|
% (self.section_threads,
|
||||||
((n+1)/self.rrqueue_size) % self.section_threads))
|
((n+1)/self.rrqueue_size) % self.section_threads))
|
||||||
|
|
||||||
if p != c or (n % self.rrqueue_size != 0):
|
if p != c or (n % self.rrqueue_size != 0):
|
||||||
self.log.debug("locks[%d].release" % p)
|
self.log.debug("locks[%d].release" % p)
|
||||||
locks[p].release()
|
locks[p].release()
|
||||||
|
|
||||||
|
# mark all worker threads has done
|
||||||
k = threads.keys()
|
k = threads.keys()
|
||||||
for c in range(self.section_threads):
|
for c in range(self.section_threads):
|
||||||
self.log.debug("locks[%d].acquire to set %s.done = True" \
|
self.log.debug("locks[%d].acquire to set %s.done = True" \
|
||||||
@ -1024,10 +1034,12 @@ class PGLoader(threading.Thread):
|
|||||||
self.log.debug("locks[%d].release (done set)" % c)
|
self.log.debug("locks[%d].release (done set)" % c)
|
||||||
locks[c].release()
|
locks[c].release()
|
||||||
|
|
||||||
self.wait_for_workers(sem, threads)
|
# wait for workers to finish processing
|
||||||
|
check_events(finished, self.log, "processing is over")
|
||||||
|
|
||||||
self.finish_processing()
|
self.finish_processing()
|
||||||
self.duration = time.time() - self.init_time
|
self.duration = time.time() - self.init_time
|
||||||
self.log.info('%s done' % self.name)
|
self.log.debug('%s done' % self.name)
|
||||||
|
|
||||||
stats = [0, 0]
|
stats = [0, 0]
|
||||||
for s in summary:
|
for s in summary:
|
||||||
@ -1080,7 +1092,7 @@ class PGLoader(threading.Thread):
|
|||||||
if TRIGGERS and not DRY_RUN:
|
if TRIGGERS and not DRY_RUN:
|
||||||
self.db.enable_triggers(self.table)
|
self.db.enable_triggers(self.table)
|
||||||
|
|
||||||
self.log.info("loading done")
|
self.log.debug("loading done")
|
||||||
return
|
return
|
||||||
|
|
||||||
def update_summary(self):
|
def update_summary(self):
|
||||||
|
@ -186,17 +186,14 @@ def check_dirname(path):
|
|||||||
|
|
||||||
return True, None
|
return True, None
|
||||||
|
|
||||||
|
|
||||||
|
def check_events(events, log, context = "is running"):
|
||||||
|
""" wait until all events (list) are set """
|
||||||
|
for t in events:
|
||||||
|
events[t].wait()
|
||||||
|
log.debug("thread %s %s" % (t, context))
|
||||||
|
|
||||||
|
return
|
||||||
def running_threads(threads):
|
|
||||||
""" count running threads """
|
|
||||||
running = 0
|
|
||||||
for s in threads:
|
|
||||||
if threads[s].isAlive():
|
|
||||||
running += 1
|
|
||||||
|
|
||||||
return running
|
|
||||||
|
|
||||||
|
|
||||||
class RRReader(collections.deque):
|
class RRReader(collections.deque):
|
||||||
""" Round Robin reader, which are collections.deque with a
|
""" Round Robin reader, which are collections.deque with a
|
||||||
|
Loading…
Reference in New Issue
Block a user