mirror of
https://github.com/dimitri/pgloader.git
synced 2025-08-08 07:16:58 +02:00
798 lines
27 KiB
Python
Executable File
798 lines
27 KiB
Python
Executable File
#! /usr/bin/env python
|
|
# Author: Dimitri Fontaine <dim@tapoueh.org>
|
|
|
|
"""
|
|
PostgreSQL data import tool, see included man page.
|
|
"""
|
|
|
|
import os, sys, os.path, time, codecs, logging, threading
|
|
from cStringIO import StringIO
|
|
from tempfile import gettempdir
|
|
|
|
import pgloader.options
|
|
import pgloader.tools
|
|
import pgloader.logger
|
|
from pgloader.tools import PGLoader_Error
|
|
|
|
def parse_options():
|
|
""" Parse given options """
|
|
import ConfigParser
|
|
from optparse import OptionParser
|
|
|
|
usage = "%prog [-c <config_filename>] Section [Section ...]"
|
|
parser = OptionParser(usage = usage)
|
|
|
|
parser.add_option("--version", action = "store_true",
|
|
dest = "version",
|
|
default = False,
|
|
help = "show pgloader version")
|
|
|
|
parser.add_option("-c", "--config", dest = "config",
|
|
default = "pgloader.conf",
|
|
help = "configuration file, defauts to pgloader.conf")
|
|
|
|
parser.add_option("-p", "--pedantic", action = "store_true",
|
|
dest = "pedantic",
|
|
default = False,
|
|
help = "pedantic mode, stop processing on warning")
|
|
|
|
parser.add_option("-d", "--debug", action = "store_true",
|
|
dest = "debug",
|
|
default = False,
|
|
help = "add some debug information (a lot of)")
|
|
|
|
parser.add_option("-v", "--verbose", action = "store_true",
|
|
dest = "verbose",
|
|
default = False,
|
|
help = "be verbose and about processing progress")
|
|
|
|
parser.add_option("-q", "--quiet", action = "store_true",
|
|
dest = "quiet",
|
|
default = False,
|
|
help = "be quiet, only print out errors")
|
|
|
|
parser.add_option("-l", "--level", dest = "loglevel",
|
|
default = None,
|
|
help = "loglevel to use: ERROR, WARNING, INFO, DEBUG")
|
|
|
|
parser.add_option("-L", "--logfile", dest = "logfile",
|
|
default = os.path.join (gettempdir(), "pgloader.log"),
|
|
help = "log file, defauts to $TMPDIR/pgloader.log")
|
|
|
|
parser.add_option("-r", "--reject-log", dest = "reject_log",
|
|
default = None,
|
|
help = "log file for rejected data error messages")
|
|
|
|
parser.add_option("-j", "--reject-data", dest = "reject_data",
|
|
default = None,
|
|
help = "log file for rejected data, bad input data")
|
|
|
|
parser.add_option("-s", "--summary", action = "store_true",
|
|
dest = "summary",
|
|
default = False,
|
|
help = "print a summary")
|
|
|
|
parser.add_option("-n", "--dry-run", action = "store_true",
|
|
dest = "dryrun",
|
|
default = False,
|
|
help = "simulate operations, don't connect to the db")
|
|
|
|
parser.add_option("-T", "--truncate", action = "store_true",
|
|
dest = "truncate",
|
|
default = False,
|
|
help = "truncate tables before importing data")
|
|
|
|
parser.add_option("-D", "--disable-triggers", action = "store_true",
|
|
dest = "triggers",
|
|
default = False,
|
|
help = "Disable triggers before loading, Enable them again after")
|
|
|
|
parser.add_option("-V", "--vacuum", action = "store_true",
|
|
dest = "vacuum",
|
|
default = False,
|
|
help = "vacuum tables after data loading")
|
|
|
|
parser.add_option("-C", "--count", dest = "count",
|
|
default = None, type = "int",
|
|
help = "number of input lines to process")
|
|
|
|
parser.add_option("-F", "--from", dest = "fromcount",
|
|
default = 0, type = "int",
|
|
help = "number of input lines to skip")
|
|
|
|
parser.add_option("-I", "--from-id", dest = "fromid",
|
|
default = None,
|
|
help = "wait for given id on input to begin")
|
|
|
|
parser.add_option("-f", "--field-separator", dest = "fsep",
|
|
default = pgloader.options.FIELD_SEP,
|
|
help = "default field separator to use")
|
|
|
|
parser.add_option("-E", "--encoding", dest = "encoding",
|
|
default = None,
|
|
help = "input files encoding")
|
|
|
|
parser.add_option("-o", "--pg-options", dest = "pg_options", action = "append",
|
|
help = "list of PostgreSQL options you want to SET")
|
|
|
|
parser.add_option("-t", "--section-threads", dest = "section_threads",
|
|
default = pgloader.options.SECTION_THREADS,
|
|
type = "int",
|
|
help = "number of threads used per sections, default is 1")
|
|
|
|
parser.add_option("-m", "--max-parallel-sections", dest = "parallel",
|
|
default = pgloader.options.MAX_PARALLEL_SECTIONS,
|
|
type = "int",
|
|
help = "number of sections to load in parralel, " +\
|
|
"default is 1")
|
|
|
|
parser.add_option("-R", "--reformat_path", dest = "reformat_path",
|
|
default = None,
|
|
help = "PATH where to find reformat python modules")
|
|
|
|
parser.add_option("-1", "--psycopg1", action = "store_true",
|
|
dest = "psycopg1",
|
|
default = False,
|
|
help = "Force usage of psycopg1")
|
|
|
|
parser.add_option("-2", "--psycopg2", action = "store_true",
|
|
dest = "psycopg2",
|
|
default = False,
|
|
help = "Force usage of psycopg2")
|
|
|
|
parser.add_option("--psycopg-version", dest = "psycopg_version",
|
|
default = None,
|
|
help = "Force usage of given version of psycopg")
|
|
|
|
parser.add_option("--load-from-stdin", action = "store_true",
|
|
dest = "stdin",
|
|
default = False,
|
|
help = "Load standard input data into given table name")
|
|
|
|
parser.add_option("--load-to-table", dest = "table",
|
|
default = None,
|
|
help = "Load to given table when --load-from-stdin")
|
|
|
|
parser.add_option("--boundaries", dest = "boundaries",
|
|
default = None,
|
|
help = "Load only in given boundaries, Start..End")
|
|
|
|
(opts, args) = parser.parse_args()
|
|
|
|
if opts.version:
|
|
print "PGLoader version %s" % pgloader.options.PGLOADER_VERSION
|
|
sys.exit(0)
|
|
|
|
# check existence and read ability of config file
|
|
if not os.path.exists(opts.config):
|
|
print >>sys.stderr, \
|
|
"Error: Configuration file %s does not exists" % opts.config
|
|
print >>sys.stderr, parser.format_help()
|
|
sys.exit(1)
|
|
|
|
if not os.access(opts.config, os.R_OK):
|
|
print >>sys.stderr, \
|
|
"Error: Can't read configuration file %s" % opts.config
|
|
print >>sys.stderr, parser.format_help()
|
|
sys.exit(1)
|
|
|
|
if opts.fromcount != 0 and opts.fromid is not None:
|
|
print >>sys.stderr, \
|
|
"Error: Can't set both options fromcount (-F) AND fromid (-I)"
|
|
sys.exit(1)
|
|
|
|
if opts.quiet and (opts.verbose or opts.debug):
|
|
print >>sys.stderr, \
|
|
"Error: Can't be verbose and quiet at the same time!"
|
|
sys.exit(1)
|
|
|
|
psyco_opts = 0
|
|
for opt in [opts.psycopg1, opts.psycopg2, opts.psycopg_version]:
|
|
if opt:
|
|
psyco_opts += 1
|
|
|
|
if psyco_opts > 1:
|
|
print >>sys.stderr, \
|
|
"Error: please use only one of the psycopg options"
|
|
sys.exit(1)
|
|
|
|
if opts.psycopg_version is not None:
|
|
if opts.psycopg_version in ("1", "2"):
|
|
opts.psycopg_version = int(opts.psycopg_version)
|
|
|
|
else:
|
|
print >>sys.stderr, \
|
|
"Error: psycopg_version can only be set to either 1 or 2"
|
|
sys.exit(1)
|
|
|
|
# if debug, then verbose
|
|
if opts.debug:
|
|
opts.verbose = True
|
|
|
|
pgloader.options.DRY_RUN = opts.dryrun
|
|
pgloader.options.DEBUG = opts.debug
|
|
pgloader.options.VERBOSE = opts.verbose
|
|
pgloader.options.QUIET = opts.quiet
|
|
pgloader.options.SUMMARY = opts.summary
|
|
pgloader.options.PEDANTIC = opts.pedantic
|
|
|
|
pgloader.options.TRUNCATE = opts.truncate
|
|
pgloader.options.VACUUM = opts.vacuum
|
|
pgloader.options.TRIGGERS = opts.triggers
|
|
|
|
pgloader.options.COUNT = opts.count
|
|
pgloader.options.FROM_COUNT = opts.fromcount
|
|
pgloader.options.FROM_ID = opts.fromid
|
|
pgloader.options.FIELD_SEP = opts.fsep
|
|
|
|
pgloader.options.SECTION_THREADS = opts.section_threads
|
|
pgloader.options.MAX_PARALLEL_SECTIONS = opts.parallel
|
|
|
|
pgloader.options.LOAD_FROM_STDIN = opts.stdin
|
|
pgloader.options.LOAD_TO_TABLE = opts.table
|
|
|
|
if pgloader.options.MAX_PARALLEL_SECTIONS is None:
|
|
from pgloader.options import DEFAULT_MAX_PARALLEL_SECTIONS
|
|
pgloader.options.MAX_PARALLEL_SECTIONS = DEFAULT_MAX_PARALLEL_SECTIONS
|
|
|
|
pgloader.options.INPUT_ENCODING = opts.encoding
|
|
|
|
if opts.reformat_path:
|
|
pgloader.options.REFORMAT_PATH = opts.reformat_path
|
|
|
|
pgloader.options.LOG_FILE = opts.logfile
|
|
|
|
# reject file names must contain one %s which will get replaced by the
|
|
# section name
|
|
if opts.reject_log:
|
|
try:
|
|
unused = opts.reject_log % "that would be a section name"
|
|
except TypeError, e:
|
|
# TypeError: not all arguments converted during string formatting
|
|
# TypeError: not enough arguments for format string
|
|
print >>sys.stderr, \
|
|
"Error: reject log must contain a '%s' place holder for section name"
|
|
sys.exit(1)
|
|
pgloader.options.REJECT_LOG_FILE = opts.reject_log
|
|
|
|
if opts.reject_data:
|
|
try:
|
|
unused = opts.reject_data % "that would be a section name"
|
|
except TypeError, e:
|
|
# TypeError: not all arguments converted during string formatting
|
|
# TypeError: not enough arguments for format string
|
|
print >>sys.stderr, \
|
|
"Error: reject data must contain a '%s' place holder for section name"
|
|
sys.exit(1)
|
|
pgloader.options.REJECT_DATA_FILE = opts.reject_data
|
|
|
|
if opts.loglevel:
|
|
loglevel = pgloader.logger.level(opts.loglevel)
|
|
pgloader.options.CLIENT_MIN_MESSAGES = loglevel
|
|
elif opts.debug:
|
|
pgloader.options.CLIENT_MIN_MESSAGES = logging.DEBUG
|
|
elif opts.verbose:
|
|
pgloader.options.CLIENT_MIN_MESSAGES = logging.INFO
|
|
elif opts.quiet:
|
|
pgloader.options.CLIENT_MIN_MESSAGES = logging.ERROR
|
|
|
|
if opts.pg_options:
|
|
pgloader.options.PG_OPTIONS = {}
|
|
for o in opts.pg_options:
|
|
try:
|
|
n, v = [x.strip() for x in o.split('=')]
|
|
if v == "":
|
|
raise ValueError
|
|
pgloader.options.PG_OPTIONS[n] = v
|
|
except ValueError, e:
|
|
print >>sys.stderr, \
|
|
"Error: PostgreSQL options must have the form 'name=value'"
|
|
sys.exit(1)
|
|
|
|
if opts.psycopg1:
|
|
pgloader.options.PSYCOPG_VERSION = 1
|
|
elif opts.psycopg2:
|
|
pgloader.options.PSYCOPG_VERSION = 2
|
|
else:
|
|
pgloader.options.PSYCOPG_VERSION = opts.psycopg_version
|
|
|
|
if opts.boundaries:
|
|
if opts.stdin:
|
|
print >>sys.stderr, \
|
|
"Error: You can't set the boundaries of stdin."
|
|
sys.exit(1)
|
|
|
|
try:
|
|
start, end = [int(x) for x in opts.boundaries.split("..")]
|
|
pgloader.options.FILE_BOUNDARIES = (start, end)
|
|
except ValueError, e:
|
|
print >>sys.stderr, \
|
|
"Error: boundaries should be an integer range written X..Y"
|
|
sys.exit(1)
|
|
|
|
return opts.config, args
|
|
|
|
def parse_config(conffile):
|
|
""" Parse the configuration file """
|
|
section = 'pgsql'
|
|
|
|
# Now read pgsql configuration section
|
|
import ConfigParser
|
|
config = ConfigParser.ConfigParser()
|
|
|
|
try:
|
|
config.read(conffile)
|
|
except:
|
|
print >>sys.stderr, "Error: Given file is not a configuration file"
|
|
sys.exit(4)
|
|
|
|
if not config.has_section(section):
|
|
print >>sys.stderr, "Error: Please provide a [%s] section" % section
|
|
sys.exit(5)
|
|
|
|
# load some options
|
|
# this has to be done after command line parsing
|
|
from pgloader.options import DRY_RUN, VERBOSE, DEBUG, PEDANTIC
|
|
from pgloader.options import NULL, EMPTY_STRING
|
|
from pgloader.options import CLIENT_MIN_MESSAGES, LOG_FILE
|
|
from pgloader.options import PG_OPTIONS
|
|
from pgloader.tools import check_dirname
|
|
|
|
# first read the logging configuration
|
|
if not CLIENT_MIN_MESSAGES:
|
|
if config.has_option(section, 'client_min_messages'):
|
|
cmm = config.get(section, 'client_min_messages')
|
|
pgloader.options.CLIENT_MIN_MESSAGES = pgloader.logger.level(cmm)
|
|
else:
|
|
# CLIENT_MIN_MESSAGES has not been set at all
|
|
pgloader.options.CLIENT_MIN_MESSAGES = logging.INFO
|
|
|
|
if config.has_option(section, 'log_min_messages'):
|
|
lmm = config.get(section, 'log_min_messages')
|
|
pgloader.options.LOG_MIN_MESSAGES = pgloader.logger.level(lmm)
|
|
else:
|
|
pgloader.options.LOG_MIN_MESSAGES = logging.INFO
|
|
|
|
if config.has_option(section, 'log_file'):
|
|
# don't overload the command line -L option if given
|
|
if not pgloader.options.LOG_FILE:
|
|
pgloader.options.LOG_FILE = config.get(section, 'log_file')
|
|
|
|
if pgloader.options.LOG_FILE:
|
|
ok, logdir_mesg = check_dirname(pgloader.options.LOG_FILE)
|
|
if not ok:
|
|
# force default setting
|
|
pgloader.options.LOG_FILE = pgloader.options.DEFAULT_LOG_FILE
|
|
|
|
try:
|
|
log = pgloader.logger.init(pgloader.options.CLIENT_MIN_MESSAGES,
|
|
pgloader.options.LOG_MIN_MESSAGES,
|
|
pgloader.options.LOG_FILE)
|
|
except PGLoader_Error, e:
|
|
try:
|
|
log = pgloader.logger.init(pgloader.options.CLIENT_MIN_MESSAGES,
|
|
pgloader.options.LOG_MIN_MESSAGES,
|
|
pgloader.options.DEFAULT_LOG_FILE)
|
|
|
|
log.warning(e)
|
|
log.warning("Using default logfile %s",
|
|
pgloader.options.DEFAULT_LOG_FILE)
|
|
except PGLoader_Error, e:
|
|
print e
|
|
sys.exit(8)
|
|
|
|
pgloader.logger.log = log
|
|
|
|
log.info("Logger initialized")
|
|
if logdir_mesg:
|
|
log.error(logdir_mesg)
|
|
log.error("Default logfile %s has been used instead",
|
|
pgloader.options.LOG_FILE)
|
|
|
|
if config.has_option(section, 'input_encoding'):
|
|
input_encoding = pgloader.tools.parse_config_string(
|
|
config.get(section, 'input_encoding'))
|
|
pgloader.options.INPUT_ENCODING = input_encoding
|
|
|
|
# optionnal global newline_escapes
|
|
if config.has_option(section, 'newline_escapes'):
|
|
setting = pgloader.tools.parse_config_string(
|
|
config.get(section, 'newline_escapes'))
|
|
pgloader.options.NEWLINE_ESCAPES = setting
|
|
|
|
# Then there are null and empty_string optionnal parameters
|
|
# They canbe overriden in specific table configuration
|
|
if config.has_option(section, 'null'):
|
|
pgloader.options.NULL = pgloader.tools.parse_config_string(
|
|
config.get(section, 'null'))
|
|
|
|
if config.has_option(section, 'empty_string'):
|
|
pgloader.options.EMPTY_STRING = pgloader.tools.parse_config_string(
|
|
config.get(section, 'empty_string'))
|
|
|
|
if config.has_option(section, 'reformat_path'):
|
|
# command line value is prefered to config format one
|
|
if not pgloader.options.REFORMAT_PATH:
|
|
rpath = config.get(section, 'reformat_path')
|
|
pgloader.options.REFORMAT_PATH = rpath
|
|
|
|
if config.has_option(section, 'max_parallel_sections'):
|
|
if not pgloader.options.MAX_PARALLEL_SECTIONS:
|
|
mps = config.getint(section, 'max_parallel_sections')
|
|
pgloader.options.MAX_PARALLEL_SECTIONS = mps
|
|
|
|
return config
|
|
|
|
def myprint(l, line_prefix = " ", cols = 78):
|
|
""" pretty print list l elements """
|
|
# some code for pretty print
|
|
lines = []
|
|
|
|
tmp = line_prefix
|
|
for e in l:
|
|
if len(tmp) + len(e) > cols:
|
|
lines.append(tmp)
|
|
tmp = line_prefix
|
|
|
|
if tmp != line_prefix: tmp += " "
|
|
tmp += e
|
|
|
|
lines.append(tmp)
|
|
|
|
return lines
|
|
|
|
def duration_pprint(duration):
|
|
""" pretty print duration (human readable information) """
|
|
if duration > 3600:
|
|
h = int(duration / 3600)
|
|
m = int((duration - 3600 * h) / 60)
|
|
s = duration - 3600 * h - 60 * m + 0.5
|
|
return '%2dh%02dm%03.1f' % (h, m, s)
|
|
|
|
elif duration > 60:
|
|
m = int(duration / 60)
|
|
s = duration - 60 * m
|
|
return ' %02dm%06.3f' % (m, s)
|
|
|
|
else:
|
|
return '%10.3f' % duration
|
|
|
|
def print_summary(dbconn, sections, summary, td):
|
|
""" print a pretty summary """
|
|
from pgloader.options import VERBOSE, DEBUG, QUIET, SUMMARY
|
|
from pgloader.options import DRY_RUN, PEDANTIC, VACUUM
|
|
from pgloader.pgloader import PGLoader
|
|
|
|
retcode = 0
|
|
|
|
t= 'Table name | duration | size | copy rows | errors '
|
|
_= '===================================================================='
|
|
|
|
tu = te = ts = 0 # total updates, errors, size
|
|
|
|
if False and not DRY_RUN:
|
|
dbconn.reset()
|
|
cursor = dbconn.dbconn.cursor()
|
|
|
|
s_ok = 0
|
|
for s in sections:
|
|
if s not in summary:
|
|
continue
|
|
|
|
s_ok += 1
|
|
if s_ok == 1:
|
|
# print pretty sumary header now
|
|
print
|
|
print t
|
|
print _
|
|
|
|
if summary[s]:
|
|
t, d, u, e = summary[s]
|
|
d = duration_pprint(d)
|
|
else:
|
|
t = s
|
|
d = '%9s ' % '-'
|
|
u = e = 0
|
|
|
|
if False and not DRY_RUN:
|
|
sql = "select pg_total_relation_size(%s), " + \
|
|
"pg_size_pretty(pg_total_relation_size(%s));"
|
|
cursor.execute(sql, [t, t])
|
|
octets, sp = cursor.fetchone()
|
|
ts += octets
|
|
|
|
if sp[5:] == 'bytes': sp = sp[:-5] + ' B'
|
|
else:
|
|
sp = '-'
|
|
|
|
tn = s
|
|
if len(tn) > 18:
|
|
tn = s[0:15] + "..."
|
|
|
|
print '%-18s| %ss | %7s | %10d | %10d' % (tn, d, sp, u, e)
|
|
|
|
tu += u
|
|
te += e
|
|
|
|
if e > 0:
|
|
retcode += 1
|
|
|
|
if s_ok > 1:
|
|
td = duration_pprint(td)
|
|
|
|
# pretty size
|
|
if False and not DRY_RUN:
|
|
cursor.execute("select pg_size_pretty(%s);", [ts])
|
|
[ts] = cursor.fetchone()
|
|
if ts[5:] == 'bytes': ts = ts[:-5] + ' B'
|
|
else:
|
|
ts = '-'
|
|
|
|
print _
|
|
print 'Total | %ss | %7s | %10d | %10d' \
|
|
% (td, ts, tu, te)
|
|
|
|
if False and not DRY_RUN:
|
|
cursor.close()
|
|
|
|
return retcode
|
|
|
|
def load_data():
|
|
""" read option line and configuration file, then process data
|
|
import of given section, or all sections if no section is given on
|
|
command line """
|
|
|
|
# first parse command line options, and set pgloader.options values
|
|
# accordingly
|
|
conffile, args = parse_options()
|
|
|
|
# now init db connection
|
|
config = parse_config(conffile)
|
|
|
|
from pgloader.logger import log
|
|
from pgloader.tools import read_path, check_path
|
|
from pgloader.options import VERBOSE
|
|
|
|
import pgloader.options
|
|
if pgloader.options.REFORMAT_PATH:
|
|
rpath = read_path(pgloader.options.REFORMAT_PATH, log, check = False)
|
|
crpath = check_path(rpath, log)
|
|
else:
|
|
rpath = crpath = None
|
|
|
|
if not crpath:
|
|
if rpath:
|
|
# don't check same path entries twice
|
|
|
|
default_rpath = set(crpath) \
|
|
- set(pgloader.options.DEFAULT_REFORMAT_PATH)
|
|
else:
|
|
default_rpath = pgloader.options.DEFAULT_REFORMAT_PATH
|
|
|
|
pgloader.options.REFORMAT_PATH = check_path(default_rpath, log)
|
|
else:
|
|
pgloader.options.REFORMAT_PATH = rpath
|
|
|
|
log.info('Reformat path is %s', pgloader.options.REFORMAT_PATH)
|
|
|
|
# load some pgloader package modules
|
|
from pgloader.options import VERBOSE, DEBUG, QUIET, SUMMARY
|
|
from pgloader.options import DRY_RUN, PEDANTIC, VACUUM
|
|
from pgloader.options import MAX_PARALLEL_SECTIONS
|
|
from pgloader.options import LOAD_FROM_STDIN, LOAD_TO_TABLE
|
|
from pgloader.options import FILE_BOUNDARIES
|
|
from pgloader.pgloader import PGLoader
|
|
from pgloader.tools import PGLoader_Error
|
|
|
|
sections = []
|
|
summary = {}
|
|
|
|
# args are meant to be configuration sections, or filenames, or stdin
|
|
if LOAD_FROM_STDIN:
|
|
if FILE_BOUNDARIES is not None:
|
|
log.warning("Can't use --boundaries on stdin")
|
|
|
|
if len(args) == 0:
|
|
s = '<stdin>'
|
|
config.add_section(s)
|
|
config.set(s, 'table', LOAD_TO_TABLE)
|
|
config.set(s, 'filename', 'sys.stdin')
|
|
config.set(s, 'columns', '*')
|
|
config.set(s, 'format', 'csv')
|
|
sections.append(s)
|
|
|
|
elif len(args) == 1:
|
|
if config.has_section(args[0]):
|
|
# apply given section parameters, then load from stdin
|
|
config.set(args[0], 'filename', 'sys.stdin')
|
|
sections.append(args[0])
|
|
else:
|
|
print >>sys.stderr, \
|
|
"Error: Please provide a [%s] section" % args[0]
|
|
sys.exit(5)
|
|
else:
|
|
print >>sys.stderr, \
|
|
"Error: can't read several sections all from stdin"
|
|
sys.exit(5)
|
|
|
|
elif len(args) > 0:
|
|
for s in args:
|
|
if config.has_section(s):
|
|
sections.append(s)
|
|
else:
|
|
log.info("Creating a section for file '%s'" % s)
|
|
# a filename was given, apply [pgsql] defaults
|
|
# set the tablename as the filename sans extension
|
|
# consider columns = *
|
|
if not os.path.exists(s):
|
|
print >>sys.stderr, \
|
|
"Error: '%s' does not exists as a section nor as a file" % s
|
|
sys.exit(2)
|
|
|
|
config.add_section(s)
|
|
config.set(s, 'table', os.path.splitext(os.path.basename(s))[0])
|
|
config.set(s, 'filename', s)
|
|
config.set(s, 'columns', '*')
|
|
config.set(s, 'format', 'csv')
|
|
sections.append(s)
|
|
|
|
else:
|
|
if not LOAD_FROM_STDIN:
|
|
# don't load all sections first when asked to load stdin
|
|
log.debug("No argument on CLI, will consider all sections")
|
|
for s in config.sections():
|
|
if s != 'pgsql':
|
|
sections.append(s)
|
|
|
|
# we run through sorted section list, unless we got the section list
|
|
# from command line
|
|
sections.sort()
|
|
|
|
if FILE_BOUNDARIES is not None and len(sections) > 1:
|
|
print >>sys.stderr, \
|
|
"Error: will not apply boundaries on more than one file"
|
|
sys.exit(5)
|
|
|
|
log.info('Will consider following sections:')
|
|
for line in myprint(sections):
|
|
log.info(line)
|
|
|
|
# we count time passed from now on
|
|
begin = time.time()
|
|
|
|
threads = {}
|
|
started = {}
|
|
finished = {}
|
|
current = 0
|
|
interrupted = False
|
|
|
|
max_running = MAX_PARALLEL_SECTIONS
|
|
if max_running == -1:
|
|
max_running = len(sections)
|
|
|
|
log.info('Will load %d section at a time' % max_running)
|
|
|
|
sem = threading.BoundedSemaphore(max_running)
|
|
|
|
while current < len(sections):
|
|
s = sections[current]
|
|
|
|
try:
|
|
loader = None
|
|
summary [s] = []
|
|
started [s] = threading.Event()
|
|
finished[s] = threading.Event()
|
|
|
|
try:
|
|
loader = PGLoader(s, config, sem,
|
|
(started[s], finished[s]), summary[s])
|
|
except PGLoader_Error, e:
|
|
# could not initialize properly this loader, don't
|
|
# ever wait for it
|
|
started[s] .set()
|
|
finished[s].set()
|
|
log.error(e)
|
|
if DEBUG:
|
|
raise
|
|
|
|
except IOError, e:
|
|
# No space left on device? can't log it
|
|
break
|
|
|
|
if loader:
|
|
if not loader.template:
|
|
if FILE_BOUNDARIES is not None and len(sections) == 1:
|
|
loader.reader.set_boundaries(FILE_BOUNDARIES)
|
|
filename = loader.filename
|
|
input_encoding = loader.input_encoding
|
|
threads[s] = loader
|
|
|
|
# .start() will sem.aquire(), so we won't have more
|
|
# than max_running threads running at any time.
|
|
log.debug("Starting a thread for %s" % s)
|
|
threads[s].start()
|
|
else:
|
|
log.info("Skipping section %s, which is a template" % s)
|
|
|
|
for d in (summary, started, finished):
|
|
d.pop(s)
|
|
|
|
except PGLoader_Error, e:
|
|
if e == '':
|
|
log.error('[%s] Please correct previous errors' % s)
|
|
else:
|
|
log.error('%s' % e)
|
|
|
|
if DEBUG:
|
|
raise
|
|
|
|
if PEDANTIC:
|
|
# was: threads[s].print_stats()
|
|
# but now thread[s] is no more alive
|
|
pass
|
|
|
|
except UnicodeDecodeError, e:
|
|
log.error("can't open '%s' with given input encoding '%s'" \
|
|
% (filename, input_encoding))
|
|
|
|
except KeyboardInterrupt:
|
|
interrupted = True
|
|
log.warning("Aborting on user demand (Interrupt)")
|
|
break
|
|
|
|
except IOError, e:
|
|
# typically, No Space Left On Device, can't report nor continue
|
|
break
|
|
|
|
current += 1
|
|
|
|
# get sure each thread is started, then each one is done
|
|
from pgloader.tools import check_events
|
|
|
|
check_events(started, log, "is running")
|
|
log.info("All threads are started, wait for them to terminate")
|
|
check_events(finished, log, "processing is over")
|
|
|
|
# check whether any thread failed
|
|
for section, loader in threads.iteritems():
|
|
if not loader.success:
|
|
return 1
|
|
|
|
# total duration
|
|
td = time.time() - begin
|
|
|
|
if SUMMARY and not interrupted:
|
|
try:
|
|
print_summary(None, sections, summary, td)
|
|
print
|
|
except PGLoader_Error, e:
|
|
log.error("Can't print summary: %s" % e)
|
|
return 1
|
|
|
|
except KeyboardInterrupt:
|
|
return 1
|
|
|
|
return 0
|
|
|
|
if __name__ == "__main__":
|
|
ret = 1
|
|
try:
|
|
ret = load_data()
|
|
except Exception, e:
|
|
from pgloader.options import DEBUG
|
|
if DEBUG:
|
|
raise
|
|
sys.stderr.write(str(e) + '\n')
|
|
sys.exit(1)
|
|
|
|
except IOError, e:
|
|
sys.stderr.write(str(e) + '\n')
|
|
sys.exit(1)
|
|
|
|
except KeyboardInterrupt, e:
|
|
sys.stderr.write(str(e) + '\n')
|
|
sys.exit(1)
|
|
|
|
sys.exit(ret)
|
|
|