mirror of
https://github.com/dimitri/pgloader.git
synced 2026-02-05 06:21:40 +01:00
FIX reader object init in the presence of templates
This commit is contained in:
parent
4c45cc9d1c
commit
4bfaea7d83
@ -25,27 +25,29 @@ class CSVReader(DataReader):
|
||||
Read some CSV formatted data
|
||||
"""
|
||||
|
||||
def readconfig(self, name, config):
|
||||
def readconfig(self, config, name, template):
|
||||
""" get this reader module configuration from config file """
|
||||
DataReader.readconfig(self, name, config)
|
||||
DataReader.readconfig(self, config, name, template)
|
||||
|
||||
self._getopt('doublequote', config, name, template, False)
|
||||
if self.doublequote is not False:
|
||||
self.doublequote = self.doublequote == 'True'
|
||||
|
||||
# optionnal doublequote: defaults to escaping, not doubling
|
||||
self.doublequote = False
|
||||
if config.has_option(name, 'doublequote'):
|
||||
self.trailing_sep = config.get(name, 'doublequote') == 'True'
|
||||
self._getopt('escapechar', config, name, template, None)
|
||||
if self.escapechar is not None:
|
||||
self.escapechar = self.escapechar[0]
|
||||
|
||||
self.escapechar = None
|
||||
if config.has_option(name, 'escapechar'):
|
||||
self.escapechar = config.get(name, 'escapechar')[0]
|
||||
self._getopt('quotechar', config, name, template, '"')
|
||||
self.quotechar = self.quotechar[0]
|
||||
|
||||
self.quotechar = '"'
|
||||
if config.has_option(name, 'quotechar'):
|
||||
self.quotechar = config.get(name, 'quotechar')[0]
|
||||
|
||||
self.skipinitialspace = False
|
||||
if config.has_option(name, 'skipinitialspace'):
|
||||
self.skipinitialspace = config.get(name, 'skipinitialspace') == 'True'
|
||||
self._getopt('skipinitialspace', config, name, template, False)
|
||||
if self.skipinitialspace is not False:
|
||||
self.skipinitialspace = self.skipinitialspace == 'True'
|
||||
|
||||
for opt in ['doublequote', 'escapechar',
|
||||
'quotechar', 'skipinitialspace']:
|
||||
|
||||
self.log.debug("reader.readconfig %s: '%s'" % (opt, self.__dict__[opt]))
|
||||
|
||||
def readlines(self):
|
||||
""" read data from configured file, and generate (yields) for
|
||||
|
||||
@ -64,6 +64,7 @@ class PGLoader(threading.Thread):
|
||||
|
||||
self.template = None
|
||||
self.use_template = None
|
||||
self.tsection = None
|
||||
|
||||
self.index = None
|
||||
self.columns = None
|
||||
@ -88,6 +89,7 @@ class PGLoader(threading.Thread):
|
||||
if not self.template:
|
||||
# check if the section wants to use a template
|
||||
if config.has_option(name, 'use_template'):
|
||||
self.tsection = config.get(name, 'use_template')
|
||||
self.template = config.get(name, 'use_template')
|
||||
|
||||
if not config.has_section(self.template):
|
||||
@ -121,13 +123,6 @@ class PGLoader(threading.Thread):
|
||||
|
||||
self._read_conf(name, config, db)
|
||||
|
||||
# force reinit of self.reader, which depends on template and
|
||||
# specific options
|
||||
if 'reader' in self.__dict__:
|
||||
self.reader.__init__(self.log, self.db, self.reject,
|
||||
self.filename, self.input_encoding,
|
||||
self.table, self.columns)
|
||||
|
||||
# Now reset database connection
|
||||
if not DRY_RUN:
|
||||
self.db.log = self.log
|
||||
@ -141,6 +136,7 @@ class PGLoader(threading.Thread):
|
||||
|
||||
if DRY_RUN:
|
||||
log.info("dry run mode, not connecting to database")
|
||||
self.db = None
|
||||
return
|
||||
|
||||
try:
|
||||
@ -214,7 +210,7 @@ class PGLoader(threading.Thread):
|
||||
self.log.info('rejected data in %s', self.reject.reject_data)
|
||||
|
||||
else:
|
||||
# needed to instanciate self.reader while in template section
|
||||
# needed to instantiate self.reject while in template section
|
||||
self.reject = None
|
||||
|
||||
# optional local option client_encoding
|
||||
@ -487,7 +483,11 @@ class PGLoader(threading.Thread):
|
||||
self.rrqueue_size = config.getint(name, 'rrqueue_size')
|
||||
|
||||
if self.rrqueue_size is None or self.rrqueue_size < 1:
|
||||
self.rrqueue_size = self.db.copy_every
|
||||
if DRY_RUN:
|
||||
# won't be used
|
||||
self.rrqueue_size = 1
|
||||
else:
|
||||
self.rrqueue_size = self.db.copy_every
|
||||
|
||||
if not self.template:
|
||||
for opt in ('section_threads', 'split_file_reading'):
|
||||
@ -507,48 +507,50 @@ class PGLoader(threading.Thread):
|
||||
# Reader's init
|
||||
if config.has_option(name, 'format'):
|
||||
self.format = config.get(name, 'format')
|
||||
|
||||
if not self.template:
|
||||
# Only init self.reader in real section, not from
|
||||
# template. self.reader.readconfig() will care about
|
||||
# reading its configuration from template and current
|
||||
# section.
|
||||
|
||||
if 'format' not in self.__dict__:
|
||||
raise PGLoader_Error, "Please configure %s.format" % name
|
||||
|
||||
self.log.info("File '%s' will be read in %s format" \
|
||||
% (self.filename, self.format))
|
||||
|
||||
if self.format.lower() == 'csv':
|
||||
from csvreader import CSVReader
|
||||
self.reader = CSVReader(self.log, self.db, self.reject,
|
||||
self.filename, self.input_encoding,
|
||||
self.filename,
|
||||
self.input_encoding,
|
||||
self.table, self.columns)
|
||||
|
||||
elif self.format.lower() == 'text':
|
||||
from textreader import TextReader
|
||||
self.reader = TextReader(self.log, self.db, self.reject,
|
||||
self.filename, self.input_encoding,
|
||||
self.filename,
|
||||
self.input_encoding,
|
||||
self.table, self.columns,
|
||||
self.newline_escapes)
|
||||
|
||||
if not self.template \
|
||||
and self.format.lower() == 'text' \
|
||||
and ('field_count' in self.reader.__dict__ \
|
||||
and self.reader.field_count) \
|
||||
and ('trailing_sep' in self.reader.__dict__ \
|
||||
and self.reader.trailing_sep):
|
||||
|
||||
# this option is not compatible with text mode when
|
||||
# field_count is used (meaning end of line could be found
|
||||
# in the data)
|
||||
|
||||
raise PGLoader_Error, \
|
||||
"Can't use split_file_reading with text " +\
|
||||
"format when 'field_count' is used"
|
||||
|
||||
if not self.template:
|
||||
self.log.info("File '%s' will be read in %s format" \
|
||||
% (self.filename, self.format))
|
||||
|
||||
if 'reader' in self.__dict__:
|
||||
self.log.debug('reader.readconfig()')
|
||||
self.reader.readconfig(name, config)
|
||||
self.reader.readconfig(config, name, self.tsection)
|
||||
|
||||
if not self.template and \
|
||||
('format' not in self.__dict__ or self.format is None):
|
||||
# error only when not loading the Template part
|
||||
self.log.Error('%s: format parameter needed', name)
|
||||
raise PGLoader_Error
|
||||
|
||||
if self.split_file_reading:
|
||||
if self.format.lower() == 'text' \
|
||||
and (self.reader.field_count is not None \
|
||||
or self.reader.trailing_sep):
|
||||
|
||||
# split_file_reading is not compatible with text
|
||||
# mode when field_count or trailing_sep is used
|
||||
# (meaning end of line could be found in the data)
|
||||
|
||||
raise PGLoader_Error, \
|
||||
"Can't use split_file_reading with text " +\
|
||||
"format when 'field_count' or 'trailing_sep' is used"
|
||||
|
||||
##
|
||||
# Some column might need reformating
|
||||
@ -1045,9 +1047,14 @@ class PGLoader(threading.Thread):
|
||||
|
||||
if self.reject is not None:
|
||||
self.errors = self.reject.errors
|
||||
|
||||
if DRY_RUN:
|
||||
self.commited_rows = 0
|
||||
else:
|
||||
self.commited_rows = self.db.commited_rows
|
||||
|
||||
for x in [self.table, self.duration,
|
||||
self.db.commited_rows, self.errors]:
|
||||
self.commited_rows, self.errors]:
|
||||
self.stats.append(x)
|
||||
|
||||
# then show up some stats
|
||||
|
||||
@ -42,11 +42,14 @@ class DataReader:
|
||||
self.start = None
|
||||
self.end = None
|
||||
|
||||
def readconfig(self, name, config):
|
||||
def readconfig(self, config, name, template):
|
||||
""" read configuration section for common options
|
||||
|
||||
name is configuration section name, conf the ConfigParser object
|
||||
|
||||
template is the (maybe None) template section name declared in
|
||||
the use_template configuration option.
|
||||
|
||||
specific option reading code is to be found on subclasses
|
||||
which implements read data parsing code.
|
||||
|
||||
@ -55,35 +58,40 @@ class DataReader:
|
||||
|
||||
if not DRY_RUN:
|
||||
# optional null and empty_string per table parameters
|
||||
if config.has_option(name, 'null'):
|
||||
self.db.null = parse_config_string(config.get(name, 'null'))
|
||||
else:
|
||||
if 'null' not in self.__dict__:
|
||||
self.db.null = NULL
|
||||
self._getopt('null', config, name, template, NULL)
|
||||
self.db.null = parse_config_string(self.null)
|
||||
|
||||
if config.has_option(name, 'empty_string'):
|
||||
self.db.empty_string = parse_config_string(
|
||||
config.get(name, 'empty_string'))
|
||||
else:
|
||||
if 'empty_string' not in self.__dict__:
|
||||
self.db.empty_string = EMPTY_STRING
|
||||
self._getopt('empty_string', config, name, template, EMPTY_STRING)
|
||||
self.db.empty_string = parse_config_string(self.empty_string)
|
||||
|
||||
# optional field separator, could be defined from template
|
||||
if 'field_sep' not in self.__dict__:
|
||||
self.field_sep = FIELD_SEP
|
||||
|
||||
if config.has_option(name, 'field_sep'):
|
||||
self.field_sep = config.get(name, 'field_sep')
|
||||
|
||||
if not DRY_RUN:
|
||||
if self.db.copy_sep is None:
|
||||
self.db.copy_sep = self.field_sep
|
||||
self._getopt('field_sep', config, name, template, FIELD_SEP)
|
||||
if not DRY_RUN:
|
||||
if self.db.copy_sep is None:
|
||||
self.db.copy_sep = self.field_sep
|
||||
|
||||
if not DRY_RUN:
|
||||
self.log.debug("reader.readconfig null: '%s'" % self.db.null)
|
||||
self.log.debug("reader.readconfig empty_string: '%s'",
|
||||
self.db.empty_string)
|
||||
self.log.debug("reader.readconfig field_sep: '%s'", self.field_sep)
|
||||
|
||||
self.log.debug("reader.readconfig field_sep: '%s'", self.field_sep)
|
||||
|
||||
def _getopt(self, option, config, section, template, default=None):
    """ Init given configuration option

    The option is looked up in section first, then in template when
    one is given.  The first value found is stored on this object as
    the attribute named after option.  When neither defines the
    option, the attribute is set to default, unless this object
    already has such an attribute, in which case it is left as is.

    option   -- option name, also used as the attribute name
    config   -- object providing has_option() and get()
    section  -- configuration section name
    template -- template section name, may be None
    default  -- value used when nothing else provides one

    Returns the resulting attribute value.  Values read from config
    are stored exactly as config.get() returns them; converting them
    is left to the caller.
    """
    attrs = self.__dict__

    if config.has_option(section, option):
        # the section's own setting takes precedence over the template
        attrs[option] = config.get(section, option)
        self.log.debug("reader._getopt %s from %s is '%s'",
                       option, section, attrs[option])

    elif template and config.has_option(template, option):
        attrs[option] = config.get(template, option)
        self.log.debug("reader._getopt %s from %s is '%s'",
                       option, template, attrs[option])

    elif option not in attrs:
        # only fall back to the default when nothing has set the
        # attribute yet, so an existing value is never overwritten
        self.log.debug("reader._getopt %s defaults to '%s'",
                       option, default)
        attrs[option] = default

    return attrs[option]
|
||||
|
||||
def readlines(self):
|
||||
""" read data from configured file, and generate (yields) for
|
||||
|
||||
@ -39,27 +39,18 @@ class TextReader(DataReader):
|
||||
if 'newline_escapes' not in self.__dict__:
|
||||
self.newline_escapes = newline_escapes
|
||||
|
||||
def readconfig(self, name, config):
|
||||
def readconfig(self, config, name, template):
|
||||
""" get this reader module configuration from config file """
|
||||
DataReader.readconfig(self, name, config)
|
||||
DataReader.readconfig(self, config, name, template)
|
||||
|
||||
# this will be called twice if templates are in used, so we
|
||||
# have to protect ourselves against removing already read
|
||||
# configurations while in second run.
|
||||
|
||||
# optionnal number of columns per line
|
||||
if 'field_count' not in self.__dict__:
|
||||
self.field_count = None
|
||||
|
||||
if config.has_option(name, 'field_count'):
|
||||
self.field_count = config.getint(name, 'field_count')
|
||||
|
||||
# optionnal trailing separator option
|
||||
if 'trailing_sep' not in self.__dict__:
|
||||
self.trailing_sep = False
|
||||
|
||||
if config.has_option(name, 'trailing_sep'):
|
||||
self.trailing_sep = config.get(name, 'trailing_sep') == 'True'
|
||||
self._getopt('field_count', config, name, template, None)
|
||||
self._getopt('trailing_sep', config, name, template, False)
|
||||
if self.trailing_sep is not False:
|
||||
self.trailing_sep = self.trailing_sep == 'True'
|
||||
|
||||
self.log.debug('reader.readconfig: field_count %s', self.field_count)
|
||||
self.log.debug('reader.readconfig: trailing_sep %s', self.trailing_sep)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user