import re
class CsvDiagError(Exception):
    """Error raised when the CSV format of an input cannot be diagnosed.

    The message passed to the constructor is kept in the 'value' attribute
    and is what str() returns for the exception.
    """

    def __init__(self, msg):
        # Initialise the base class explicitly, then keep the message on the
        # attribute that __str__ reads.
        Exception.__init__(self, msg)
        self.value = msg

    def __str__(self):
        message = self.value
        return message
class CsvLine(object):
    """A single line of delimited text plus the bookkeeping used to sniff its format.

    The class counts candidate delimiters on the line, splits the line into
    items for a given delimiter/quote character pair, and reports whether
    those items are consistently ("well") quoted.
    """

    # Character that may precede a quote character inside a quoted item.
    escchar = u"\\"

    def __init__(self, line_text):
        """Store the text of the line; counts and error messages start empty."""
        self.text = line_text
        # Maps a delimiter to its number of occurrences; filled by count_delim().
        self.delim_counts = {}
        # Error messages recorded by the most recent call to items().
        self.item_errors = []

    def __str__(self):
        counts = u", ".join([u"%s: %d" % (k, self.delim_counts[k]) for k in self.delim_counts])
        return u"; ".join([u"Text: <<%s>>" % self.text,
                           u"Delimiter counts: <<%s>>" % counts])

    def count_delim(self, delim):
        """Count occurrences of 'delim' on the line and remember the result.

        If the delimiter is a space, a run of several spaces is treated as a
        single delimiter: the line is split on runs of spaces and the count
        is one fewer than the number of pieces.
        """
        if delim == u" ":
            self.delim_counts[delim] = max(0, len(re.split(r' +', self.text)) - 1)
        else:
            self.delim_counts[delim] = self.text.count(delim)

    def delim_count(self, delim):
        """Return the count stored by count_delim(); KeyError if never counted."""
        return self.delim_counts[delim]

    def _well_quoted(self, element, qchar):
        """Report how 'element' uses the quote character 'qchar'.

        A well-quoted element has either no quotes, a quote on each end and
        none in the middle, or quotes on both ends with every internal quote
        either doubled or escaped.

        Returns a tuple of three booleans: whether the element is well-quoted,
        whether the quote character is used at all, and whether the escape
        character is used.
        """
        if qchar not in element or len(element) == 0:
            return (True, False, False)
        # The element has quotes; it needs a distinct opening and closing
        # quote.  The length check rejects a lone quote character, which
        # would otherwise count as both the first and the last character.
        if len(element) < 2 or element[0] != qchar or element[-1] != qchar:
            return (False, True, False)
        inner = element[1:-1]
        if qchar not in inner:
            return (True, True, False)
        # If no quotes are left after removing doubled quotes, this is well-quoted.
        if qchar not in inner.replace(qchar + qchar, u''):
            return (True, True, False)
        # If no quotes are left after removing escaped quotes, this is well-quoted.
        if qchar not in inner.replace(self.escchar + qchar, u''):
            return (True, True, True)
        return (False, True, False)

    def record_format_error(self, pos_no, errmsg):
        """Append 'errmsg', tagged with the 1-based position, to item_errors."""
        self.item_errors.append(u"%s in position %d." % (errmsg, pos_no))

    def items(self, delim, qchar):
        """Split the line into a list of items.

        The line is broken at delimiters that are not within quoted stretches.
        This is almost a CSV parser for a valid delim and qchar, except that
        it does not remove quote characters or reduce escaped quotes: items
        are returned exactly as they appear on the line.

        Arguments:
          * delim: the delimiter character, or None.  A space delimiter makes
            runs of spaces count as one delimiter.
          * qchar: the quote character, or None for plain splitting.

        Returns a list of strings.  'item_errors' is reset on every call and
        receives a message for each character found directly after a closing
        quote that is neither a quote nor a delimiter.  When a quote character
        is given, an empty item at the very end of the line is not appended.
        """
        self.item_errors = []
        if qchar is None:
            if delim is None:
                # The whole line is one item; wrap it so that callers always
                # receive a list rather than a bare string.
                return [self.text]
            if delim == u" ":
                return re.split(r' +', self.text)
            return self.text.split(delim)

        # States of the finite state machine (action / next state per input):
        #   IN_QUOTED: an opening quote has been seen, but no closing quote.
        #       escape char: hold it in the pending buffer / ESCAPED
        #       quote:       hold it in the pending buffer / QUOTE_IN_QUOTED
        #       other (including a delimiter): add to the item / IN_QUOTED
        #   ESCAPED: an escape character was seen while IN_QUOTED (held as pending).
        #       delimiter: flush pending into the item, finish the item / BETWEEN
        #       other (including a quote): flush pending, add the char / IN_QUOTED
        #   QUOTE_IN_QUOTED: a quote was seen while IN_QUOTED (held as pending).
        #       quote:     flush pending, add the char (a doubled quote) / IN_QUOTED
        #       delimiter: flush pending, finish the item / DELIMITED
        #       other:     flush pending, add the char, record a format error
        #                  (a quote not followed by a quote or delimiter) / IN_QUOTED
        #   IN_UNQUOTED: a non-delimiter, non-quote started the item.
        #       delimiter: finish the item / DELIMITED
        #       other (including a quote, which is bad format but is not
        #              reported): add to the item / IN_UNQUOTED
        #   BETWEEN: not in an item and no delimiter seen; the starting state.
        #       quote:     add to the item / IN_QUOTED
        #       delimiter: finish the (empty) item / DELIMITED
        #       other:     add to the item / IN_UNQUOTED
        #   DELIMITED: a delimiter has been seen while not in a quoted item.
        #       quote:     add to the item / IN_QUOTED
        #       delimiter: finish the (empty) item unless runs of delimiters
        #                  are being collapsed / DELIMITED
        #       other:     add to the item / IN_UNQUOTED
        IN_QUOTED, ESCAPED, QUOTE_IN_QUOTED, IN_UNQUOTED, BETWEEN, DELIMITED = range(6)

        eat_multiple_delims = delim == u" "
        elements = []   # Finished items, in order.
        current = u''   # Item being accumulated.
        pending = u''   # Held-back escape or quote character.
        state = BETWEEN

        for pos, c in enumerate(self.text):
            if state == IN_QUOTED:
                # The escape character is tested before the quote character.
                if c == self.escchar:
                    pending = c
                    state = ESCAPED
                elif c == qchar:
                    pending = c
                    state = QUOTE_IN_QUOTED
                else:
                    current += c
            elif state == ESCAPED:
                current += pending
                pending = u''
                if c == delim:
                    elements.append(current)
                    current = u''
                    state = BETWEEN
                else:
                    current += c
                    state = IN_QUOTED
            elif state == QUOTE_IN_QUOTED:
                current += pending
                pending = u''
                if c == qchar:
                    current += c
                    state = IN_QUOTED
                elif c == delim:
                    elements.append(current)
                    current = u''
                    state = DELIMITED
                else:
                    current += c
                    self.record_format_error(pos + 1, "Unexpected character following a closing quote")
                    state = IN_QUOTED
            elif state == IN_UNQUOTED:
                if c == delim:
                    elements.append(current)
                    current = u''
                    state = DELIMITED
                else:
                    current += c
            else:
                # BETWEEN and DELIMITED differ only in how a delimiter is handled.
                if c == qchar:
                    current += c
                    state = IN_QUOTED
                elif c == delim:
                    if state == BETWEEN or not eat_multiple_delims:
                        elements.append(current)
                        current = u''
                    state = DELIMITED
                else:
                    current += c
                    state = IN_UNQUOTED

        # End of line: flush whatever is held.  For a well-formed line the
        # pending buffer is either empty or holds the closing quote.
        current += pending
        if len(current) > 0:
            elements.append(current)
        return elements

    def well_quoted_line(self, delim, qchar):
        """Summarise quote usage over all items on the line.

        Returns a tuple of boolean, int, and boolean, indicating: 1) whether
        every item is well-quoted, 2) the number of items in which the quote
        character is used, and 3) whether the escape character is used.
        """
        wq = [self._well_quoted(el, qchar) for el in self.items(delim, qchar)]
        return (all([b[0] for b in wq]), sum([b[1] for b in wq]), any([b[2] for b in wq]))
def diagnose_delim(linestream, possible_delimiters=None, possible_quotechars=None, max_lines=100):
    """Guess the delimiter, quote character, and escape character of delimited text.

    Returns a tuple consisting of the delimiter, the quote character, and the
    escape character for quote characters within items of a line.  All may
    be None.  If the escape character is not None, it is a backslash.

    Arguments:
      * linestream: An iterable file-like object that returns lines of text.
        An object with a 'next()' method is read through that method;
        otherwise the iterator protocol is used, so Python 3 files,
        generators, and sequences of lines are accepted too.
      * possible_delimiters: A list of single characters that might be used
        to separate items on a line.  If not specified, the default consists
        of tab, comma, semicolon, and vertical rule.  If a space character is
        included, multiple space characters will be treated as a single
        delimiter--so it's best if there are no missing values on
        space-delimited lines, though that is not necessarily a fatal flaw
        unless there is a very high fraction of missing values.
      * possible_quotechars: A list of single characters that might be used
        to quote items on a line.  If not specified, the default consists of
        double and single quotes.
      * max_lines: The maximum number of lines read from 'linestream'.  Lines
        that are empty after removing line endings are read but not analysed.

    Raises CsvDiagError if no non-empty line could be read.
    """
    if not possible_delimiters:
        possible_delimiters = [u"\t", u",", u";", u"|"]
    if not possible_quotechars:
        possible_quotechars = [u'"', u"'"]

    # Prefer an explicit 'next()' method, which is what this function has
    # always called; fall back to the iterator protocol for inputs that do
    # not have one (every Python 3 iterator).
    read_line = getattr(linestream, "next", None)
    if read_line is None:
        line_iter = iter(linestream)

        def read_line():
            return next(line_iter)

    lines = []
    for _ in range(max_lines):
        try:
            ln = read_line()
        except StopIteration:
            break
        # Strip line endings by slicing so that the type of the line is preserved.
        while len(ln) > 0 and ln[-1] in (u"\n", u"\r"):
            ln = ln[:-1]
        if len(ln) > 0:
            lines.append(CsvLine(ln))
    if len(lines) == 0:
        raise CsvDiagError(u"CSV diagnosis error: no lines read")

    for ln in lines:
        for d in possible_delimiters:
            ln.count_delim(d)

    # For each delimiter, find the minimum number of delimiters found on any
    # line and the number of lines with that minimum.  Delimiters missing
    # from at least one line (minimum of zero) are not candidates.
    # 'candidates' keeps the caller's order so that ties are broken
    # predictably rather than by dictionary ordering.
    delim_stats = {}
    candidates = []
    for d in possible_delimiters:
        dcounts = [ln.delim_count(d) for ln in lines]
        min_count = min(dcounts)
        if min_count > 0 and d not in delim_stats:
            delim_stats[d] = (min_count, dcounts.count(min_count))
            candidates.append(d)

    def all_well_quoted(delim, qchar):
        # Returns a tuple of boolean, int, and escape character indicating:
        # 1) whether every line is well-quoted, 2) the total number of items
        # over all lines for which the quote character is used, and 3) the
        # escape character if any line uses it, otherwise None.
        wq = [l.well_quoted_line(delim, qchar) for l in lines]
        return (all([b[0] for b in wq]),
                sum([b[1] for b in wq]),
                CsvLine.escchar if any([b[2] for b in wq]) else None)

    def eval_quotes(delim):
        # Returns a tuple of the form to be returned by 'diagnose_delim()'.
        ok_quotes = {}
        for q in possible_quotechars:
            allwq = all_well_quoted(delim, q)
            if allwq[0]:
                ok_quotes[q] = (allwq[1], allwq[2])
        if len(ok_quotes) == 0:
            return (delim, None, None)  # No quotes, no escape character.
        max_use = max([v[0] for v in ok_quotes.values()])
        if max_use == 0:
            return (delim, None, None)
        # If multiple quote characters have the same usage, return the one
        # listed first in 'possible_quotechars'.
        for q in possible_quotechars:
            if q in ok_quotes and ok_quotes[q][0] == max_use:
                return (delim, q, ok_quotes[q][1])

    if len(candidates) == 0:
        # None of the delimiters were found.  Some other delimiter may apply,
        # or the input may contain a single value on each line.
        # Identify possible quote characters.
        return eval_quotes(None)

    # If there are several candidates and one of them is a space, prefer the non-space.
    if len(candidates) > 1 and u" " in candidates:
        candidates.remove(u" ")
    if len(candidates) == 1:
        return eval_quotes(candidates[0])

    # Assign weights to the delimiters.  The weight is the square of the
    # minimum number of delimiters on a line times the number of lines with
    # that minimum.
    delim_wts = {}
    for d in candidates:
        delim_wts[d] = delim_stats[d][0] ** 2 * delim_stats[d][1]

    # Evaluate quote usage for each delimiter, from most heavily weighted to
    # least.  Return the first good pair where the quote character is used.
    delim_order = sorted(candidates, key=delim_wts.get, reverse=True)
    for d in delim_order:
        quote_check = eval_quotes(d)
        if quote_check[0] and quote_check[1]:
            return quote_check
    # There are no delimiters for which quotes are OK.
    return (delim_order[0], None, None)
Sunday, April 12, 2015
An alternate Python CSV sniffer
I recently came across a case where the format sniffer in Python's csv module was not satisfactory. The problem is that the format sniffer always identifies a quote character, even when there is no quote character in the CSV file (e.g., in the case of a tab-delimited file). By default the format sniffer reports that a double-quote character is used. This is not a problem if you subsequently use the csv module itself to read the file. However, I wanted to use the csv module only if the file contained quote characters, and use a different method (the copy_from method of the psycopg module) if the file contained no quote characters. I could not make this distinction using the csv module's format sniffer, so I wrote the following one to use instead.
Labels:
CSV,
finite state machine,
FSM,
Python
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment