import re


class CsvDiagError(Exception):
    """Raised when the delimiter/quote diagnosis cannot be carried out.

    The message stays available as ``value`` and is also handed to
    ``Exception`` so that ``args`` and ``repr()`` are populated.
    """

    def __init__(self, msg):
        super(CsvDiagError, self).__init__(msg)
        self.value = msg

    def __str__(self):
        return self.value


class CsvLine(object):
    """One line of delimited text plus the diagnostics gathered for it."""

    # Character that may precede a quote character inside a quoted item.
    escchar = u"\\"

    # A run of one or more spaces counts as a single space delimiter.
    _SPACE_RUN = re.compile(r' +')

    def __init__(self, line_text):
        self.text = line_text
        self.delim_counts = {}      # Maps delimiter -> count on this line.
        self.item_errors = []       # A list of error messages.

    def __str__(self):
        counts = u", ".join(
            u"%s: %d" % (k, v) for k, v in self.delim_counts.items()
        )
        return u"; ".join([
            u"Text: <<%s>>" % self.text,
            u"Delimiter counts: <<%s>>" % counts,
        ])

    def count_delim(self, delim):
        """Count occurrences of ``delim`` on the line and remember the result.

        If the delimiter is a space, multiple spaces are equivalent to a
        single delimiter: the line is split on runs of spaces and the count
        is one fewer than the number of pieces.
        """
        if delim == u" ":
            pieces = self._SPACE_RUN.split(self.text)
            self.delim_counts[delim] = max(0, len(pieces) - 1)
        else:
            self.delim_counts[delim] = self.text.count(delim)

    def delim_count(self, delim):
        """Return the count stored by ``count_delim(delim)``.

        Raises KeyError if ``count_delim`` was not called for ``delim``.
        """
        return self.delim_counts[delim]

    def _well_quoted(self, element, qchar):
        """Check how ``qchar`` is used in a single item.

        A well-quoted element has either no quotes, a quote on each end and
        none in the middle, or quotes on both ends with every internal quote
        either doubled or escaped.

        Returns a tuple of three booleans: whether the element is
        well-quoted, whether the quote character is used at all, and
        whether the escape character is needed to explain internal quotes.
        """
        if len(element) == 0 or qchar not in element:
            return (True, False, False)
        # The element has quotes; it needs a distinct quote on each end.
        # A lone quote character is therefore not well-quoted (its first
        # and last characters are the same single character).
        if len(element) < 2 or element[0] != qchar or element[-1] != qchar:
            return (False, True, False)
        inner = element[1:-1]
        if qchar not in inner:
            return (True, True, False)
        # No quotes left after removing doubled quotes: well-quoted.
        if qchar not in inner.replace(qchar + qchar, u''):
            return (True, True, False)
        # No quotes left after removing escaped quotes: well-quoted.
        if qchar not in inner.replace(self.escchar + qchar, u''):
            return (True, True, True)
        return (False, True, False)

    def record_format_error(self, pos_no, errmsg):
        """Append a message about the 1-based character position ``pos_no``."""
        self.item_errors.append(u"%s in position %d." % (errmsg, pos_no))

    def items(self, delim, qchar):
        """Split the line into a list of items.

        The line is broken at delimiters that are not within quoted
        stretches.  (This is almost a CSV parser, for valid delim and qchar,
        except that it does not eliminate quote characters or reduce
        escaped quotes.)

        Always returns a list.  With ``qchar`` None the line is simply
        split on ``delim`` (or returned whole, as a one-item list, when
        ``delim`` is also None).  A line that ends with a delimiter yields
        a trailing empty item, matching what ``str.split`` produces.
        ``item_errors`` is reset on every call.
        """
        self.item_errors = []
        if qchar is None:
            if delim is None:
                return [self.text]
            if delim == u" ":
                return self._SPACE_RUN.split(self.text)
            return self.text.split(delim)

        # States of the FSM:
        #   IN_QUOTED: An opening quote has been seen, but no closing quote.
        #       esc_char:  hold char as pending / ESCAPED
        #       quote:     hold char as pending / QUOTE_IN_QUOTED
        #       other (including delimiter): add char to item / IN_QUOTED
        #   ESCAPED: An escape character was seen while IN_QUOTED (pending).
        #       delimiter: add pending to item, save item / BETWEEN
        #       other (including quote): add pending and char / IN_QUOTED
        #       NOTE(review): an escape followed by a delimiter ends the
        #       item even though the quote is still open; kept as designed.
        #   QUOTE_IN_QUOTED: A quote was seen while IN_QUOTED (pending).
        #       quote:     add pending and char to item / IN_QUOTED
        #       delimiter: add pending to item, save item / DELIMITED
        #       other:     add pending and char to item / IN_QUOTED
        #                  (bad format: a quote not followed by another
        #                  quote or a delimiter; an error is recorded.)
        #   IN_UNQUOTED: A non-delimiter, non-quote has been seen.
        #       delimiter: save item / DELIMITED
        #       other (including quote): add char to item / IN_UNQUOTED
        #   BETWEEN: Not in an item and no delimiter seen; the start state.
        #       quote:     add char to item / IN_QUOTED
        #       delimiter: save the (empty) item / DELIMITED
        #       other:     add char to item / IN_UNQUOTED
        #   DELIMITED: A delimiter has been seen while not in a quoted item.
        #       quote:     add char to item / IN_QUOTED
        #       delimiter: save the empty item unless runs of the delimiter
        #                  are being collapsed / DELIMITED
        #       other:     add char to item / IN_UNQUOTED
        # At end of line: add pending to the item and save it if non-empty;
        # if the line ended on a delimiter, save a trailing empty item.
        (IN_QUOTED, ESCAPED, QUOTE_IN_QUOTED,
         IN_UNQUOTED, BETWEEN, DELIMITED) = range(6)

        eat_multiple_delims = delim == u" "
        elements = []   # The list of items that will be returned.
        buf = []        # Characters of the item currently being built.
        pending = u''   # Escape char or quote awaiting the next character.

        def save_item():
            elements.append(u''.join(buf))
            del buf[:]

        state = BETWEEN
        for pos, c in enumerate(self.text, 1):
            if state == IN_QUOTED:
                if c == self.escchar:
                    pending = c
                    state = ESCAPED
                elif c == qchar:
                    pending = c
                    state = QUOTE_IN_QUOTED
                else:
                    buf.append(c)
            elif state == ESCAPED:
                buf.append(pending)
                pending = u''
                if c == delim:
                    save_item()
                    state = BETWEEN
                else:
                    buf.append(c)
                    state = IN_QUOTED
            elif state == QUOTE_IN_QUOTED:
                buf.append(pending)
                pending = u''
                if c == qchar:
                    buf.append(c)
                    state = IN_QUOTED
                elif c == delim:
                    save_item()
                    state = DELIMITED
                else:
                    buf.append(c)
                    self.record_format_error(
                        pos, "Unexpected character following a closing quote")
                    state = IN_QUOTED
            elif state == IN_UNQUOTED:
                if c == delim:
                    save_item()
                    state = DELIMITED
                else:
                    buf.append(c)
            else:
                # BETWEEN and DELIMITED differ only in how a delimiter
                # is handled when runs of delimiters are collapsed.
                if c == qchar:
                    buf.append(c)
                    state = IN_QUOTED
                elif c == delim:
                    if state == BETWEEN or not eat_multiple_delims:
                        save_item()
                    state = DELIMITED
                else:
                    buf.append(c)
                    state = IN_UNQUOTED

        # Process the end-of-line condition.
        if pending:
            buf.append(pending)
        if buf or state == DELIMITED:
            save_item()
        return elements

    def well_quoted_line(self, delim, qchar):
        """Summarize quote usage over all items of the line.

        Returns a tuple of boolean, int, and boolean, indicating:
        1) whether every item is well-quoted, 2) the number of items in
        which the quote character is used, and 3) whether the escape
        character is used.
        """
        results = [self._well_quoted(el, qchar)
                   for el in self.items(delim, qchar)]
        return (
            all(r[0] for r in results),
            sum(r[1] for r in results),
            any(r[2] for r in results),
        )


def diagnose_delim(linestream, possible_delimiters=None,
                   possible_quotechars=None, max_lines=100):
    """Guess the delimiter, quote character and escape character of a file.

    Returns a tuple consisting of the delimiter, quote character, and
    escape character for quote characters within items of a line.  All may
    be None.  If the escape character is not None, it is the backslash
    (``CsvLine.escchar``).

    Arguments:
      * linestream: An iterator or iterable of lines (for example an open
        file), or an object with a Python 2 style ``next()`` method.
        Lines may be text or byte strings; byte strings are decoded as
        UTF-8 with undecodable bytes replaced, which leaves ASCII
        delimiters and quotes intact.
      * possible_delimiters: A list of single characters that might be
        used to separate items on a line.  If not specified, the default
        consists of tab, comma, semicolon, and vertical rule.  If a space
        character is included, multiple space characters will be treated
        as a single delimiter--so it's best if there are no missing values
        on space-delimited lines.
      * possible_quotechars: A list of single characters that might be
        used to quote items on a line.  If not specified, the default
        consists of double and single quotes.
      * max_lines: The maximum number of lines (blank ones included) read
        from linestream.

    Ties between equally good candidates are resolved in favour of the one
    listed first in possible_delimiters / possible_quotechars.

    Raises CsvDiagError if no non-empty line could be read.
    """
    if not possible_delimiters:
        possible_delimiters = [u"\t", u",", u";", u"|"]
    if not possible_quotechars:
        possible_quotechars = [u'"', u"'"]

    # Prefer an explicit next() method (Python 2 iterators and file-like
    # objects); otherwise use the iterator protocol, which is what
    # Python 3 files, generators and plain lists support.
    read_next = getattr(linestream, "next", None)
    if read_next is None:
        line_iter = iter(linestream)

        def read_next():
            return next(line_iter)

    lines = []
    for _ in range(max_lines):
        try:
            ln = read_next()
        except StopIteration:
            break
        if isinstance(ln, bytes):
            ln = ln.decode("utf-8", "replace")
        ln = ln.rstrip(u"\r\n")
        if ln:
            lines.append(CsvLine(ln))
    if not lines:
        raise CsvDiagError(u"CSV diagnosis error: no lines read")

    # For each delimiter, find the minimum number of delimiters found on
    # any line, and the number of lines with that minimum number.
    # Delimiters that are missing from at least one line are discarded.
    delim_stats = {}
    for d in possible_delimiters:
        for ln in lines:
            ln.count_delim(d)
        dcounts = [ln.delim_count(d) for ln in lines]
        min_count = min(dcounts)
        if min_count > 0:
            delim_stats[d] = (min_count, dcounts.count(min_count))

    def all_well_quoted(delim, qchar):
        # Returns a tuple indicating: 1) whether every line is well-quoted,
        # 2) the total number of items in which the quote character is
        # used, and 3) the escape character if it is used, else None.
        results = [ln.well_quoted_line(delim, qchar) for ln in lines]
        escape_used = any(r[2] for r in results)
        return (
            all(r[0] for r in results),
            sum(r[1] for r in results),
            CsvLine.escchar if escape_used else None,
        )

    def eval_quotes(delim):
        # Returns a tuple of the form to be returned by 'diagnose_delim()'.
        # The winner is the most-used quote character that leaves every
        # line well-quoted; the strict comparison keeps the earliest
        # candidate when usage is tied.
        best = None
        for q in possible_quotechars:
            ok, uses, esc = all_well_quoted(delim, q)
            if ok and uses > 0 and (best is None or uses > best[0]):
                best = (uses, q, esc)
        if best is None:
            return (delim, None, None)      # No quotes, no escapechar
        return (delim, best[1], best[2])

    if not delim_stats:
        # None of the delimiters were found.  Some other delimiter may
        # apply, or the input may contain a single value on each line.
        # Identify possible quote characters.
        return eval_quotes(None)

    # If a space competes with another delimiter, prefer the non-space.
    if len(delim_stats) > 1 and u" " in delim_stats:
        del delim_stats[u" "]

    # Surviving delimiters, in the caller's order and without duplicates.
    candidates = []
    for d in possible_delimiters:
        if d in delim_stats and d not in candidates:
            candidates.append(d)
    if len(candidates) == 1:
        return eval_quotes(candidates[0])

    def weight(d):
        # The square of the minimum number of delimiters on a line times
        # the number of lines with that minimum.
        return delim_stats[d][0] ** 2 * delim_stats[d][1]

    # Evaluate quote usage for each delimiter, from most heavily weighted
    # to least (the sort is stable, so ties keep the caller's order).
    # Return the first pair where a quote character is actually used.
    delim_order = sorted(candidates, key=weight, reverse=True)
    for d in delim_order:
        quote_check = eval_quotes(d)
        if quote_check[1]:
            return quote_check
    # There are no delimiters for which quotes are used and OK.
    return (delim_order[0], None, None)
Sunday, April 12, 2015
An alternate Python CSV sniffer
I recently came across a case where the format sniffer in Python's csv module was not satisfactory. The problem is that the format sniffer always identifies a quote character, even when there is no quote character in the CSV file (e.g., in the case of a tab-delimited file). By default the format sniffer reports that a double-quote character is used. This is not a problem if you subsequently use the csv module itself to read the file. However, I wanted to use the csv module only if the file contained quote characters, and use a different method (the copy_from method of the psycopg module) if the file contained no quote characters. I could not make this distinction using the csv module's format sniffer, so I wrote the following one to use instead.
Labels:
CSV,
finite state machine,
FSM,
Python
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment