Implement --load-from-stdin

This commit is contained in:
Dimitri Fontaine 2010-04-15 15:13:21 +02:00
parent 38c8a079f2
commit cbd167c0e8
8 changed files with 129 additions and 14 deletions

1
debian/changelog vendored
View File

@ -8,6 +8,7 @@ pgloader (2.3.3~dev2-1) unstable; urgency=low
* Have --debug show a traceback
* Fix a bug where pgloader would freeze on early error (no such file)
* Implement an option to set csv field size limit
* Implement --load-from-stdin
-- Dimitri Fontaine <dim@tapoueh.org> Sun, 4 Apr 2010 19:34:39 +0200

View File

@ -120,6 +120,11 @@ columns = *
fixed_specs = a:0:10, b:10:8, c:18:8, d:26:17
reformat = c:pgtime:time
[stdin]
table = stdin
format = csv
columns = *
[csv]
table = csv
format = csv

View File

@ -0,0 +1,8 @@
1|first entry
2|second one
3|another
4|still running
5|well, some more
6|antepenultima
7|next to last
8|hey, it's the last!

4
examples/stdin/stdin.sql Normal file
View File

@ -0,0 +1,4 @@
CREATE TABLE stdin (
a integer,
b text
);

View File

@ -216,6 +216,24 @@ Example: -o standard_conforming_strings=on -o client_encoding=utf8
Force +pgloader+ to use given version of psycopg, either +1+ or
+2+.
== INTERNAL USAGE OPTIONS ==
Those have been developped for internal +pgloader+ usage only, but still
need to be documented. Also, they are maintained and you could find an usage
for them.
--load-from-stdin::
Consider standard input as the data file. When using this function,
either give a section name from which to apply all the setup except for
the +filename+ to load from, or use +--load-to-table+.
--load-to-table::
This option's argument must be the name of the PostgreSQL table you're
loading the data to, it's useful when you want to load from +stdin+ and
avoid editing a full configuration section.
== GLOBAL CONFIGURATION SECTION ==
The configuration file has a +.ini+ file syntax, its first section has

View File

@ -143,6 +143,19 @@ def parse_options():
default = None,
help = "Force usage of given version of psycopg")
parser.add_option("--load-from-stdin", action = "store_true",
dest = "stdin",
default = False,
help = "Load standard input data into given table name")
parser.add_option("--load-to-table", dest = "table",
default = None,
help = "Load to given table when --load-from-stdin")
parser.add_option("--boundaries", dest = "boundaries",
default = None,
help = "Load only in given boundaries, Start..End")
(opts, args) = parser.parse_args()
if opts.version:
@ -214,6 +227,9 @@ def parse_options():
pgloader.options.SECTION_THREADS = opts.section_threads
pgloader.options.MAX_PARALLEL_SECTIONS = opts.parallel
pgloader.options.LOAD_FROM_STDIN = opts.stdin
pgloader.options.LOAD_TO_TABLE = opts.table
if pgloader.options.MAX_PARALLEL_SECTIONS is None:
from pgloader.options import DEFAULT_MAX_PARALLEL_SECTIONS
pgloader.options.MAX_PARALLEL_SECTIONS = DEFAULT_MAX_PARALLEL_SECTIONS
@ -279,6 +295,15 @@ def parse_options():
else:
pgloader.options.PSYCOPG_VERSION = opts.psycopg_version
if opts.boundaries:
try:
start, end = [int(x) for x in opts.boundaries.split("..")]
pgloader.options.FILE_BOUNDARIES = (start, end)
except ValueError, e:
print >>sys.stderr, \
"Error: boundaries should be an integer range written X..Y"
sys.exit(1)
return opts.config, args
def parse_config(conffile):
@ -548,14 +573,43 @@ def load_data():
from pgloader.options import VERBOSE, DEBUG, QUIET, SUMMARY
from pgloader.options import DRY_RUN, PEDANTIC, VACUUM
from pgloader.options import MAX_PARALLEL_SECTIONS
from pgloader.options import LOAD_FROM_STDIN, LOAD_TO_TABLE
from pgloader.options import FILE_BOUNDARIES
from pgloader.pgloader import PGLoader
from pgloader.tools import PGLoader_Error
sections = []
summary = {}
# args are meant to be configuration sections, or filenames
if len(args) > 0:
# args are meant to be configuration sections, or filenames, or stdin
if LOAD_FROM_STDIN:
if FILE_BOUNDARIES is not None:
log.warning("Can't use --boundaries on stdin")
if len(args) == 0:
s = '<stdin>'
config.add_section(s)
config.set(s, 'table', LOAD_TO_TABLE)
config.set(s, 'filename', 'sys.stdin')
config.set(s, 'columns', '*')
config.set(s, 'format', 'csv')
sections.append(s)
elif len(args) == 1:
if config.has_section(args[0]):
# apply given section parameters, then load from stdin
config.set(args[0], 'filename', 'sys.stdin')
sections.append(args[0])
else:
print >>sys.stderr, \
"Error: Please provide a [%s] section" % args[0]
sys.exit(5)
else:
print >>sys.stderr, \
"Error: can't read several sections all from stdin"
sys.exit(5)
elif len(args) > 0:
for s in args:
if config.has_section(s):
sections.append(s)
@ -577,14 +631,21 @@ def load_data():
sections.append(s)
else:
log.debug("No argument on CLI, will consider all sections")
for s in config.sections():
if s != 'pgsql':
sections.append(s)
if not LOAD_FROM_STDIN:
# don't load all sections first when asked to load stdin
log.debug("No argument on CLI, will consider all sections")
for s in config.sections():
if s != 'pgsql':
sections.append(s)
# we run through sorted section list, unless we got the section list
# from command line
sections.sort()
# we run through sorted section list, unless we got the section list
# from command line
sections.sort()
if FILE_BOUNDARIES is not None and len(sections) > 1:
print >>sys.stderr, \
"Error: will not apply boundaries on more than one file"
sys.exit(5)
log.info('Will consider following sections:')
for line in myprint(sections):
@ -634,6 +695,8 @@ def load_data():
if loader:
if not loader.template:
if FILE_BOUNDARIES is not None and len(sections) == 1:
loader.reader.set_boundaries(FILE_BOUNDARIES)
filename = loader.filename
input_encoding = loader.input_encoding
threads[s] = loader

View File

@ -53,3 +53,6 @@ LOG_FILE = None
REJECT_LOG_FILE = '%s.rej.log'
REJECT_DATA_FILE = '%s.rej'
LOAD_FROM_STDIN = None
LOAD_TO_TABLE = None

View File

@ -2,6 +2,7 @@
#
# pgloader data reader interface and defaults
import sys
from tools import PGLoader_Error, Reject, parse_config_string
from db import db
from lo import ifx_clob, ifx_blob
@ -192,9 +193,13 @@ class UnbufferedFileReader:
if self.encoding is not None:
try:
import codecs
self.fd = codecs.open(self.filename,
encoding = self.encoding,
buffering = self.bufsize)
if self.filename == 'sys.stdin':
f = sys.stdin
else:
f = open(self.filename, self.mode, self.bufsize)
self.log.warning('PHOQUE "%s"', f)
self.fd = codecs.getreader(self.encoding)(f)
self.log.info("Opened '%s' with encoding '%s'" \
% (self.filename, self.encoding))
except LookupError, e:
@ -206,7 +211,10 @@ class UnbufferedFileReader:
else:
try:
self.fd = open(self.filename, mode, self.bufsize)
if self.filename == 'sys.stdin':
self.fd = sys.stdin
else:
self.fd = open(self.filename, mode, self.bufsize)
except IOError, error:
raise PGLoader_Error, error
@ -240,7 +248,12 @@ class UnbufferedFileReader:
while line != '':
line = self.fd.readline()
self.line_nb += 1
self.position = self.fd.tell()
try:
self.position = self.fd.tell()
except IOError, error:
#IOError: [Errno 29] Illegal seek --- when stdin reaches EOF
self.log.info(error)
return
##
# if -F is used, count lines to skip, and skip them