Sunday, January 20, 2019

Driving Data Table Merges from the Information Schema

When loading data into multiple tables of a database, it may be necessary to execute UPDATE and INSERT statements for numerous tables. This task can be simplified if the data that are to be loaded are staged in tables that have the same structure as the base tables that are the targets of the data merge, and if the system catalog can be queried to provide information on table structures and keys. The examples below illustrate the construction of a single script to carry out UPDATE and INSERT operations on any table from an equivalently-structured staging table.

These examples are written for Postgres, which exposes its system catalog as a set of views in the information_schema schema, following the ANSI SQL-92 standard. The examples also use Postgres' "string_agg()" aggregate function to collapse columns of column names into comma-delimited string expressions. Equivalent functionality is available in some other DBMSs (e.g., "group_concat()" in MySQL/MariaDB, and "FOR XML PATH" expressions in Microsoft SQL Server prior to version 2017, which introduced its own "string_agg()"). The examples use the execsql script language to eliminate dependence on any DBMS-specific procedural language extensions.
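
As a minimal illustration of the aggregation used throughout these examples, the following query (which assumes, purely for illustration, a table named ticketsales in the public schema) collapses the names of a table's columns into a single comma-delimited string:

-- Illustration only: collect the column names of a hypothetical
-- public.ticketsales table into one comma-delimited string.
select string_agg(column_name, ', ' order by ordinal_position) as column_list
from information_schema.columns
where
    table_schema = 'public'
    and table_name = 'ticketsales';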

Some DBMSs support a form of the SQL-standard MERGE (or "upsert") statement, which allows both UPDATE and INSERT operations to be done in a single statement. The first example below illustrates the use of a MERGE statement, and the second example illustrates the use of separate UPDATE and INSERT statements. These examples assume that the new data are staged in a table with the same name as the base table, but in a different schema (e.g., a staging schema). Column names in the base table and staging table must be identical, and types compatible. Base tables may contain some columns that should not be updated using new data, such as autonumber columns and columns that are populated by triggers—these example scripts accept a list of column names that are to be excluded from the merge operation. Table and column names are not quoted in these examples, assuming that the database has been created using the DBMS' naming rules for unquoted identifiers.
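
The scripts below assume that the staging tables already exist. One simple way to create a staging table with the same structure as its base table (an approach assumed here only for illustration; it is not part of the scripts) is Postgres' CREATE TABLE...LIKE syntax:

-- Illustration only: create a staging copy of a (hypothetical)
-- public.ticketsales table in the stg_bette staging schema, copying
-- column names, data types, and defaults (primary keys and other
-- indexes are not copied).
create schema if not exists stg_bette;
create table stg_bette.ticketsales
    (like public.ticketsales including defaults);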

Example 1.  Generating a MERGE Statement for Any Table

Postgres does not implement the SQL-standard MERGE statement; instead, its INSERT statement supports an ON CONFLICT clause that specifies the action to be taken when there are key conflicts between the base table and the incoming data.
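
For example, for a hypothetical base table public.sites with a primary key of site_id and one data column, site_name, and an equivalently-structured staging table stg.sites (names used only for illustration), an upsert written with this syntax looks like:

-- Illustration only: hypothetical table and column names.
insert into public.sites as b (site_id, site_name)
select site_id, site_name
from stg.sites as s
on conflict (site_id) do update set site_name = excluded.site_name;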

The SQL for the merge statement is generated and executed by an execsql SCRIPT metacommand. The schema names, table name, and list of columns to exclude are specified as execsql substitution variables.


-- ################################################################
--            Script INSERT_UPDATE
--
-- Adds data from a staging table to a base table, using Postgres'
-- INSERT...ON CONFLICT statement.
--
-- Input (global) variables:
--        base_schema     : The name of the base table schema.
--        staging         : The name of the staging schema.
--        table           : The table name--same for base and staging.
--        exclude_cols    : A comma-delimited list of single-quoted
--                          column names identifying the columns
--                          of the base table that are not to be
--                          modified.  These may be autonumber
--                          columns or columns filled by triggers.
--
-- Notes:
--        1. Schema, table, and column names are not quoted.
-- ===============================================================

-- !x! BEGIN SCRIPT INSERT_UPDATE

-- Populate a (temporary) table with the names of the columns
-- in the base table that are to be updated from the staging table
-- (all columns but those in the 'exclude_cols' list).
-- !x! if(is_null("!!exclude_cols!!"))
    -- !x! sub_empty ~col_excl
-- !x! else
    -- !x! sub ~col_excl and column_name not in (!!exclude_cols!!)
-- !x! endif
drop table if exists tt_cols cascade;
select column_name
into temporary table tt_cols
from information_schema.columns
where
    table_schema = '!!base_schema!!'
    and table_name = '!!table!!'
    !!~col_excl!!
order by ordinal_position;


-- Populate a (temporary) table with the names of the primary key
-- columns of the base table.
drop table if exists tt_pks cascade;
select k.column_name
into temporary table tt_pks
from information_schema.table_constraints as tc
inner join information_schema.key_column_usage as k
    on tc.constraint_type = 'PRIMARY KEY'
    and tc.constraint_name = k.constraint_name
    and tc.constraint_schema = k.constraint_schema
where
    k.table_name = '!!table!!'
    and k.table_schema = '!!base_schema!!'
order by k.ordinal_position;


-- Get all base table columns that are to be updated into a comma-delimited list.
drop view if exists tv_allcollist cascade;
create temporary view tv_allcollist as
select string_agg(column_name, ', ')
from tt_cols;
-- !x! subdata ~allcollist tv_allcollist;


-- Get the primary key columns in a comma-delimited list.
drop view if exists tv_pkcollist cascade;
create temporary view tv_pkcollist as
select string_agg(column_name, ', ')
from tt_pks;
-- !x! subdata ~pkcollist tv_pkcollist;


-- Create a 'set' expression that assigns values from the incoming row
-- (Postgres' 'excluded' pseudo-table) to the non-key columns of the base table.
drop view if exists tv_setexpr cascade;
create temporary view tv_setexpr as
select
    string_agg(column_name || ' = excluded.' || column_name, ', ')
from
    (select column_name from tt_cols
    except select column_name from tt_pks) as nk_cols;
-- !x! subdata ~setexpr tv_setexpr


-- Create the INSERT...ON CONFLICT statement.
-- !x! sub insupd INSERT INTO !!base_schema!!.!!table!! as b (!!~allcollist!!)
-- !x! sub_append insupd SELECT !!~allcollist!! FROM !!staging!!.!!table!! as s
-- !x! sub_append insupd ON CONFLICT (!!~pkcollist!!) DO UPDATE SET !!~setexpr!!

-- Run the generated SQL.
!!insupd!!;


-- !x! END SCRIPT

-- ################################################################

This script can be used by running the following execsql metacommands:

-- !x! sub base_schema  public
-- !x! sub staging      stg_bette
-- !x! sub table        ticketsales
-- !x! sub exclude_cols 'id', 'rev_time', 'rev_user'
-- !x! execute script  insert_update
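
If, purely for illustration, the ticketsales table had a primary key of (event_id, ticket_no) and data columns buyer and price (these column names are hypothetical; the real table structure is not shown here), the INSERT...ON CONFLICT statement generated by that invocation would be approximately:

-- Hypothetical columns.  The id, rev_time, and rev_user columns do not
-- appear because they are named in exclude_cols.
INSERT INTO public.ticketsales as b (event_id, ticket_no, buyer, price)
SELECT event_id, ticket_no, buyer, price FROM stg_bette.ticketsales as s
ON CONFLICT (event_id, ticket_no) DO UPDATE SET buyer = excluded.buyer, price = excluded.price;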


Example 2. Generating Separate UPDATE and INSERT Statements for Any Table 


A merge statement is convenient for automatic integration of new data into an existing base table, but there are some conditions under which the use of separate UPDATE and INSERT statements may be necessary or desirable. In particular:
  • The DBMS in use doesn't support any form of merge statement.
  • You want to be able to review the data to be modified or inserted before changes are made.
  • You want to log all of the data changes.
This example uses a technique similar to the first example to extract column names from the information schema and to construct and execute SQL. In this case, however, separate UPDATE and INSERT statements are created, and execsql metacommands are used to display the pending changes, prompt the user to approve them, and log the changes that are made.
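
The two statements that this script constructs follow a common pattern: an UPDATE joined to the staging table on the primary key, and an INSERT of only those staging rows whose keys are not already present in the base table. For the same hypothetical public.sites and stg.sites tables used earlier, the generated statements would look roughly like:

-- Illustration only: hypothetical table and column names.
UPDATE public.sites as b
SET site_name = s.site_name
FROM stg.sites as s
WHERE b.site_id = s.site_id;

INSERT INTO public.sites (site_id, site_name)
SELECT s.site_id, s.site_name
FROM stg.sites as s
WHERE s.site_id NOT IN (SELECT site_id FROM public.sites);

In the script itself, the set of new rows is materialized as a temporary view (built with an EXCEPT query on the key columns) so that it can be displayed to the user before the INSERT is run.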


-- ################################################################
--            Script UPSERT_ONE
--
-- Adds data from a staging table to a base table, using UPDATE
-- and INSERT statements.  Displays data to be modified to the
-- user before any modifications are done.  Reports the changes
-- made to the console and optionally to a log file.
--
-- Input (global) variables:
--        base_schema      : The name of the base table schema.
--        staging          : The name of the staging schema.
--        table            : The table name--same for base and staging.
--        exclude_cols     : A comma-delimited list of single-quoted
--                            column names identifying the columns
--                            of the base table that are not to be
--                            modified.  These may be autonumber
--                            columns or columns filled by triggers.
--        display_changes  : A boolean variable indicating whether
--                            or not the changes to be made to the 
--                            base table should be displayed in a GUI.
--                            Optional.  If not defined, the changes
--                            will be displayed.
--        display_final    : A boolean variable indicating whether or
--                            not the base table should be displayed
--                            after updates and inserts are completed.
--                            Optional.  If not defined, the final
--                            base table will not be displayed.
--        logfile            : The name of a log file to which update
--                            messages will be written.  Optional.
--        write_sql        : A boolean variable indicating whether
--                            the update and insert statements should
--                            also be written to the logfile.  Optional.
--        write_changes    : A boolean variable indicating whether
--                            the updated and inserted data should be
--                            written to the logfile.  Optional.
--
--    Output (global) variables:
--        updatestmt       : The SQL of the generated UPDATE statement.
--        insertstmt       : The SQL of the generated INSERT statement.
--
-- Notes:
--        1. Schema, table, and column names are not quoted.
-- ===============================================================

-- !x! BEGIN SCRIPT UPSERT_ONE


-- Remove substitution variables that will contain the generated
-- update and insert statements so that the existence of valid
-- statements can be later tested based on the existence of these variables.
-- !x! rm_sub updatestmt
-- !x! rm_sub insertstmt

-- Determine whether or not to display changes.  Updates and
-- inserts will be made by default if changes are not displayed.
-- !x! sub ~disp_changes Yes
-- !x! if(sub_defined(display_changes))
    -- !x! sub ~disp_changes !!display_changes!!
-- !x! endif
-- !x! sub ~do_updates Yes
-- !x! sub ~do_inserts Yes

-- !x! if(sub_defined(logfile))
    -- !x! write "" to !!logfile!!
    -- !x! write "==================================================================" to !!logfile!!
    -- !x! write "!!$current_time!! -- Processing table !!base_schema!!.!!table!!" to !!logfile!!
-- !x! endif

-- Populate a (temporary) table with the names of the columns
-- in the base table that are to be updated from the staging table.
-- !x! if(is_null("!!exclude_cols!!"))
    -- !x! sub_empty ~col_excl
-- !x! else
    -- !x! sub ~col_excl and column_name not in (!!exclude_cols!!)
-- !x! endif
drop table if exists tt_cols cascade;
select column_name
into temporary table tt_cols
from information_schema.columns
where
    table_schema = '!!base_schema!!'
    and table_name = '!!table!!'
    !!~col_excl!!
order by ordinal_position;


-- Populate a (temporary) table with the names of the primary key
-- columns of the base table.
drop table if exists tt_pks cascade;
select k.column_name
into temporary table tt_pks
from information_schema.table_constraints as tc
inner join information_schema.key_column_usage as k
    on tc.constraint_type = 'PRIMARY KEY'
    and tc.constraint_name = k.constraint_name
    and tc.constraint_schema = k.constraint_schema
where
    k.table_name = '!!table!!'
    and k.table_schema = '!!base_schema!!'
order by k.ordinal_position;


-- Get all base table columns that are to be updated into a comma-delimited list.
drop view if exists tv_allcollist cascade;
create temporary view tv_allcollist as
select string_agg(column_name, ', ')
from tt_cols;
-- !x! subdata ~allcollist tv_allcollist;


-- Get all base table columns that are to be updated into a comma-delimited list
-- with a "b." prefix.
drop view if exists tv_allbasecollist cascade;
create temporary view tv_allbasecollist as
select string_agg('b.' || column_name, ', ')
from tt_cols;
-- !x! subdata ~allbasecollist tv_allbasecollist;


-- Get all staging table column names for columns that are to be updated
-- into a comma-delimited list with an "s." prefix.
drop view if exists tv_allstgcollist cascade;
create temporary view tv_allstgcollist as
select string_agg('s.' || column_name, ', ')
from tt_cols;
-- !x! subdata ~allstgcollist tv_allstgcollist;


-- Get the primary key columns in a comma-delimited list.
drop view if exists tv_pkcollist cascade;
create temporary view tv_pkcollist as
select string_agg(column_name, ', ')
from tt_pks;
-- !x! subdata ~pkcollist tv_pkcollist;


-- Create a join expression for key columns of the base (b) and
-- staging (s) tables.
drop view if exists tv_joinexpr cascade;
create temporary view tv_joinexpr as
select
    string_agg('b.' || column_name || ' = s.' || column_name, ' and ')
from
    tt_pks;
-- !x! subdata ~joinexpr tv_joinexpr


-- Create a FROM clause for an inner join between base and staging
-- tables on the primary key column(s).
-- !x! sub ~fromclause FROM !!base_schema!!.!!table!! as b INNER JOIN !!staging!!.!!table!! as s ON !!~joinexpr!!


-- Create SELECT queries to pull all columns with matching keys from both
-- base and staging tables.
drop view if exists tv_basematches cascade;
create temporary view tv_basematches as select !!~allbasecollist!! !!~fromclause!!;

drop view if exists tv_stgmatches cascade;
create temporary view tv_stgmatches as select !!~allstgcollist!! !!~fromclause!!;


-- Prompt user to examine matching data and commit, don't commit, or quit.
-- !x! if(hasrows(tv_stgmatches))
    -- !x! if(is_true(!!~disp_changes!!))
        -- !x! prompt ask "Do you want to make these changes? For table !!table!!, new data are shown in the top table below; existing data are in the lower table." sub ~do_updates compare tv_stgmatches and tv_basematches key (!!~pkcollist!!)
    -- !x! endif

    -- !x! if(is_true(!!~do_updates!!))
        -- Create an assignment expression to update non-key columns of the
        -- base table (un-aliased) from columns of the staging table (as s).
        drop view if exists tv_assexpr cascade;
        create temporary view tv_assexpr as
        with nk as (
            select column_name from tt_cols
            except
            select column_name from tt_pks
            )
        select
            string_agg(column_name || ' = s.' || column_name, ', ')
        from
            nk;
        -- !x! subdata ~assexpr tv_assexpr

        -- Create an UPDATE statement to update the base table with
        -- non-key columns from the staging table.  No semicolon terminating generated SQL.
        -- !x! sub updatestmt UPDATE !!base_schema!!.!!table!! as b SET !!~assexpr!! FROM !!staging!!.!!table!! as s WHERE !!~joinexpr!! 
    -- !x! endif
-- !x! endif


-- Create a select statement to find all rows of the staging table
-- that are not in the base table.
drop view if exists tv_newrows cascade;
create temporary view tv_newrows as
with newpks as (
    select !!~pkcollist!! from !!staging!!.!!table!!
    except
    select !!~pkcollist!! from !!base_schema!!.!!table!!
    )
select
    s.*
from
    !!staging!!.!!table!! as s
    inner join newpks using (!!~pkcollist!!);


-- Prompt user to examine new data and continue or quit.
-- !x! if(hasrows(tv_newrows))
    -- !x! if(is_true(!!~disp_changes!!))
        -- !x! prompt ask "Do you want to add these new data to the !!base_schema!!.!!table!! table?" sub ~do_inserts display tv_newrows
    -- !x! endif

    -- !x! if(is_true(!!~do_inserts!!))
        -- Create an insert statement.  No semicolon terminating generated SQL.
        -- !x! sub insertstmt INSERT INTO !!base_schema!!.!!table!! (!!~allcollist!!) SELECT !!~allcollist!! FROM tv_newrows
    -- !x! endif
-- !x! endif


-- Run the update and insert statements.

-- !x! if(sub_defined(updatestmt))
-- !x! andif(is_true(!!~do_updates!!))
    -- !x! write "Updating !!base_schema!!.!!table!!"
    -- !x! if(sub_defined(logfile))
    -- !x! andif(sub_defined(write_sql))
    -- !x! andif(is_true(!!write_sql!!))
        -- !x! write "" to !!logfile!!
        -- !x! write "------------------------------------------------------------------" to !!logfile!!
        -- !x! write "UPDATE statement for !!base_schema!!.!!table!!:" to !!logfile!!
        -- !x! write [!!updatestmt!!] to !!logfile!!
        -- !x! if(sub_defined(write_changes))
        -- !x! andif(is_true(!!write_changes!!))
            -- !x! write "Updates:" to !!logfile!!
            -- !x! export tv_stgmatches append to !!logfile!! as txt
        -- !x! endif
        -- !x! write "" to !!logfile!!
    -- !x! endif
    !!updatestmt!!;
    -- !x! if(sub_defined(logfile))
        -- !x! write "!!$last_rowcount!! rows of !!base_schema!!.!!table!! updated." to !!logfile!!
    -- !x! endif
    -- !x! write "    !!$last_rowcount!! rows updated."
-- !x! endif


-- !x! if(sub_defined(insertstmt))
-- !x! andif(is_true(!!~do_inserts!!))
    -- !x! write "Adding data to !!base_schema!!.!!table!!"
    -- !x! if(sub_defined(logfile))
    -- !x! andif(sub_defined(write_sql))
    -- !x! andif(is_true(!!write_sql!!))
        -- !x! write "" to !!logfile!!
        -- !x! write "------------------------------------------------------------------" to !!logfile!!
        -- !x! write "INSERT statement for !!base_schema!!.!!table!!:" to !!logfile!!
        -- !x! write [!!insertstmt!!] to !!logfile!!
        -- !x! if(sub_defined(write_changes))
        -- !x! andif(is_true(!!write_changes!!))
            -- !x! write "New data:" to !!logfile!!
            -- !x! export tv_newrows append to !!logfile!! as txt
        -- !x! endif
        -- !x! write "" to !!logfile!!
    -- !x! endif
    !!insertstmt!!;
    -- !x! if(sub_defined(logfile))
        -- !x! write "!!$last_rowcount!! rows added to !!base_schema!!.!!table!!." to !!logfile!!
    -- !x! endif
    -- !x! write "    !!$last_rowcount!! rows added."
-- !x! endif


-- !x! if(sub_defined(display_final))
-- !x! andif(is_true(!!display_final!!))
    -- !x! prompt message "Table !!base_schema!!.!!table!! after updates and inserts." display !!base_schema!!.!!table!!
-- !x! endif


-- !x! END SCRIPT
-- ################################################################
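
As with the first example, this script is run by defining the input variables and then executing it; for instance (the values shown are only illustrative):

-- !x! sub base_schema      public
-- !x! sub staging          stg_bette
-- !x! sub table            ticketsales
-- !x! sub exclude_cols     'id', 'rev_time', 'rev_user'
-- !x! sub display_changes  Yes
-- !x! sub display_final    No
-- !x! sub logfile          ticketsales_changes.log
-- !x! sub write_sql        Yes
-- !x! sub write_changes    Yes
-- !x! execute script upsert_one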

Example 3. Automatically Merging Data for Multiple Tables 


Merge operations may need to be performed on multiple database tables for a single incoming data set. Carrying out this process can be simplified by creating a list of all of the tables to be updated, using the information schema to sort that list of tables into dependency order, and then using one of the merging scripts from Example 1 or Example 2 to modify data in the base tables. This example uses the recursive CTE from http://splinterofthesingularity.blogspot.com/2017/12/ordering-database-tables-by-foreign-key.html to order the tables, and then uses the script from Example 2 to perform the data updates. The table of table names that drives this process includes the following columns (a sketch of such a control table follows the list):
  • table_name — The name of the table to be updated.
  • exclude_cols — A comma-delimited list of quoted column names identifying the columns that are not to be updated.
  • display_changes — A value of "Yes" or "No" to indicate whether data modifications for the corresponding table should be displayed to the user, allowing him or her to allow or disallow those changes.
  • display_final — A value of "Yes" or "No" to indicate whether the final data table, after the data merge, should be displayed to the user.
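
Such a control table might be created as follows; the table name upsert_tables and the rows inserted are purely illustrative:

-- Illustration only: a control table for the UPSERT_ALL script.
create table upsert_tables (
    table_name text not null,
    exclude_cols text,
    display_changes text not null default 'Yes',
    display_final text not null default 'No'
    );
insert into upsert_tables
    (table_name, exclude_cols, display_changes, display_final)
values
    ('ticketsales', $$'id', 'rev_time', 'rev_user'$$, 'Yes', 'No'),
    ('events', null, 'Yes', 'No');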

-- ################################################################
--            Script UPSERT_ALL
--
-- Updates multiple base tables with new or revised data from
-- staging tables, using the UPSERT_ONE script.
--
-- Input (global) variables:
--        base_schema      : The name of the base table schema.
--        staging          : The name of the staging schema.
--        tablelist        : The name of a table containing the
--                            following four columns:
--                                table_name    : The name of a table
--                                                  to be updated.
--                                exclude_cols    : A comma-delimited
--                                                    list of single-
--                                                    quoted column
--                                                    names, as required
--                                                    by UPSERT_ONE.
--                                display_changes    : A value of "Yes" or
--                                                    "No" indicating
--                                                    whether the changes
--                                                    for the table should
--                                                    be displayed.
--                                display_final    : A value of "Yes" or
--                                                    "No" indicating
--                                                    whether the final
--                                                    state of the table
--                                                    should be displayed.
--        logfile          : The name of a log file to which update
--                            messages will be written.  Optional.
--        write_sql        : A boolean variable indicating whether
--                            the update and insert statements should
--                            also be written to the logfile.
--
--    Output (global) variables:
--        Inherited from the UPSERT_ONE script; will be valid only for
--        the last table:
--            updatestmt   : The SQL of the generated UPDATE statement.
--            insertstmt   : The SQL of the generated INSERT statement.
--
-- Notes:
--        1. Schema, table, and column names are not quoted, and the
--            database should therefore be designed so that they do
--            not need to be quoted.
-- ===============================================================

-- !x! BEGIN SCRIPT UPSERT_ALL


-- Get a table of all dependencies for the base schema.
drop table if exists tt_dependencies;
create temporary table tt_dependencies as
select 
    tc.table_name as child,
    tu.table_name as parent
from 
    information_schema.table_constraints as tc
    inner join information_schema.constraint_table_usage as tu
        on tu.constraint_name = tc.constraint_name
where 
    tc.constraint_type = 'FOREIGN KEY'
    and tc.table_name <> tu.table_name
    and tc.table_schema = '!!base_schema!!';


-- Create a list of tables in the base schema ordered by dependency.
drop table if exists tt_ordered_tables;
with recursive dep_depth as (
    select
          dep.child,
          dep.parent,
          1 as lvl
    from
        tt_dependencies as dep
    union all
    select
        dep.child,
        dep.parent,
        dd.lvl + 1 as lvl
    from
        dep_depth as dd
        inner join tt_dependencies as dep on dep.parent = dd.child
     )
select
    table_name,
    table_order
into
    temporary table tt_ordered_tables
from (
    select
        dd.parent as table_name,
        max(lvl) as table_order
    from
        dep_depth as dd
    group by
        table_name
    union
    select
        dd.child as table_name,
        max(lvl) + 1 as table_order
    from
        dep_depth as dd
        left join tt_dependencies as dp on dp.parent = dd.child
    where
        dp.parent is null
    group by
        dd.child
    ) as all_levels;


-- Create a list of the selected tables with ordering information.
drop table if exists tt_proctables;
select
    ot.table_order,
    tl.table_name,
    tl.exclude_cols,
    tl.display_changes,
    tl.display_final,
    False::boolean as processed
into
    temporary table tt_proctables
from
    !!tablelist!! as tl
    inner join tt_ordered_tables as ot on ot.table_name = tl.table_name
    ;


-- Create a view returning a single unprocessed table, in order.
drop view if exists tv_toprocess;
create temporary view tv_toprocess as
select table_name, exclude_cols, display_changes, display_final
from tt_proctables
where not processed
order by table_order
limit 1;


-- Process all tables in order.

-- !x! execute script load_tables

-- !x! END SCRIPT

-- ################################################################




-- ################################################################
--        Script LOAD_TABLES
-- ===============================================================

-- !x! BEGIN SCRIPT LOAD_TABLES

-- !x! if(hasrows(tv_toprocess))
    -- !x! select_sub tv_toprocess
    -- Convert data variables to global variables used by the
    -- UPSERT_ONE script.
    -- !x! sub table !!@table_name!!
    -- !x! if(not is_null("!!@exclude_cols!!"))
        -- !x! sub exclude_cols !!@exclude_cols!!
    -- !x! else
        -- !x! sub_empty exclude_cols
    -- !x! endif
    -- !x! sub display_changes !!@display_changes!!
    -- !x! sub display_final !!@display_final!!
    -- !x! execute script upsert_one
    update tt_proctables
    set processed = True
    where table_name = '!!@table_name!!';
    -- !x! execute script load_tables
-- !x! endif

-- !x! END SCRIPT

-- ################################################################
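
With a control table in place, the whole multi-table merge is run in the same way as the earlier examples; for instance (the values shown are only illustrative):

-- !x! sub base_schema  public
-- !x! sub staging      stg_bette
-- !x! sub tablelist    upsert_tables
-- !x! sub logfile      merge_log.txt
-- !x! sub write_sql    Yes
-- !x! execute script upsert_all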


Credits
Thanks to Elizabeth Shea for simplifying the update statement generation in the UPSERT_ONE script.
