Saturday, December 5, 2020

Sequentially Writing Data to Zip Files with Python

The write() and writestr() methods of the ZipFile class in Python's zipfile library allow the addition of an entire file or a single string as a member file within a zip file.  However, if a large amount of data will be generated dynamically, these methods do not allow separate data items to be written sequentially to a member file within a zip file.

The StreamableZipfile class in the following code snippet provides this missing capability.

import time
import zipfile

class StreamableZipfile(object):
	def __init__(self, zipfile_name, mode='a'):
		# The ZIP_BZIP2 compression type requires Python 3.3 or later;
		# the compresslevel argument requires Python 3.7 or later.
		self.zf = zipfile.ZipFile(zipfile_name, mode,
				compression=zipfile.ZIP_BZIP2, compresslevel=9)
	def close(self):
		self.zf.close()
	def member_file(self, member_filename):
		# Creates a ZipInfo object (file) within the zipfile
		# and opens it for writing.
		self.current_zinfo = zipfile.ZipInfo(filename=member_filename,
						date_time=time.localtime(time.time())[:6])
		self.current_zinfo.compress_type = self.zf.compression
		self.current_zinfo._compresslevel = self.zf.compresslevel
		self.current_zinfo.file_size = 0
		self.current_handle = self.zf.open(self.current_zinfo, mode='w')
	def close_member(self):
		self.current_handle.close()
	def write(self, str_data):
		# Writes the given text to the currently open member.
		data = str_data.encode("utf-8")
		with self.zf._lock:
			self.current_zinfo.file_size = self.current_zinfo.file_size + len(data)
			self.current_handle.write(data)

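This simple implementation does not include any error checking and always compresses the data with the bzip2 algorithm, using the highest compression level.  A more robust and flexible implementation would eliminate these limitations.  Modifications are also needed for versions of Python prior to 3.7, which lack the compresslevel argument.  The class also relies on private attributes of the zipfile library ('_compresslevel' and '_lock'), which may change between Python versions.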
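Since Python 3.6, 'ZipFile.open()' accepts mode='w' and returns a writable file object, so the same sequential-writing pattern can be sketched using only the library's public interface. The following is a minimal sketch under that assumption, not a replacement for the class above; the helper name 'write_member_lines', the in-memory archive, and the choice of ZIP_DEFLATED are all illustrative.

```python
import io
import zipfile

def write_member_lines(zf, member_name, lines, encoding="utf-8"):
    """Write text lines one at a time to a single member of an open ZipFile."""
    # ZipFile.open(name, mode="w") returns a writable binary file object
    # (Python 3.6+); sizes and CRC are recorded when the member is closed.
    with zf.open(member_name, mode="w") as member:
        for line in lines:
            member.write(line.encode(encoding))

# Hypothetical usage: an in-memory archive stands in for a file on disk.
archive = io.BytesIO()
with zipfile.ZipFile(archive, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
    write_member_lines(zf, "file1.txt",
                       ("This is file 1, line %d\n" % n for n in range(1, 4)))
    write_member_lines(zf, "file2.txt",
                       ("This is file 2, line %d\n" % n for n in range(1, 4)))
```

Only one member can be open for writing at a time, so each member is closed (by the 'with' block) before the next one is started.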

Use of the StreamableZipfile class is illustrated by the following code, which creates a zip file containing two separate files, where lines are written sequentially to each of the files.

import os

zfname = "Test.zip"
if os.path.isfile(zfname):
	os.remove(zfname)

# Open the streamable zip file and write several lines to a file within it.
zfile = StreamableZipfile(zfname)
zfile.member_file('file1.txt')
zfile.write("This is file 1, line 1\n")
zfile.write("This is file 1, line 2\n")
zfile.write("This is file 1, line 3\n")
zfile.close_member()
zfile.close()


# Open the same zip file and write lines to another file within it.
zfile = StreamableZipfile(zfname)
zfile.member_file('file2.txt')
zfile.write("This is file 2, line 1\n")
zfile.write("This is file 2, line 2\n")
zfile.write("This is file 2, line 3\n")
zfile.close_member()
zfile.close()

Saturday, September 12, 2020

The Three Rules of Budgeting

1. If it's a task that you know how to do, you're going to under-budget it.

2. If it's a task that you don't know how to do, you're going to under-budget it by a lot.

3. The client always follows Rule 2.

Monday, June 15, 2020

Producer-Consumer and Other Algorithms for Import of a CSV File to a Database, in Python

This post presents a comparison of the performance of several different algorithms for import of data from a CSV file into a database. Timings are presented for six different algorithms, run on both PostgreSQL (10.12) and MariaDB (10.3.23).

Import of data into a database is a process that is amenable to application of a producer-consumer algorithm because the steps of reading data from the CSV file and writing data to the database may run at different speeds. Using separate threads to read from the CSV file and write to the database can therefore potentially improve performance over reading and inserting rows one by one, by not requiring the faster process to wait for the slower. In addition, because the Python database API allows multiple rows to be inserted into a database in a single function call (i.e., using the 'executemany()' function), this capability provides an additional opportunity for performance improvement over row-by-row insertion. The algorithms tested here evaluate the effect of producer-consumer algorithms and multi-row insertions, providing a comparison to the simple row-by-row insertion method. The comparative performance of Postgres' COPY command is evaluated also.
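The difference between row-by-row and multi-row insertion can be sketched with any DB-API driver. The example below uses the standard library's sqlite3 module purely as a stand-in (it is not one of the DBMSs tested in this post), with a made-up table named 'copy_test_demo' and made-up rows. Note that sqlite3 uses '?' rather than '%s' as its parameter marker.

```python
import sqlite3

rows = [(1, "alpha"), (2, None), (3, "gamma")]   # made-up sample data

conn = sqlite3.connect(":memory:")
curs = conn.cursor()
curs.execute("create table copy_test_demo (id integer, label text);")
ins_sql = "insert into copy_test_demo (id, label) values (?, ?);"

# Row-by-row insertion: one execute() call per row.
for row in rows:
    curs.execute(ins_sql, row)

# Multi-row insertion: a single executemany() call for the whole list.
curs.executemany(ins_sql, rows)
conn.commit()

row_count = curs.execute("select count(*) from copy_test_demo;").fetchone()[0]
null_count = curs.execute(
    "select count(*) from copy_test_demo where label is null;").fetchone()[0]
```

Whether the single 'executemany()' call is actually faster depends on how the driver implements it, which is one of the things the tests below measure.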

The Algorithms 

The following algorithms are evaluated:
  1. Postgres' COPY command. 
  2. Row-by-row reading and writing. 
  3. Buffered reading and writing in a single process. 
  4. A producer-consumer algorithm using a single buffer. 
  5. A producer-consumer algorithm using two buffers and single-row reading. 
  6. A producer-consumer algorithm using two buffers and multi-row reading. 

The row-by-row reading/writing algorithm is the simplest possible method to move the data and requires the least coding. All other algorithms (except Postgres' COPY command) are expected to produce performance that is at least as good as this method.

Buffered reading and writing in a single process reads and writes rows into and from a buffer of specified size. Writing of an entire buffer allows the 'executemany()' function of Python's DB-API to be used, for a possible performance improvement over simple row-by-row reading and writing. Performance of this algorithm is expected to be dependent also on the buffer size used.
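Filling a fixed-size buffer from an iterator can also be expressed as a small generator. This is a sketch of the idea only, not the implementation that was timed; the name 'read_batches' and the sample CSV content are hypothetical.

```python
import csv
import io
from itertools import islice

def read_batches(rows, batch_size):
    """Yield successive lists of at most batch_size rows from an iterator."""
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            return
        yield batch

# Made-up CSV content with a header row and five data rows.
sample = io.StringIO("id,name\n1,a\n2,b\n3,c\n4,d\n5,e\n")
rdr = csv.reader(sample)
headers = next(rdr)
batch_sizes = [len(batch) for batch in read_batches(rdr, 2)]
```

Each batch produced this way could be passed directly to 'executemany()'; the last batch is simply shorter when the number of rows is not a multiple of the buffer size.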

The producer-consumer algorithm using a single buffer uses two threads, one of which reads rows from the CSV file and places them into the buffer one by one, and the other of which removes rows from the buffer one by one and inserts them into the database. This is the classic producer-consumer model, of which many examples can be found online.

The first of the double-buffered producer-consumer algorithms uses two buffers. One of the two threads reads rows from the CSV file and places them in a buffer one by one. The other thread writes an entire buffer to the database using the 'executemany()' function. The 'writer' thread controls the use of buffers: whenever it is ready, it locks the buffer used by the 'reader' thread, creates a new empty buffer for the 'reader' thread, releases the lock, and then writes the entire buffer that has just been partially or fully filled by the 'reader' thread.

The second of the double-buffered producer-consumer algorithms is similar to the previous one, but it locks and entirely fills the input buffer, rather than filling it row-by-row.

Algorithm Implementation

Python's csv library was used to read the CSV file, for all tests but the first (i.e., the test using Postgres' COPY command). Because the csv library yields empty strings for null inputs, each row read is cleaned to replace empty strings with None before the row is written or placed in the buffer.

Production versions of the producer-consumer algorithms illustrated here should include exception handling in the 'reader' and 'writer' threads. Exception-handling code is omitted from these implementations for brevity and clarity.

Support Classes and Functions

The functions that are used to implement and test the various algorithms have a uniform interface: all of them take arguments identifying the CSV file, the database, and the buffer size to use. (The Postgres COPY command does not use a buffer size specification; although this is configurable, the default is used for these tests.) To accommodate differences in DBMSs, such as the default 'paramstyle' used, and to simplify the dynamic creation of the INSERT statement that is used, custom classes were used to represent the CSV file and database objects.

The CsvFile class automatically opens a file and creates a CSV reader, reads the first row containing column headers, and stores those headers so that they can be used to construct the INSERT statement.

import csv
import sys

class CsvFile(object):
    def __init__(self, filename):
        self.fn = filename
        self.f = None
        self.open()
        self.rdr = csv.reader(self.f)
        # The first row holds the column headers.
        self.headers = next(self.rdr)
    def open(self):
        if self.f is None:
            if sys.version_info < (3,):
                self.f = open(self.fn, "rb")
            else:
                # In Python 3 the csv module expects files opened with newline="".
                self.f = open(self.fn, "r", newline="")
    def reader(self):
        return self.rdr
    def close(self):
        self.rdr = None
        self.f.close()
        self.f = None

The Database class and subclasses provide a database connection for each type of DBMS, and a method to construct an INSERT statement for a given CsvFile object, using that DBMS's parameter substitution string.  The conn_info argument is a dictionary containing the server (host) name, database name, user name, and password, under the keys 'server', 'db', 'user', and 'pw'.

class Database(object):
    def __init__(self, conn_info):
        self.paramstr = '%s'
        self.conn = None
    def insert_sql(self, tablename, csvfile):
        return "insert into %s (%s) values (%s);" % (
                tablename,
                ",".join(csvfile.headers),
                ",".join([self.paramstr] * len(csvfile.headers))
                )

class PgDb(Database):
    def __init__(self, conn_info):
        self.db_type = 'p'
        import psycopg2
        self.paramstr = "%s"
        connstr = "host=%(server)s dbname=%(db)s user=%(user)s password=%(pw)s" % conn_info
        self.conn = psycopg2.connect(connstr)

class MariaDb(Database):
    def __init__(self, conn_info):
        self.db_type = 'm'
        import pymysql
        self.paramstr = "%s"
        self.conn = pymysql.connect(host=conn_info["server"], database=conn_info["db"], port=3306, user=conn_info["user"], password=conn_info["pw"])
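To make the output of 'insert_sql()' concrete, the following standalone sketch applies the same string formatting to a hypothetical list of column headers. The function name 'build_insert_sql' and the header names are assumptions for illustration; the '?' marker is shown only to indicate what a driver with a different paramstyle (such as sqlite3) would need.

```python
def build_insert_sql(tablename, headers, paramstr="%s"):
    """Build a parameterized INSERT statement for the given column names."""
    return "insert into %s (%s) values (%s);" % (
        tablename,
        ",".join(headers),
        ",".join([paramstr] * len(headers)),
    )

# Hypothetical column headers, as would be read from the first CSV row.
headers = ["id", "name", "created"]
pg_sql = build_insert_sql("copy_test", headers)           # '%s' marker
qmark_sql = build_insert_sql("copy_test", headers, "?")   # '?' marker
print(pg_sql)   # insert into copy_test (id,name,created) values (%s,%s,%s);
```

The table and column names are interpolated directly into the SQL text, so this approach is only appropriate when the CSV headers come from a trusted source.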

CSV Import Functions

All of the following import functions take a CsvFile object and a Database object as their first and second arguments, respectively. Functions that buffer input or output take the buffer size, in number of rows, as their third argument. The 'clean_line()' function used in the import functions converts empty strings to nulls (None in Python).
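The 'clean_line()' function itself is not shown in this post. A minimal version consistent with the description above might look like the following; this is an assumption about its behavior, not the code that was actually used in the tests.

```python
def clean_line(line):
    """Return a copy of a CSV row with empty strings replaced by None."""
    return [None if value == "" else value for value in line]

# Made-up row, as the csv reader would return it.
cleaned = clean_line(["42", "", "2020-06-15", ""])
# cleaned is ['42', None, '2020-06-15', None]
```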

1. Postgres' COPY command

This implementation takes a CsvFile object as its first argument for consistency with the interface of other functions, but it only uses that object to obtain the underlying filename, and then opens that file directly for use with the 'copy_expert()' function of the psycopg2 library.

    def postgres_copy(csvfile, db):
        curs = db.conn.cursor()
        with open(csvfile.fn, "rt") as rf:
            # Read and discard headers
            rf.readline()
            copy_cmd = "copy copy_test from stdin with (format csv)"
            curs.copy_expert(copy_cmd, rf)
        # Commit, as the other import functions do.
        db.conn.commit()

2. Row-by-row reading and writing

This algorithm simply iterates over the rows of the CSV file, reading and writing them one by one.

    def simple_copy(csvfile, db):
        ins_sql = db.insert_sql('copy_test', csvfile)
        curs = db.conn.cursor()
        rdr = csvfile.reader()
        for line in rdr:
            curs.execute(ins_sql, clean_line(line))
        db.conn.commit()

3. Buffered reading and writing in a single process

This algorithm successively fills a buffer with a specified number of rows, and then writes all the rows in a single step using the 'executemany()' method.

    def buffer1_copy(csvfile, db, buflines):
        ins_sql = db.insert_sql('copy_test', csvfile)
        curs = db.conn.cursor()
        rdr = csvfile.reader()
        eof = False
        while True:
            b = []
            for j in range(buflines):
                try:
                    line = next(rdr)
                except StopIteration:
                    eof = True
                    break
                else:
                    b.append(clean_line(line))
            if len(b) > 0:
                curs.executemany(ins_sql, b)
            if eof:
                break
        db.conn.commit()

4. A producer-consumer algorithm using a single buffer

This is a classic producer-consumer algorithm, using the Queue class from Python's standard library for simplicity.

    def queue_copy(csvfile, db, q_size):
        ins_sql = db.insert_sql('copy_test', csvfile)
        curs = db.conn.cursor()
        rdr = csvfile.reader()
        buffer = queue.Queue(maxsize=q_size)
        # Sentinel placed on the queue after the last CSV line.  Testing an
        # Event and buffer.empty() instead can leave the writer blocked in
        # get() if the reader finishes between the test and the get() call.
        done = object()
        def write_to_db():
            while True:
                line = buffer.get()
                if line is done:
                    break
                curs.execute(ins_sql, line)
        def get_from_csv():
            for line in rdr:
                buffer.put(clean_line(line))
            buffer.put(done)
        writer = threading.Thread(target=write_to_db)
        reader = threading.Thread(target=get_from_csv)
        writer.start()
        reader.start()
        reader.join()
        writer.join()
        db.conn.commit()

5. A producer-consumer algorithm using two buffers and single-row reading

The Queue class cannot be used to implement the double-buffer algorithm, so the Condition class is used instead to coordinate locking of the buffer into which rows are read. The reader thread (function 'get_from_csv()') locks the buffer only long enough to append a single line. The writer thread (function 'write_to_db()') can lock the buffer at any time when it is not empty, so the number of rows written to the database at once may be anywhere between 1 and the maximum size of the buffer.

    def double_buffer_copy(csvfile, db, buf_size):
        ins_sql = db.insert_sql('copy_test', csvfile)
        curs = db.conn.cursor()
        rdr = csvfile.reader()
        buf_lock = threading.Condition()
        read_all = threading.Event()
        # Define an object with a mutable list as a buffer.
        class BufObj(object):
            def __init__(self, buffer):
                self.buffer = buffer
        b = BufObj([])
        def write_to_db():
            while not read_all.is_set() or len(b.buffer) > 0:
                buf_lock.acquire()
                b2 = b.buffer
                b.buffer = []
                buf_lock.notify()
                buf_lock.release()
                curs.executemany(ins_sql, b2)
        def get_from_csv():
            for line in rdr:
                line = clean_line(line)
                buf_lock.acquire()
                while len(b.buffer) == buf_size:
                    buf_lock.wait()
                b.buffer.append(line)
                buf_lock.release()
            read_all.set()
        writer = threading.Thread(target=write_to_db)
        reader = threading.Thread(target=get_from_csv)
        writer.start()
        reader.start()
        read_all.wait()
        reader.join()
        writer.join()
        db.conn.commit()

6. A producer-consumer algorithm using two buffers and multi-row reading

This algorithm is similar to the previous one, except that the reader thread locks the buffer until the buffer is filled or there are no more lines in the CSV file.

    def double_buffer_bulk_copy(csvfile, db, buf_size):
        ins_sql = db.insert_sql('copy_test', csvfile)
        curs = db.conn.cursor()
        rdr = csvfile.reader()
        buf_lock = threading.Condition()
        read_all = threading.Event()
        class BufObj(object):
            def __init__(self, buffer):
                self.buffer = buffer
        b = BufObj([])
        def write_to_db():
            while not read_all.is_set() or len(b.buffer) > 0:
                buf_lock.acquire()
                b2 = b.buffer
                b.buffer = []
                buf_lock.notify()
                buf_lock.release()
                curs.executemany(ins_sql, b2)
        def get_from_csv():
            while not read_all.is_set():
                buf_lock.acquire()
                while len(b.buffer) > 0:
                    buf_lock.wait()
                for j in range(buf_size):
                    try:
                        line = next(rdr)
                    except StopIteration:
                        read_all.set()
                        break
                    else:
                        b.buffer.append(clean_line(line))
                buf_lock.release()
        writer = threading.Thread(target=write_to_db)
        reader = threading.Thread(target=get_from_csv)
        writer.start()
        reader.start()
        read_all.wait()
        reader.join()
        writer.join()
        db.conn.commit()

Testing

Tests were conducted using CSV files containing 1,000, 10,000, and 50,000 rows. The target table (and the CSV files) contain columns with character, varchar, date, datetime, time, float, double, boolean, and numeric data types. Some values in the CSV file were null. Text was minimally quoted, and some text values contained embedded double quotes. The average line length was approximately 140 characters. Disk buffers, the memory cache, and the swap file were all cleared before each test. The target database table was dropped and re-created before each test. Both databases used for testing were running on the local machine, to eliminate effects of network transmission time.  Each test was run five times; the average time is reported here.
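The repeat-and-average procedure described above can be sketched as a small timing helper. This is a hypothetical harness, not the one used to produce the results below; the name 'average_time' and its 'setup' argument are assumptions, and clearing of disk buffers and caches (which happens outside Python) is not shown.

```python
import time

def average_time(func, runs=5, setup=None):
    """Return the mean wall-clock time, in seconds, of func() over several runs."""
    elapsed = []
    for _ in range(runs):
        if setup is not None:
            setup()          # e.g. drop and re-create the target table
        start = time.perf_counter()
        func()
        elapsed.append(time.perf_counter() - start)
    return sum(elapsed) / len(elapsed)

# Hypothetical usage with stand-in callables in place of a real import.
calls = []
mean_seconds = average_time(lambda: calls.append("run"),
                            runs=5,
                            setup=lambda: calls.append("setup"))
```

The setup step is deliberately excluded from the timed interval, so that only the import itself is measured.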

Buffer Size

Performance of the methods that use buffers for reading or writing (algorithms 3-6) can be expected to depend on the size of the buffer used. The optimum buffer size may further depend on block or buffer sizes used by hard disks, the operating system, and Python itself. The effect of buffer size on performance was evaluated for two algorithms:
  • 3. Buffered reading and writing in a single process.
  • 5. A producer-consumer algorithm using two buffers and single-row reading. 

The following figure shows the times to import a CSV file with 50,000 rows into Postgres for these two algorithms. Both algorithms reach their maximum speed (minimum time) with a buffer size of 1,500 rows.


The following figure shows the times required by algorithm 3 (buffered reading and writing in a single process) to import data files of different sizes with buffers of different sizes into Postgres.

For this algorithm, minimum import times are achieved at buffer sizes of 1,000 to 1,500 rows.  The size of the buffer becomes less important as the file size decreases.

Performance

Performance tests were carried out using buffer sizes of both 500 and 1,500 rows on both DBMSs. The times required to load a CSV file of 50,000 rows are shown in the following table.

Average time over 5 runs (seconds)

Algorithm                                                Buffer size (rows)  Postgres  MariaDB
1. Postgres copy                                                          -      0.41        -
2. Simple row-by-row copy                                                 -     11.95    18.86
3. Buffered read/write in one thread                                    500     10.34     8.41
4. Producer-consumer, one buffer                                        500     13.38    19.87
5. Producer-consumer, two buffers, single-row reading                   500     10.57     8.78
6. Producer-consumer, two buffers, multi-row reading                    500     10.57     9.83
3. Buffered read/write in one thread                                  1,500     10.17     7.66
4. Producer-consumer, one buffer                                      1,500     13.26    21.25
5. Producer-consumer, two buffers, single-row reading                 1,500     10.27     7.89
6. Producer-consumer, two buffers, multi-row reading                  1,500     10.33     7.29

Discussion

Although the reading and writing processes of CSV import would seem to be a suitable application for a producer-consumer algorithm, the producer-consumer algorithm using a single queue (algorithm 4) is the slowest of all of the methods tested, even slower than the row-by-row copying method.  The other producer-consumer algorithms perform better, but buffered reading and writing in a single process (algorithm 3) was generally the fastest method for both DBMSs.  Compared to the simple row-by-row copying method, buffered reading and writing reduced import time by roughly 13 to 15% for Postgres and 55 to 59% for MariaDB in these tests.  (In production environments, where network transmittal time is also a factor, the fractional reductions are likely to be much smaller.)

The one case in which buffered reading and writing (algorithm 3) is not the fastest is when using a buffer size of 1,500 rows with MariaDB; in this case the producer-consumer algorithm using two buffers and multi-row reading (algorithm 6) was slightly faster.  Additional testing not shown here indicates that using larger buffer sizes with this algorithm does not result in further increases in performance.

The relatively poor performance of the producer-consumer algorithm using a single buffer (queue) is most likely due to the large disparity in speed between the reading and writing process.  Reading from the CSV file is far faster than writing to the database (tests not shown).  As a result, the single buffer is quickly filled by the reader, and thereafter the writing and reading threads alternate, each handling a single row at a time.  Thus this process reduces to a set of operations similar to row-by-row copying, with additional overhead for managing the multiple threads.

These results show that the psycopg2 and pymysql libraries differ in the improvements to be gained by use of the 'executemany()' function.  With psycopg2, 'executemany()' provides little performance improvement relative to multiple calls to 'execute()'.  With pymysql, however, 'executemany()' provides a notable performance improvement relative to multiple calls to 'execute()'.  This can be seen in the contrasting performance improvements between algorithms 2 and 3, where 'execute()' is used in the former, and 'executemany()' is used in the latter.



Sunday, March 1, 2020

Automated Comparison of Data in Base and Staging Tables

When new data have been received and prepared for merging into existing database tables, a review of the changes to be made is often warranted prior to merging the data.  A set of execsql.py scripts to compare data in base and staging tables is available at https://osdn.net/projects/execsql-compare/.  These scripts use the information_schema views of the database to identify primary key and attribute columns, and to generate the SQL to perform different types of comparisons.  These scripts will therefore work without any customization on any table of any database for any of the supported DBMSs.  Currently supported DBMSs are PostgreSQL, MariaDB/MySQL, and SQL Server.

More extensive documentation, including illustrations of the type of output that can be produced, is available at http://execsql-compare.osdn.io/.

Friday, January 17, 2020

Code Repositories Migrated to OSDN

Because Bitbucket will be dropping support for Mercurial, and because I prefer Mercurial to Git, I am migrating all of my open-source repositories to OSDN.  Documentation that is currently hosted on ReadTheDocs (RTD) will also be hosted on OSDN because RTD cannot clone documentation sources from OSDN.

Projects now hosted on OSDN are:
Documentation can be opened from the project pages or directly using URLs of the form http://<project_name>.osdn.io -- e.g., http://execsql.osdn.io/.

Documentation currently on RTD, and repositories on Bitbucket, will be removed sometime prior to the end of May 2020.