DEBUG attempts of split_read_file(), renaming of internal methods, tests using text instead of csv

This commit is contained in:
dim 2008-02-13 17:12:30 +00:00
parent f20ac7116e
commit 4c45cc9d1c
3 changed files with 42 additions and 33 deletions

View File

@ -12,7 +12,7 @@ lc_messages = C
;client_encoding = 'utf-8'
client_encoding = 'latin1'
copy_every = 5000
copy_every = 50
commit_every = 5
#copy_delimiter = %
@ -24,19 +24,20 @@ max_parallel_sections = 1
[parallel_template]
template = True
table = parallel
format = csv
filename = parallel/parallel.data
field_sep = ;
columns = a, b
[split]
use_template = parallel_template
section_threads = 2
split_file_reading = True
format = text
use_template = parallel_template
section_threads = 2
split_file_reading = True
; round robin reader (1 reader feed section_threads processing threads)
[rrr]
use_template = parallel_template
section_threads = 2
split_file_reading = False
rrqueue_size = -1
format = csv
use_template = parallel_template
section_threads = 2
split_file_reading = False
rrqueue_size = -1

View File

@ -60,7 +60,7 @@ class PGLoader(threading.Thread):
logname = name
self.log = getLogger(logname)
self.__dbconnect__(config)
self._dbconnect(config)
self.template = None
self.use_template = None
@ -104,8 +104,8 @@ class PGLoader(threading.Thread):
self.log = getLogger(self.template)
try:
self.__read_conf__(self.template, config, db,
want_template = True)
self._read_conf(self.template, config, db,
want_template = True)
except PGLoader_Error, e:
self.log.error(e)
m = "%s.use_template does not refer to a template section"\
@ -119,7 +119,7 @@ class PGLoader(threading.Thread):
# now load specific configuration
self.log.info("Reading configuration from section [%s]", name)
self.__read_conf__(name, config, db)
self._read_conf(name, config, db)
# force reinit of self.reader, which depends on template and
# specific options
@ -135,7 +135,7 @@ class PGLoader(threading.Thread):
self.log.debug('%s init done' % name)
def __dbconnect__(self, config):
def _dbconnect(self, config):
""" connects to database """
section = 'pgsql'
@ -177,7 +177,7 @@ class PGLoader(threading.Thread):
log.error("Could not initialize PostgreSQL connection")
raise PGLoader_Error, error
def __read_conf__(self, name, config, db, want_template = False):
def _read_conf(self, name, config, db, want_template = False):
""" init self from config section name """
# we'll need both of them from the globals
@ -503,18 +503,6 @@ class PGLoader(threading.Thread):
raise PGLoader_Error, \
"Conflict: can't use both '%s' and '--from-id'" % opt
if not self.template \
and self.format.lower() == 'text' \
and self.field_count is not None:
# this option is not compatible with text mode when
# field_count is used (meaning end of line could be found
# in the data)
raise PGLoader_Error, \
"Can't use split_file_reading with text " +\
"format when 'field_count' is used"
##
# Reader's init
if config.has_option(name, 'format'):
@ -533,6 +521,25 @@ class PGLoader(threading.Thread):
self.table, self.columns,
self.newline_escapes)
if not self.template \
and self.format.lower() == 'text' \
and ('field_count' in self.reader.__dict__ \
and self.reader.field_count) \
and ('trailing_sep' in self.reader.__dict__ \
and self.reader.trailing_sep):
# this option is not compatible with text mode when
# field_count is used (meaning end of line could be found
# in the data)
raise PGLoader_Error, \
"Can't use split_file_reading with text " +\
"format when 'field_count' is used"
if not self.template:
self.log.info("File '%s' will be read in %s format" \
% (self.filename, self.format))
if 'reader' in self.__dict__:
self.log.debug('reader.readconfig()')
self.reader.readconfig(name, config)
@ -650,7 +657,7 @@ class PGLoader(threading.Thread):
""" parse the user string str for fields definition to store
into self.attr """
def __getarg(arg, argtype):
def _getarg(arg, argtype):
""" return arg depending on its type """
if argtype == 'int':
# arg is the target column index
@ -685,13 +692,13 @@ class PGLoader(threading.Thread):
if not btype:
# normal column definition, for COPY usage
colname, arg = properties
f.append((colname, __getarg(arg, argtype)))
f.append((colname, _getarg(arg, argtype)))
else:
# blob column definition, with blob type, for
# UPDATE usage
colname, arg, btype = properties
f.append((colname, __getarg(arg, argtype), btype))
f.append((colname, _getarg(arg, argtype), btype))
# update serial
if argtype == 'int':
@ -954,12 +961,12 @@ class PGLoader(threading.Thread):
k = threads.keys()
for c in range(self.section_threads):
self.log.info("locks[%d].acquire to set %s.done = True" % (c, k[c]))
self.log.debug("locks[%d].acquire to set %s.done = True" % (c, k[c]))
locks[c].acquire()
threads[k[c]].done = True
self.log.info("locks[%d].release (done set)" % c)
self.log.debug("locks[%d].release (done set)" % c)
locks[c].release()
from tools import running_threads
@ -1050,7 +1057,6 @@ class PGLoader(threading.Thread):
""" depending on configuration, do given job """
if self.columns is not None:
self.log.info("COPY csv data")
self.data_import()
elif self.blob_cols is not None:

View File

@ -95,3 +95,5 @@ class DataReader:
self.start = start
self.end = end
self.log.info("reader start=%d, end=%d" % (self.start, self.end))