ASYNCHRONOUS TRIGGERS WITH STREAMS

Overview

Asynchronous triggers describes a new  capability of the database enabled by combining Streams and pl/sql procedures with a syntax very similar to triggers. Using Streams to capture the changes within the database, these procedures are invoked as customized apply processing simulating trigger firing actions.  As a result, the user session is able to continue without the delay of performing the trigger actions.

Unlike traditional triggers, asynchronous triggers are created and dropped by a pl/sql API  (async_trigger_packages.sql)  provided as sample code within this page. The when clause and trigger body syntax are consistent with triggers, with following exceptions:

bitand(dml_events, async_trig_pkg.on_insert) = async_trig_pkg.on_insert
bitand(dml_events, async_trig_pkg.on_update) = async_trig_pkg.on_update
bitand(dml_events, async_trig_pkg.on_delete) = async_trig_pkg.on_delete


to identify inserting, updating, deleting events respectively.

Example:

trigger_action := 'IF bitand(dml_events, async_trig_pkg.on_update) = ' ||
                    'async_trig_pkg.on_update THEN ' ||
                    'INSERT INTO u1.account_history values (''UPDATE'', ' ||
                    'SYSTIMESTAMP, old.name, old.balance, new.balance); ' ||
                    ' ELSE ' ||
                    'INSERT INTO u1.account_history values (''DELETE'', ' ||
                    'SYSTIMESTAMP, old.name, old.balance, NULL); ' ||
                    ' END IF; ';

ASSUMPTIONS:

Download Files

The downloadable zip file contains 5 files:

 

Initial Setup

  1. Connect to SYS as SYSDBA and run the async_trigger_setup.sql script

The file async_trigger_setup.sql creates a schema used as both the streams administration and trigger management.  This script creates the schema, TRGADM, and grants the following privileges to TRGADM:

After the creation of the TRGADM schema, the script connects as this user and creates the database tables and types used by the sample code.  The file async_trigger_package.sql  is loaded into the TRGADM schema. 

  1. As the TRGADM user, run the set_up_streams procedure specifying a capture and apply process name.  For this example, the capture name is ACCOUNT_CAPTURE and the apply name is ACCOUNT_APPLY.
connect trgadm/trgadm
exec async_trig_pkg.set_up_streams('ACCOUNT_CAPTURE', 'ACCOUNT_APPLY');

SEED Data

  1. Create a user (U1)  schema for demonstration purposes

Connect SYS as SYSDBA

Grant connect, resource to U1 identified by U1;

  1. Create the test tables (accounts, account_history) in the U1 schema
connect u1/u1
create table accounts(name varchar2(30), balance number);
create table account_history (event_name  varchar2(30),
                              event_time  timestamp,
                              name        varchar2(30),
                              old_balance number,
                              new_balance number);

Demonstration

  1. As the user U1, create 3 asynchronous triggers that fire whenever the account name is ALICE.  The first trigger is an insert trigger for each row, that inserts a row into the account_history table.  The second trigger is an update and delete trigger for each row, that inserts a row into the account_history table with the appropriate DML type.  The third trigger is an insert, update, and delete trigger that fires only when the commit is received.  This trigger inserts the current timestamp into the account_history table.
DECLARE
  trigger_action VARCHAR2(1000);
BEGIN

  -- first, create an insert trigger, for each row
  trigger_action := 'INSERT INTO u1.account_history values (''INSERT'', ' ||
                    'SYSTIMESTAMP, new.name, NULL, new.balance); ';
  async_trig_pkg.create_trigger(trigger_owner => 'U1',
                                trigger_name => 'ACCOUNT_TRIG1',
                                table_owner => 'U1',
                                table_name => 'ACCOUNTS',
                                dml_events => async_trig_pkg.on_insert,
                                for_each_row => TRUE,
                                when_clause => 'new.name = ''ALICE''',
                                action => trigger_action);

  -- then, create an update/delete trigger, for each row
  trigger_action := 'IF bitand(dml_events, async_trig_pkg.on_update) = ' ||
                    'async_trig_pkg.on_update THEN ' ||
                    'INSERT INTO u1.account_history values (''UPDATE'', ' ||
                    'SYSTIMESTAMP, old.name, old.balance, new.balance); ' ||
                    ' ELSE ' ||
                    'INSERT INTO u1.account_history values (''DELETE'', ' ||
                    'SYSTIMESTAMP, old.name, old.balance, NULL); ' ||
                    ' END IF; ';
  async_trig_pkg.create_trigger(trigger_owner => 'U1',
                                trigger_name => 'ACCOUNT_TRIG2',
                                table_owner => 'U1',
                                table_name => 'ACCOUNTS',
                                dml_events => async_trig_pkg.on_update +
                                              async_trig_pkg.on_delete,
                                for_each_row => TRUE,
                                when_clause => 'old.name = ''ALICE''',
                                action => trigger_action);

  -- last, create an insert/update/delete table trigger
  trigger_action := 'INSERT INTO u1.account_history values (''DML'', ' ||
                    'SYSTIMESTAMP, NULL, NULL, NULL); ';
  async_trig_pkg.create_trigger(trigger_owner => 'U1',
                                trigger_name => 'ACCOUNT_TRIG3',
                                table_owner => 'U1',
                                table_name => 'ACCOUNTS',
                                dml_events => async_trig_pkg.on_update +
                                              async_trig_pkg.on_insert +
                                              async_trig_pkg.on_delete,
                                for_each_row => FALSE,
                                action => trigger_action);

END;
/
  1. Do some DML on the ACCOUNTS table
insert into accounts values ('ALICE', 1000);
commit;

update accounts set balance = 2000 where name = 'ALICE';
commit;

delete from accounts where name= 'ALICE';
commit;

insert into accounts values ('BOB', 500);
commit;

  1. Check the ACCOUNT_HISTORY table.  Verify that the appropriate rows are inserted into the ACCOUNT_HISTORY table.  There should be 4 rows. 
select * from account_history;

Remove Configuration

The completely remove the configuration for asynchronous triggers, use the clean_up_streams procedure.

    connect trgadm/trgadm

    exec async_trig_pkg.clean_up_streams;

 

 

Package Documentation

The ASYNC_TRIG_PKG package contains 4 user-callable procedures:  set_up_streams, clean_up_streamscreate_triggerdrop_trigger

procedure set_up_streams(capture_name IN varchar2,
                           apply_name   IN varchar2); 

This procedure is used to initially setup the Streams configuration for asynchronous triggers. It is called once to configure a capture process and apply process used specifically for all asynchronous triggers.  The names for the capture and apply process are VARCHAR2 inputs, and should not match the names of an existing Streams configuration.   

The capture and apply processes for asynchronous triggers are associated with the queue: ASYNC_TRIG_Q owned by TRGADM.  This queue is automatically created as part of the set_up_streams procedure.

 

procedure clean_up_streams;

The clean_up_streams procedure is used to completely remove the Streams configuration for asynchronous triggers.  This procedure drops the capture and apply processes associated with the asynchronous triggers configuration.

 

procedure create_trigger(trigger_owner IN varchar2,
                           trigger_name  IN varchar2,
                           table_owner   IN varchar2,
                           table_name    IN varchar2,
                           dml_events    IN binary_integer,
                           for_each_row  IN boolean DEFAULT TRUE,
                           when_clause   IN varchar2 DEFAULT NULL,
                           action        IN varchar2);

The create_trigger procedure registers the table specified by table_owner.table_name with the capture and apply processes.  In addition, a pl/sql procedure is automatically generated  that matches the criteria specified by the create_trigger parameters: dml_events, for_each_row, and when_clause. 

Streams applies transactions after the user has completed his/her transaction.  DML changes made to the specified table are captured by the Streams capture process and queued for the apply process.  After the commit from the user transaction is received, the transaction is scheduled by Streams. 

The apply process is customized via a dml handler to collect the list of tables modified to pass that information to the commit handler.  In addition, if for_each_row is set to TRUE, the generated trigger procedure is performed.  For triggers where for_each_row is specified as FALSE, the Streams pre-commit handler is called.  The list of modified tables is available via a pl/sql collection to the pre-commit handler, and the registered trigger procedures are performed.

 

procedure drop_trigger(  owner          IN varchar2,
                         trigger_name   IN varchar2);

The drop_trigger procedure unregisters the trigger specified and removes the rules for the associated table from  the capture and apply processes.  The dml handlers registered with the asynchronous trigger apply process are cleared for the associated table as well.   In addition, the generated pl/sql procedures for the specified trigger is dropped.