Sunday, April 12, 2015

An alternate Python CSV sniffer

I recently came across a case where the format sniffer in Python's csv module was not satisfactory.  The problem is that the format sniffer always identifies a quote character, even when there is no quote character in the CSV file (e.g., in the case of a tab-delimited file).  By default the format sniffer reports that a double-quote character is used.  This is not a problem if you subsequently use the csv module itself to read the file.  However, I wanted to use the csv module only if the file contained quote characters, and use a different method (the copy_from method of the psycopg module) if the file contained no quote characters.  I could not make this distinction using the csv module's format sniffer, so I wrote the following one to use instead.


import re

class CsvDiagError(Exception):
    """Raised when the CSV format of a stream of lines cannot be diagnosed.

    The message is available both as the ``value`` attribute and through the
    standard ``args`` tuple of the exception.
    """

    def __init__(self, msg):
        # Hand the message to Exception as well, so that e.args, repr(e),
        # logging and pickling all carry it.
        super(CsvDiagError, self).__init__(msg)
        self.value = msg

    def __str__(self):
        return self.value

class CsvLine():
    """A single line of delimited text, with helpers for format diagnosis.

    An instance can count occurrences of candidate delimiters, split itself
    into items for a given delimiter/quote character pair, and report whether
    those items use the quote character consistently.

    Attributes:
      text: The line of text (without line terminators) given at construction.
      delim_counts: Dictionary mapping each delimiter passed to
        'count_delim()' to the number of times it occurs on the line.
      item_errors: List of format error messages recorded by the most recent
        call to 'items()'.
    """

    # The character treated as an escape for quote characters inside quoted items.
    escchar = u"\\"

    def __init__(self, line_text):
        self.text = line_text
        self.delim_counts = {}
        self.item_errors = []  # A list of error messages.

    def __str__(self):
        counts = u", ".join([u"%s: %d" % (k, self.delim_counts[k]) for k in self.delim_counts])
        return u"; ".join([u"Text: <<%s>>" % self.text,
                           u"Delimiter counts: <<%s>>" % counts])

    def count_delim(self, delim):
        """Count occurrences of 'delim' on the line and store the result in 'delim_counts'."""
        # If the delimiter is a space, consider multiple spaces to be equivalent
        # to a single delimiter, split on the space(s), and consider the delimiter
        # count to be one fewer than the items returned.
        if delim == u" ":
            self.delim_counts[delim] = max(0, len(re.split(r' +', self.text)) - 1)
        else:
            self.delim_counts[delim] = self.text.count(delim)

    def delim_count(self, delim):
        """Return the count stored by 'count_delim()' for 'delim'.

        Raises KeyError if 'count_delim()' has not been called for 'delim'.
        """
        return self.delim_counts[delim]

    def _well_quoted(self, element, qchar):
        """Check whether a single item uses the quote character consistently.

        A well-quoted element has either no quotes, a quote on each end and none
        in the middle, or quotes on both ends and every internal quote is either
        doubled or escaped.

        Returns a tuple of three booleans; the first indicates whether the
        element is well-quoted, the second indicates whether the quote character
        is used at all, and the third indicates whether the escape character is
        used (to escape an internal quote).
        """
        if qchar not in element:
            return (True, False, False)
        # The element has quotes; if it doesn't have a distinct one on each end,
        # it is not well-quoted.  The length check rejects an element consisting
        # of a single quote character, whose first and last characters are the
        # same character and therefore do not form an opening/closing pair.
        if len(element) < 2 or element[0] != qchar or element[-1] != qchar:
            return (False, True, False)
        inner = element[1:-1]
        if qchar not in inner:
            return (True, True, False)
        # If there are no quotes left after removing doubled quotes, this is well-quoted.
        if qchar not in inner.replace(qchar + qchar, u''):
            return (True, True, False)
        # If there are no quotes left after removing escaped quotes, this is well-quoted.
        if qchar not in inner.replace(self.escchar + qchar, u''):
            return (True, True, True)
        return (False, True, False)

    def record_format_error(self, pos_no, errmsg):
        """Append a message to 'item_errors' noting 'errmsg' at position 'pos_no'."""
        self.item_errors.append(u"%s in position %d." % (errmsg, pos_no))

    def items(self, delim, qchar):
        """Split the line into a list of items.

        The line is broken at delimiters that are not within quoted stretches.
        This is almost a CSV parser, for valid 'delim' and 'qchar', except that
        it does not eliminate quote characters or reduce escaped quotes.

        Arguments:
          delim: The delimiter character, or None if there is no delimiter.
            A space delimiter matches runs of one or more spaces.
          qchar: The quote character, or None if quoting is not to be considered.

        Returns a list of strings.  When both arguments are None the list
        contains the whole line as its only item.  'item_errors' is reset on
        every call and receives a message for each character that directly
        follows a closing quote without being a quote or a delimiter.
        """
        self.item_errors = []
        if qchar is None:
            if delim is None:
                # Always return a list, so callers can iterate over items
                # rather than over the characters of the line.
                return [self.text]
            if delim == u" ":
                return re.split(r' +', self.text)
            return self.text.split(delim)
        elements = []  # The list of items on the line that will be returned.
        eat_multiple_delims = delim == u" "
        # States of the FSM:
        # _IN_QUOTED: An opening quote has been seen, but no closing quote encountered.
        #  Actions / transitions:
        #   quote: save char in escape buffer / _QUOTE_IN_QUOTED
        #   esc_char: save char in escape buffer / _ESCAPED
        #   delimiter: save char in element buffer / _IN_QUOTED
        #   other: save char in element buffer / _IN_QUOTED
        # _ESCAPED: An escape character has been seen while _IN_QUOTED (and is in the escape buffer).
        #  Actions / transitions:
        #   quote: save escape buffer in element buffer, empty escape buffer,
        #    save char in element buffer / _IN_QUOTED
        #   delimiter: save escape buffer in element buffer, empty escape buffer,
        #    save element buffer, empty element buffer / _BETWEEN
        #   other: save escape buffer in element buffer, empty escape buffer,
        #    save char in element buffer / _IN_QUOTED
        # _QUOTE_IN_QUOTED: A quote has been seen while _IN_QUOTED (and is in the escape buffer).
        #  Actions / transitions:
        #   quote: save escape buffer in element buffer, empty escape buffer,
        #    save char in element buffer / _IN_QUOTED
        #   delimiter: save escape buffer in element buffer, empty escape buffer,
        #    save element buffer, empty element buffer / _DELIMITED
        #   other: save escape buffer in element buffer, empty escape buffer,
        #    save char in element buffer / _IN_QUOTED
        #     (An 'other' character in this position represents a bad format:
        #     a quote not followed by another quote or a delimiter.)
        # _IN_UNQUOTED: A non-delimiter, non-quote has been seen.
        #  Actions / transitions:
        #   quote: save char in element buffer / _IN_UNQUOTED
        #    (This represents a bad format.)
        #   delimiter: save element buffer, empty element buffer / _DELIMITED
        #   other: save char in element buffer / _IN_UNQUOTED
        # _BETWEEN: Not in an element, and a delimiter not seen.  This is the starting state,
        #   and the state entered when a delimiter is seen while _ESCAPED.
        #  Actions / transitions:
        #   quote: save char in element buffer / _IN_QUOTED
        #   delimiter: save element buffer, empty element buffer / _DELIMITED
        #    (The element buffer should be empty, representing a null data item.)
        #   other: save char in element buffer / _IN_UNQUOTED
        # _DELIMITED: A delimiter has been seen while not in a quoted item.
        #  Actions / transitions:
        #   quote: save char in element buffer / _IN_QUOTED
        #   delimiter: if eat_multiple: no action / _DELIMITED
        #     if not eat_multiple: save element buffer, empty element buffer / _DELIMITED
        #   other: save char in element buffer / _IN_UNQUOTED
        # At end of line: save escape buffer in element buffer, save element buffer.  For a
        # well-formed line, these should be empty, but they may not be.
        #
        # Define the state constants, which will also be used as indexes into an execution vector.
        _IN_QUOTED, _ESCAPED, _QUOTE_IN_QUOTED, _IN_UNQUOTED, _BETWEEN, _DELIMITED = range(6)
        #
        # Because of Python 2.7's scoping rules:
        # * The escape buffer and current element are defined as mutable objects that will have
        #  their first elements modified, rather than as string variables.  (Python 2.x does not
        #  allow rebinding of a variable in an enclosing non-global scope, but mutable objects
        #  like lists can be altered.)
        # * The action functions return the next state rather than assigning it directly to the
        #  'state' variable.
        esc_buf = [u'']
        current_element = [u'']

        def flush_escape():
            # Move the pending escape/quote character into the current element.
            current_element[0] += esc_buf[0]
            esc_buf[0] = u''

        def end_element():
            # Store the current element as a finished item and start a new one.
            elements.append(current_element[0])
            current_element[0] = u''

        def in_quoted(pos, c):
            if c == self.escchar:
                esc_buf[0] = c
                return _ESCAPED
            if c == qchar:
                esc_buf[0] = c
                return _QUOTE_IN_QUOTED
            current_element[0] += c
            return _IN_QUOTED

        def escaped(pos, c):
            flush_escape()
            if c == delim:
                end_element()
                return _BETWEEN
            current_element[0] += c
            return _IN_QUOTED

        def quote_in_quoted(pos, c):
            flush_escape()
            if c == qchar:
                current_element[0] += c
                return _IN_QUOTED
            if c == delim:
                end_element()
                return _DELIMITED
            current_element[0] += c
            # Positions in error messages are 1-based.
            self.record_format_error(pos + 1, "Unexpected character following a closing quote")
            return _IN_QUOTED

        def in_unquoted(pos, c):
            if c == delim:
                end_element()
                return _DELIMITED
            current_element[0] += c
            return _IN_UNQUOTED

        def between(pos, c):
            if c == qchar:
                current_element[0] += c
                return _IN_QUOTED
            if c == delim:
                end_element()
                return _DELIMITED
            current_element[0] += c
            return _IN_UNQUOTED

        def delimited(pos, c):
            if c == qchar:
                current_element[0] += c
                return _IN_QUOTED
            if c == delim:
                if not eat_multiple_delims:
                    end_element()
                return _DELIMITED
            current_element[0] += c
            return _IN_UNQUOTED

        # Functions in the execution vector must be ordered identically to the
        # indexes represented by the state constants.
        exec_vector = [in_quoted, escaped, quote_in_quoted, in_unquoted, between, delimited]
        # Set the starting state.
        state = _BETWEEN
        # Process the line of text.
        for pos, c in enumerate(self.text):
            state = exec_vector[state](pos, c)
        # Process the end-of-line condition.  Note that an empty final element
        # (e.g., after a trailing delimiter) is not added to the list.
        if len(esc_buf[0]) > 0:
            current_element[0] += esc_buf[0]
        if len(current_element[0]) > 0:
            elements.append(current_element[0])
        return elements

    def well_quoted_line(self, delim, qchar):
        """Evaluate quote usage over all items of the line.

        Returns a tuple of boolean, int, and boolean, indicating: 1) whether
        every item on the line is well-quoted, 2) the number of items in which
        the quote character is used, and 3) whether the escape character is used
        to escape a quote in any item.
        """
        wq = [self._well_quoted(el, qchar) for el in self.items(delim, qchar)]
        return (all([b[0] for b in wq]), sum([b[1] for b in wq]), any([b[2] for b in wq]))


def diagnose_delim(linestream, possible_delimiters=None, possible_quotechars=None):
    """Diagnose the delimiter, quote character, and escape character of delimited text.

    Up to the first 100 lines are read from 'linestream'; empty lines among
    them are ignored.

    Arguments:
      linestream: An iterable (e.g., an open file) that yields lines of text.
        Lines are compared with unicode literals, so on Python 3 they must
        be 'str' rather than 'bytes'.
      possible_delimiters: A list of single characters that might be used to
        separate items on a line.  If not specified, the default consists of
        tab, comma, semicolon, and vertical bar.  If a space character is
        included, multiple space characters will be treated as a single
        delimiter--so it's best if there are no missing values on
        space-delimited lines, though that is not necessarily a fatal flaw
        unless there is a very high fraction of missing values.
      possible_quotechars: A list of single characters that might be used to
        quote items on a line.  If not specified, the default consists of
        double and single quotes.

    Returns a tuple consisting of the delimiter, the quote character, and the
    escape character for quote characters within items of a line.  All may be
    None.  If the escape character is not None, it is CsvLine.escchar (a
    backslash).  When several quote characters, or several equally weighted
    delimiters, fit equally well, the one listed first in the corresponding
    argument is chosen.

    Raises CsvDiagError if no non-empty lines could be read.
    """
    if not possible_delimiters:
        possible_delimiters = [u"\t", u",", u";", u"|"]
    if not possible_quotechars:
        possible_quotechars = [u'"', u"'"]
    # Read the sample of lines.  The next() builtin works on both Python 2
    # and Python 3 iterators, unlike the Python 2-only '.next()' method.
    line_iter = iter(linestream)
    lines = []
    for _ in range(100):
        try:
            ln = next(line_iter)
        except StopIteration:
            break
        while len(ln) > 0 and ln[-1] in (u"\n", u"\r"):
            ln = ln[:-1]
        if len(ln) > 0:
            lines.append(CsvLine(ln))
    if len(lines) == 0:
        raise CsvDiagError(u"CSV diagnosis error: no lines read")
    for ln in lines:
        for d in possible_delimiters:
            ln.count_delim(d)
    # For each delimiter, find the minimum number of delimiters found on any
    # line, and the number of lines with that minimum number.  Delimiters that
    # are missing from at least one line (minimum of zero) are left out; the
    # dictionary is built already filtered rather than deleting entries while
    # iterating over it, which fails on Python 3.
    delim_stats = {}
    for d in possible_delimiters:
        dcounts = [ln.delim_count(d) for ln in lines]
        min_count = min(dcounts)
        if min_count > 0:
            delim_stats[d] = (min_count, dcounts.count(min_count))

    def all_well_quoted(delim, qchar):
        # Returns a tuple of boolean, int, and escape character indicating:
        # 1) whether all lines are well-quoted, 2) the total number of items,
        # over all lines, in which the quote character is used, and 3) the
        # escape character if it is used on any line, otherwise None.
        wq = [l.well_quoted_line(delim, qchar) for l in lines]
        esc = CsvLine.escchar if any([b[2] for b in wq]) else None
        return (all([b[0] for b in wq]), sum([b[1] for b in wq]), esc)

    def eval_quotes(delim):
        # Returns a tuple of the form to be returned by 'diagnose_delim()'.
        # Candidates are kept in the order of 'possible_quotechars' so that
        # the choice among equally-used quote characters is deterministic.
        ok_quotes = []
        for q in possible_quotechars:
            well_quoted, use_count, esc = all_well_quoted(delim, q)
            if well_quoted:
                ok_quotes.append((q, use_count, esc))
        if len(ok_quotes) == 0:
            return (delim, None, None)  # No quotes, no escapechar
        # max() returns the first of several maximal items, so if multiple
        # quote characters have the same usage, the first one listed wins.
        best = max(ok_quotes, key=lambda cand: cand[1])
        if best[1] == 0:
            return (delim, None, None)
        return (delim, best[0], best[2])

    if len(delim_stats) == 0:
        # None of the delimiters were found.  Some other delimiter may apply,
        # or the input may contain a single value on each line.
        # Identify possible quote characters.
        return eval_quotes(None)
    # If one of several candidates is a space, prefer the non-space.
    if len(delim_stats) > 1 and u" " in delim_stats:
        del delim_stats[u" "]
    if len(delim_stats) == 1:
        return eval_quotes(list(delim_stats)[0])
    # Assign weights to the delimiters.  The weight is the square of the
    # minimum number of delimiters on a line times the number of lines with
    # that minimum number.
    candidates = []
    for d in possible_delimiters:
        if d in delim_stats and d not in candidates:
            candidates.append(d)
    delim_wts = {}
    for d in candidates:
        delim_wts[d] = delim_stats[d][0]**2 * delim_stats[d][1]
    # Evaluate quote usage for each delimiter, from most heavily weighted to
    # least (the sort is stable, so ties keep the order of
    # 'possible_delimiters').  Return the first good pair where the quote
    # character is used.
    delim_order = sorted(candidates, key=delim_wts.get, reverse=True)
    for d in delim_order:
        quote_check = eval_quotes(d)
        if quote_check[0] and quote_check[1]:
            return quote_check
    # There are no delimiters for which quotes are OK.
    return (delim_order[0], None, None)