Saturday, November 19, 2016

A Year and Quarter Data Type for PostgreSQL

For some applications, time periods are needed that represent quarters of the year. Although quarters could be represented by using an exact date data type and constraining the month and day values (i.e., to the first day of each quarter) or by using date ranges in PostgreSQL, these representations do not allow easy computation of the difference between two dates (in quarters) or the addition of a specific number of quarters to a date to produce a new date.

The code below shows the implementation of a custom composite data type in Postgres that represents quarters of a year. This data type is named "quarter". The implementation supports adding a number of quarters to a "quarter" date to produce a new "quarter" date, and calculating the difference between two "quarter" dates. Comparison operators are also defined, both for use in expressions with "quarter" dates and to allow indexing of data tables by "quarter" values.

The Postgres code to define this data type, and arithmetic and comparison operators, is shown below. Details on the creation of a custom data type and operators can be found in the Postgres documentation.

The definition of the "quarter" data type is simply:

create type quarter as (
 year integer,
 quarter integer
 );

To support addition and subtraction of an integer number of quarters to (or from) a "quarter" data type, a function must be defined to carry out this calculation. The arguments to the function are a "quarter" data type and the number of quarters; the second argument may be either positive or negative.

create or replace function qtr_add(
 qtr quarter,
 addqtr integer)
 returns quarter as
$BODY$
DECLARE
 qtr2 quarter;
BEGIN
 -- Adjust the year
 if addqtr >= 0 then
  qtr2.year := qtr.year + div((qtr.quarter + addqtr-1), 4);
 else
  qtr2.year := qtr.year - div(((4 - qtr.quarter) - addqtr), 4);
 end if;
 -- Adjust the quarter
 qtr2.quarter := mod(qtr.quarter + addqtr - 1, 4) +1;
 if qtr2.quarter < 1 then 
  qtr2.quarter := qtr2.quarter + 4;
 end if;
 --
 return qtr2;
END;
$BODY$
 language plpgsql
 immutable leakproof strict;
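The year carry in qtr_add() is easy to get wrong at the boundaries, so the Python sketch below (illustrative only, not part of the PostgreSQL implementation) transcribes the function line by line, emulating PostgreSQL's div() and mod(), which truncate toward zero, and compares it with an alternative formulation that converts a quarter into a single running count. The helper names pg_div, pg_mod, qtr_add_plpgsql and qtr_add_index are hypothetical.

```python
def pg_div(a, b):
    """Integer quotient truncated toward zero, like PostgreSQL div()."""
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b > 0) else -q


def pg_mod(a, b):
    """Remainder with the sign of the dividend, like PostgreSQL mod()."""
    return a - b * pg_div(a, b)


def qtr_add_plpgsql(year, quarter, addqtr):
    """Line-by-line transcription of the PL/pgSQL qtr_add() function."""
    if addqtr >= 0:
        new_year = year + pg_div(quarter + addqtr - 1, 4)
    else:
        new_year = year - pg_div((4 - quarter) - addqtr, 4)
    new_q = pg_mod(quarter + addqtr - 1, 4) + 1
    if new_q < 1:
        new_q += 4
    return new_year, new_q


def qtr_add_index(year, quarter, addqtr):
    """Alternative: convert to one running quarter count, add, convert back."""
    # Python's divmod() floors toward negative infinity, so the remainder
    # is always 0..3 and negative offsets need no special branch.
    new_year, q0 = divmod(year * 4 + (quarter - 1) + addqtr, 4)
    return new_year, q0 + 1
```

For example, both functions map (2016, 1) plus -5 quarters to (2014, 4). The single-count form avoids the separate positive and negative branches because floor division already handles the borrow from the year.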


Subtracting two "quarter" dates can be carried out in a simple expression, but a function is defined for this calculation because it is used both by the subtraction operator and by the comparison function.

create or replace function qtr_diff(
 qtr1 quarter,
 qtr2 quarter)
 returns integer as
$BODY$
BEGIN
 return 4 * (qtr1.year - qtr2.year) + (qtr1.quarter - qtr2.quarter);
END;
$BODY$
 language plpgsql
 immutable leakproof strict;
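A quick way to convince oneself that the difference and the addition agree is to check that subtracting undoes adding. The Python sketch below restates qtr_diff() on (year, quarter) pairs; the qtr_add helper here is an illustrative single-count version written for this check, not the author's PL/pgSQL function.

```python
def qtr_diff(y1, q1, y2, q2):
    """Quarters from (y2, q2) to (y1, q1), mirroring the SQL qtr_diff()."""
    return 4 * (y1 - y2) + (q1 - q2)


def qtr_add(year, quarter, addqtr):
    """Illustrative helper: add quarters via a single running count."""
    new_year, q0 = divmod(year * 4 + (quarter - 1) + addqtr, 4)
    return new_year, q0 + 1


# Example: one quarter separates 2016 Q4 and 2017 Q1.
print(qtr_diff(2017, 1, 2016, 4))  # 1
```

For any starting quarter and offset n, qtr_diff applied to qtr_add(year, quarter, n) and the starting quarter returns n.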

To support comparison operators (<, <=, =, <>, >=, and >), a function is defined to compare two "quarter" dates. This function returns -1, 0, or 1 depending on whether its first argument is less than, equal to, or greater than its second argument.

create or replace function quarter_comp(
 qtr1 quarter,
 qtr2 quarter)
 returns integer as
$BODY$
DECLARE
 diff integer;
BEGIN
 diff := qtr_diff(qtr1, qtr2);
 if diff = 0 then
  return 0;
 elsif diff < 0 then
  return -1;
 end if;
 return 1;
END;
$BODY$
 language plpgsql
 immutable leakproof strict;
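A btree support function only has to follow the usual three-way contract: negative, zero, or positive according to the order of its arguments. The Python sketch below applies the same rule to (year, quarter) tuples and uses functools.cmp_to_key to sort with it; the sample values are illustrative. While quarter stays within 1 through 4, the resulting order is the same as plain lexicographic tuple order.

```python
from functools import cmp_to_key


def quarter_comp(a, b):
    """Three-way comparison of (year, quarter) tuples: -1, 0, or 1."""
    diff = 4 * (a[0] - b[0]) + (a[1] - b[1])
    # (diff > 0) - (diff < 0) is the sign of diff as an int.
    return (diff > 0) - (diff < 0)


quarters = [(2016, 3), (2015, 4), (2016, 1), (2015, 4)]
ordered = sorted(quarters, key=cmp_to_key(quarter_comp))
print(ordered)  # [(2015, 4), (2015, 4), (2016, 1), (2016, 3)]
```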


Each comparison operation is carried out by an individual function that uses the 'quarter_comp()' function.

-- --------------------------------------------------------------------------
-- qtr_lt()
-- --------------------------------------------------------------------------
create or replace function qtr_lt(
 qtr1 quarter,
 qtr2 quarter)
 returns boolean as
$BODY$
BEGIN
 return quarter_comp(qtr1, qtr2) < 0;
END;
$BODY$
 language plpgsql
 immutable leakproof strict;


-- --------------------------------------------------------------------------
-- qtr_lte()
-- --------------------------------------------------------------------------
create or replace function qtr_lte(
 qtr1 quarter,
 qtr2 quarter)
 returns boolean as
$BODY$
BEGIN
 return quarter_comp(qtr1, qtr2) <= 0;
END;
$BODY$
 language plpgsql
 immutable leakproof strict;


-- --------------------------------------------------------------------------
-- qtr_eq()
-- --------------------------------------------------------------------------
create or replace function qtr_eq(
 qtr1 quarter,
 qtr2 quarter)
 returns boolean as
$BODY$
BEGIN
 return quarter_comp(qtr1, qtr2) = 0;
END;
$BODY$
 language plpgsql
 immutable leakproof strict;


-- --------------------------------------------------------------------------
-- qtr_ne()
-- --------------------------------------------------------------------------
create or replace function qtr_ne(
 qtr1 quarter,
 qtr2 quarter)
 returns boolean as
$BODY$
BEGIN
 return quarter_comp(qtr1, qtr2) <> 0;
END;
$BODY$
 language plpgsql
 immutable leakproof strict;


-- --------------------------------------------------------------------------
-- qtr_gte()
-- --------------------------------------------------------------------------
create or replace function qtr_gte(
 qtr1 quarter,
 qtr2 quarter)
 returns boolean as
$BODY$
BEGIN
 return quarter_comp(qtr1, qtr2) >= 0;
END;
$BODY$
 language plpgsql
 immutable leakproof strict;



-- --------------------------------------------------------------------------
-- qtr_gt()
-- --------------------------------------------------------------------------
create or replace function qtr_gt(
 qtr1 quarter,
 qtr2 quarter)
 returns boolean as
$BODY$
BEGIN
 return quarter_comp(qtr1, qtr2) > 0;
END;
$BODY$
 language plpgsql
 immutable leakproof strict;


After the functions to carry out the arithmetic and comparison operations have been defined, the operators themselves can be defined.

-- --------------------------------------------------------------------------
-- +
-- --------------------------------------------------------------------------
create operator +(
 procedure = qtr_add,
 leftarg = quarter,
 rightarg = integer);

-- --------------------------------------------------------------------------
-- -
-- --------------------------------------------------------------------------
create operator -(
 procedure = qtr_diff,
 leftarg = quarter,
 rightarg = quarter);


-- --------------------------------------------------------------------------
-- <
-- --------------------------------------------------------------------------
create operator <(
 procedure = qtr_lt,
 leftarg = quarter,
 rightarg = quarter,
 commutator = >,
 negator = >=);

-- --------------------------------------------------------------------------
-- <=
-- --------------------------------------------------------------------------
create operator <=(
 procedure = qtr_lte,
 leftarg = quarter,
 rightarg = quarter,
 commutator = >=,
 negator = >);

-- --------------------------------------------------------------------------
-- =
-- --------------------------------------------------------------------------
create operator =(
 procedure = qtr_eq,
 leftarg = quarter,
 rightarg = quarter,
 commutator = =,
 negator = <>);

-- --------------------------------------------------------------------------
-- <>
-- --------------------------------------------------------------------------
create operator <>(
 procedure = qtr_ne,
 leftarg = quarter,
 rightarg = quarter,
 commutator = <>,
 negator = =);

-- --------------------------------------------------------------------------
-- >
-- --------------------------------------------------------------------------
create operator >(
 procedure = qtr_gt,
 leftarg = quarter,
 rightarg = quarter,
 commutator = <,
 negator = <=);

-- --------------------------------------------------------------------------
-- >=
-- --------------------------------------------------------------------------
create operator >=(
 procedure = qtr_gte,
 leftarg = quarter,
 rightarg = quarter,
 commutator = <=,
 negator = <);


To allow "quarter" dates to be used in indexes, an operator class must be defined.

create operator class quarter_ops default
for type quarter using btree as
   operator 1  <,
   operator 2  <=,
   operator 3  =,
   operator 4  >=,
   operator 5  >,
   function 1  quarter_comp(quarter, quarter);

The same approach could be used to define a custom "month" data type.  The code for a "month" data type would be almost identical, with changes needed only to the data type name used and to the functions 'qtr_add()' and 'qtr_diff()'.
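As a rough illustration of how small the change is, the arithmetic for a hypothetical "month" type can be sketched in Python: the constant 4 becomes 12 in both the addition and the difference. The names month_add and month_diff are assumptions for this sketch, and the addition uses a single-count formulation rather than a transcription of qtr_add().

```python
def month_add(year, month, addmon):
    """Add addmon months (may be negative) to (year, month)."""
    # divmod() floors, so the remainder is always 0..11.
    new_year, m0 = divmod(year * 12 + (month - 1) + addmon, 12)
    return new_year, m0 + 1


def month_diff(y1, m1, y2, m2):
    """Months from (y2, m2) to (y1, m1)."""
    return 12 * (y1 - y2) + (m1 - m2)


print(month_add(2016, 12, 1))          # (2017, 1)
print(month_diff(2017, 3, 2016, 11))   # 4
```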

Sunday, May 22, 2016

Using a Makefile with a Database

A makefile is a convenient and efficient way to run a multi-step process that creates or modifies files. However, when one step of that process is to extract data from a server-based database, make has no inherent ability to determine when the data in the database were last changed, and therefore whether the data extraction step needs to be re-run.

The following Python program (db_updated.py) addresses this problem. It updates the modification date of a specific flag file (named db_updated.flag by default), creating the file if it does not already exist. This code is written to be used with a PostgreSQL database, but can be easily modified for other databases.
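The core of the technique is giving a file a modification time taken from the database rather than from the clock. A minimal Python 3 sketch of just that step follows, assuming the revision time has already been fetched as a timezone-aware datetime; set_flag_mtime is a hypothetical helper name, not part of db_updated.py.

```python
import datetime
import os
from pathlib import Path


def set_flag_mtime(flag_path, revtime):
    """Create flag_path if needed, then set its atime and mtime to revtime.

    revtime is assumed to be a timezone-aware datetime.datetime.
    """
    path = Path(flag_path)
    path.touch(exist_ok=True)     # creates the file, or bumps it to "now"
    ts = revtime.timestamp()      # seconds since the epoch
    os.utime(path, (ts, ts))      # then overwrite with the database time


rev = datetime.datetime(2016, 5, 22, 12, 0, tzinfo=datetime.timezone.utc)
```

Calling set_flag_mtime("db_updated.flag", rev) leaves the flag file with a timestamp of noon UTC on May 22, 2016, regardless of when the call is made, so make compares its targets against the database's revision time.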

The program requires that the database contain at least one table with a column that contains a revision date (and time), either for that table or for the database as a whole. Command-line options can be used to specify the table(s) and the name of the column to be checked. If no table is specified, a default table name of "auditlog" is used; if no column is specified, a default column name of "rev_time" is used.

Command-line options and arguments are:
db_updated.py [-t table] [-c column] [-u user] server database

Multiple -t options may be specified; the latest revision date of any of the tables will be used to set the modification date of the db_updated.flag file. If a user name is specified, the program will prompt for a password.
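How several -t options combine into one query can be sketched as a small pure function. This mirrors the query-building logic of db_updated.py (one max() per table, then the max of those maxima); build_latest_query and the table name "samples" are hypothetical. As in the program, names are interpolated without quoting, so they must come from a trusted source such as the command line.

```python
def build_latest_query(tables=None, column="rev_time"):
    """Return SQL selecting the latest value of column across tables."""
    tables = tables or ["auditlog"]
    parts = ["select max(%s) as latest from %s" % (column, t) for t in tables]
    sql = " union ".join(parts)
    if len(parts) > 1:
        # Take the maximum of the per-table maxima.
        sql = "select max(latest) from (%s) as all_t" % sql
    return sql + ";"


print(build_latest_query(["auditlog", "samples"]))
```

This prints `select max(latest) from (select max(rev_time) as latest from auditlog union select max(rev_time) as latest from samples) as all_t;`.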


db_updated.py:
#!/usr/bin/env python
# Set the modification time of a flag file to the latest revision time found
# in a PostgreSQL database, so that make can tell when extracted data are stale.
import argparse
import calendar
import getpass
import os
import os.path
import sys
import time

import psycopg2

FLAG_FILE = 'db_updated.flag'

# Use raw_input() on Python 2; on Python 3 the built-in input() is equivalent.
try:
 input = raw_input
except NameError:
 pass

def get_user(server_name, database_name):
 user_name = input("User name for %s database %s: " % (server_name, database_name))
 return user_name

def get_password(server_name, database_name, user_name):
 passwd = getpass.getpass("Password for user %s on %s database %s: " % (user_name, server_name, database_name))
 return passwd

def clparser():
 desc_msg = "Create or update the database flag file if the given database has been recently modified."
 parser = argparse.ArgumentParser(description=desc_msg)
 parser.add_argument('-u', '--db_user', dest='db_user', action='store')
 parser.add_argument('-t', '--table', dest='table', action='append')
 parser.add_argument('-c', '--column', dest='column', action='store')
 parser.add_argument('server', action='store')
 parser.add_argument('database', action='store')
 return parser

def main():
 parser = clparser()
 arg = parser.parse_args()
 # Create the flag file if it does not already exist.
 if not os.path.exists(FLAG_FILE):
  open(FLAG_FILE, 'a').close()
 if arg.db_user:
  db_user = arg.db_user
  pw = get_password(arg.server, arg.database, db_user)
  conn = psycopg2.connect(host=arg.server, database=arg.database, port=5432, user=db_user, password=pw)
 else:
  conn = psycopg2.connect(host=arg.server, database=arg.database, port=5432)
 curs = conn.cursor()
 column = arg.column or "rev_time"
 # Table and column names are interpolated directly into the SQL, so they
 # must come from a trusted source (here, the command line).
 tables = arg.table or ["auditlog"]
 sql = " union ".join(["select max(%s) as latest from %s" % (column, t) for t in tables])
 if len(tables) > 1:
  sql = "select max(latest) from (%s) as all_t" % sql
 curs.execute(sql + ";")
 latest = curs.fetchone()[0]
 conn.close()
 if latest is None:
  sys.exit("No revision time found in column %s." % column)
 # A timestamptz value arrives as a timezone-aware datetime, so convert it
 # via UTC; naive timestamps and dates are interpreted as local time.
 if getattr(latest, 'tzinfo', None) is not None:
  revtime = calendar.timegm(latest.utctimetuple())
 else:
  revtime = time.mktime(latest.timetuple())
 os.utime(FLAG_FILE, (revtime, revtime))

if __name__ == '__main__':
 main()

This can be used in a makefile like the example below. This example extracts data from a database and then runs an R script to further summarize the data.  The input and output file names for the R script are shown on the command line here, though encoding them in the script itself may be more convenient in many cases than parsing the command line in R.

.PHONY: all

all: data_summary.csv

data_summary.csv: data.csv summary_stats.R
	Rscript --vanilla summary_stats.R data.csv data_summary.csv

data.csv: db_updated.flag get_data.sql
	execsql.py -tp -u user_name get_data.sql server_name db_name

# FORCE has no recipe or prerequisites, so this rule runs on every invocation.
# db_updated.py sets the flag's timestamp to the latest revision time, so the
# flag becomes newer than data.csv only when the database has changed.
db_updated.flag: FORCE
	db_updated.py -u user_name server_name db_name

FORCE:

In this example, neither a table name nor a column name is included as an argument to db_updated.py. These may be needed in some circumstances; alternatively, the program can be modified so that its defaults suit the environment in which it is used.

Saturday, April 23, 2016

A Threaded Tkinter Toplevel Console Window

The code below implements a GUI window for output display, such as might be used as a console to display status messages or other information from a running program. It is implemented using a threaded Tkinter Toplevel widget, and is designed to be used in a non-GUI command-line program. The display that it produces looks like this:


The Python code for this GUI console is:
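import re
import threading
try:
 import Queue  # Python 2
except ImportError:
 import queue as Queue  # Python 3

class ConsoleUIError(Exception):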
 def __init__(self, msg):
  self.value = msg
 def __repr__(self):
  return ("ConsoleUIError(%r)" % self.value)

class ConsoleUI(object):
 class TkUI(object):
  def __init__(self, kill_event, stop_update_event, msg_queue, status_queue, title=None):
   self.kill_event = kill_event
   self.stop_update_event = stop_update_event
   self.msg_queue = msg_queue
   self.status_queue = status_queue
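   try:
    import Tkinter as tk1  # Python 2
    import ttk as ttk1
   except ImportError:
    import tkinter as tk1  # Python 3
    from tkinter import ttk as ttk1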
   self.win = tk1.Toplevel()
   self.status_msg = tk1.StringVar()
   self.status_msg.set('')
   self.win.title(title if title else "execsql console")
   console_frame = ttk1.Frame(master=self.win, padding="2 2 2 2")
   console_frame.grid(column=0, row=0, sticky=tk1.NSEW)
   self.textarea = tk1.Text(console_frame, width=100, height=25, wrap='none')
   # Status bar
   statusframe = ttk1.Frame(master=self.win)
   statusbar = ttk1.Label(statusframe, text='', textvariable=self.status_msg, 
    relief=tk1.RIDGE, anchor=tk1.W)
   statusbar.pack(side=tk1.BOTTOM, fill=tk1.X)
   statusframe.grid(column=0, row=1, sticky=tk1.EW)
   # Scrollbars
   vscroll = tk1.Scrollbar(console_frame, orient="vertical", command=self.textarea.yview)
   hscroll = tk1.Scrollbar(console_frame, orient="horizontal", command=self.textarea.xview)
   self.textarea.configure(yscrollcommand=vscroll.set)
   self.textarea.configure(xscrollcommand=hscroll.set)
   self.textarea.grid(column=0, row=0, sticky=tk1.NSEW)
   vscroll.grid(column=1, row=0, sticky=tk1.NS)
   hscroll.grid(column=0, row=2, sticky=tk1.EW)
   # Allow resizing
   self.win.columnconfigure(0, weight=1)
   self.win.rowconfigure(0, weight=1)
   console_frame.columnconfigure(0, weight=1)
   console_frame.rowconfigure(0, weight=1)
   # Kill on window close
   self.win.protocol("WM_DELETE_WINDOW", self.kill)
   # Display and center the window
   self.win.update_idletasks()
   m = re.match(r"(\d+)x(\d+)([-+]\d+)([-+]\d+)", self.win.geometry())
   wwd = int(m.group(1))
   wht = int(m.group(2))
   swd = self.win.winfo_screenwidth()
   sht = self.win.winfo_screenheight()
   xpos = (swd // 2) - (wwd // 2)
   ypos = (sht // 2) - (wht // 2)
   self.win.geometry("%dx%d+%d+%d" % (wwd, wht, xpos, ypos))
   self.win.grab_set()
   self.win._root().withdraw()
   self.update_id = self.win.after(200, self.update)
   self.win.wait_window(self.win)
  def kill(self):
   if self.update_id:
    self.win.after_cancel(self.update_id)
    self.update_id = None
   self.win.destroy()
   self.win.update_idletasks()
  def update(self):
   self.update_id = None
   while not self.msg_queue.empty():
    msg = self.msg_queue.get(False)
    self.textarea.insert('end', msg)
    self.textarea.see('end')
    self.msg_queue.task_done()
   while not self.status_queue.empty():
    msg = self.status_queue.get(False)
    self.status_msg.set(msg)
   if self.kill_event.is_set():
    self.kill()
   else:
    if not self.stop_update_event.is_set():
     self.update_id = self.win.after(200, self.update)
 def __init__(self, title=None):
  self.title = title
  self.msg_queue = Queue.Queue()
  self.status_queue = Queue.Queue()
  self.kill_event = threading.Event()
  self.stop_update_event = threading.Event()
  self.consolethread = None
  self.update_id = None
  # Start the local event loop in a thread.
  def openconsole():
   self.active = True
   self.ui = self.TkUI(self.kill_event, self.stop_update_event, self.msg_queue, 
    self.status_queue, self.title)
   # Deallocate the Tk object here to avoid the "main thread is not in main loop" error.
   self.ui = None
  self.consolethread = threading.Thread(target=openconsole)
  self.consolethread.start()
 def write(self, msg):
  self.active = self.consolethread and self.consolethread.is_alive()
  if not self.active:
   raise ConsoleUIError(msg)
  self.msg_queue.put(msg)
 def write_status(self, msg):
  self.active = self.consolethread and self.consolethread.is_alive()
  if not self.active:
   raise ConsoleUIError(msg)
  self.status_queue.put(msg)
 def deactivate(self):
  self.kill_event.set()
  if self.consolethread and self.consolethread.is_alive():
   self.consolethread.join()
  self.active = False
 def wait_for_user_quit(self):
  self.stop_update_event.set()
  if self.consolethread and self.consolethread.is_alive():
   self.consolethread.join()
  self.active = False


Because it runs in its own thread, this console can be used to display information produced by the main program or by several other threads in the same process, since all messages pass through thread-safe queues.

The 'write()' method of the ConsoleUI object will write the given text at the end of the console display.  A status message can also be written, separately from the stream of text that is written in the main part of the window.  The console window can be closed directly from the program with the 'deactivate()' method, or the program can pause until the user closes the window by using the 'wait_for_user_quit()' method.
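The queue-based hand-off that lets several threads feed the console can be shown without Tkinter. In this Python 3 sketch, hypothetical worker threads put messages on a shared queue.Queue, and a drain() function collects everything currently queued without blocking, much as TkUI.update() does on each 200 ms timer tick. It uses get_nowait() and catches queue.Empty instead of testing empty() first; that substitution is a choice of this sketch, not the original code.

```python
import queue
import threading


def drain(q):
    """Return all messages currently in q without blocking."""
    msgs = []
    while True:
        try:
            msgs.append(q.get_nowait())
        except queue.Empty:
            break
        q.task_done()
    return msgs


msg_queue = queue.Queue()


def worker(n):
    # Hypothetical producer: each thread writes three lines of output.
    for i in range(3):
        msg_queue.put("worker %d: message %d\n" % (n, i))


threads = [threading.Thread(target=worker, args=(n,)) for n in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

collected = drain(msg_queue)  # six messages, interleaved in arrival order
```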

Other Tkinter widgets should not be activated in other threads while this GUI console window is open.

Tuesday, April 12, 2016

Calculating the Median in SQL Using Window Functions

Numerous approaches to calculating the median in SQL have been presented in books and online. Here's another one that is short and simple, and makes use of window functions. This example is specific to PostgreSQL, but could be adapted to any other DBMS that supports window functions. To illustrate this method, I'll use a set of test data in a table named median_test:

create table median_test (id text, value double precision);

insert into median_test 
    (id, value)
select
    id,
    value
from
    (select 
        generate_series(1,100) as num, 
        chr(cast(random()*10 as integer)+65) as id, 
        random() as value
    ) as dd;

To support the median calculation, two columns are added using window functions: one is the row number of the ordered values (doubled), and the other is the total number of rows. The window functions allow these to be calculated separately for each partition, which here is defined by the values of the id column. In this example the additional columns are added in a temporary view, but this could instead be a subquery, a common table expression, or even an actual table.

create or replace temporary view d as
select
    id,
    value,
    row_number() over(partition by id order by value) * 2 as rownum2,
    count(*) over (partition by id) as rows
from 
    median_test;

The median calculation is then carried out by averaging either the two central values when there is an even number of values, or the one central value when there is an odd number of values.

select 
    id,
    avg(value) as median
from
    d
where 
    rownum2 in (rows, rows+1, rows+2)
group by
    id;

When there is an odd number of rows, the single median value will have a value of rownum2 equal to rows+1. When there is an even number of rows, the two central rows will have values of rownum2 equal to either rows or rows+2.
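The selection rule can be checked outside the database. This Python sketch (not part of the SQL solution) sorts a list, gives each value twice its 1-based position as the view does, keeps the rows whose doubled position equals rows, rows+1, or rows+2, and averages them. The function name median_by_rownum2 is hypothetical; the result can be compared with statistics.median().

```python
import statistics


def median_by_rownum2(values):
    """Median via the doubled-row-number rule used in the SQL query."""
    ordered = sorted(values)
    rows = len(ordered)
    # rownum2 = 2 * (1-based position), as computed in the view d.
    picked = [v for i, v in enumerate(ordered, start=1)
              if 2 * i in (rows, rows + 1, rows + 2)]
    return sum(picked) / len(picked)


print(median_by_rownum2([3.0, 1.0, 2.0]))       # 2.0 (odd: rownum2 = rows + 1)
print(median_by_rownum2([4.0, 1.0, 3.0, 2.0]))  # 2.5 (even: rows and rows + 2)
```

Because rownum2 is always even, exactly one row matches when the count is odd and exactly two match when it is even.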