Procedural Replication with Streams

Procedural replication is the ability to replicate the invocation of a PL/SQL procedure to another database, rather than the DML changes the procedure performs.  This page describes a method of accomplishing procedural replication in a Streams environment.  Streams simulates procedural replication through a PL/SQL wrapper procedure that accepts the procedure call as an anonymous block.  The wrapper runs the user procedure at the source database and generates a DDL statement that runs the same procedure at the target database.  Streams replicates that DDL statement to the target database instead of the DML generated by the user procedure.  This technique is most useful for performing large batch updates with stored procedures in a Streams environment.

Overview

Some initial setup of the source and target databases is required to prepare them for procedural replication.  Once this configuration is complete, enabling a user for procedural replication is a matter of granting privileges.  The user then runs the procedure by invoking the procedural replication wrapper, streams_procedural_replication.execute_call.

The initial setup consists of three parts.  This setup is performed once.

  1. Create the schema STRPROCADM with the appropriate privileges.  This schema stores the procedures and other database objects that implement procedural replication.  These objects include the table PROC_REP_TAB, which stores the replicated procedure call, and the procedural replication package used in this implementation.
  2. Configure a Streams SCHEMA rule for capture/propagation/apply to replicate the DDL issued for this schema (STRPROCADM).  At the target database, register a DDL handler (DDL_HANDLER) with the apply process.  This handler performs special handling for the procedure STREAMS_REPLICATE_PROCEDURES owned by STRPROCADM.
  3. Create a database 'AFTER CREATE' trigger.  This trigger ensures that the procedure execution at the source database and the generated DDL for replication are performed within the same transaction.

To allow a user access to procedural replication, privileges must be granted.  These two steps must be done for each procedure.

  1. Grant STRPROCADM execute privilege on the user procedure.
  2. Grant the user execute privilege on the procedural replication wrapper package, streams_procedural_replication.

Assume that the schema USER1 exists and that the procedure mybulkdml exists with the signature MYBULKDML(schemaname,tablename).  Further assume that Streams replicates DML for the USER1 schema.

To initiate procedural replication, issue either of the following as USER1.  Qualify the procedure with its owner, because the anonymous block is executed by STRPROCADM rather than by the calling user.

exec strprocadm.streams_procedural_replication.execute_call('BEGIN  user1.mybulkdml(''USER1'',''CUST''); END;');

or

declare
sql_txt varchar2(4000);
begin
  sql_txt := 'begin user1.mybulkdml(''USER1'', ''CUST'');end;';
  strprocadm.streams_procedural_replication.execute_call(sql_txt);
end;
/
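
The calls above assume that USER1.MYBULKDML already exists.  For readers who want something concrete to test with, the following is a minimal sketch of such a procedure.  The STATUS column and its values are hypothetical and do not come from the worked example, and the sketch assumes a release that provides the DBMS_ASSERT package.

```sql
-- Hypothetical bulk-update procedure matching MYBULKDML(schemaname,tablename).
-- The STATUS column and its values are assumptions for illustration only.
CREATE OR REPLACE PROCEDURE user1.mybulkdml(
  schemaname IN VARCHAR2,
  tablename  IN VARCHAR2) AS
BEGIN
  -- DBMS_ASSERT validates the identifiers before they are concatenated
  -- into the dynamic statement.
  EXECUTE IMMEDIATE
       'UPDATE ' || DBMS_ASSERT.SCHEMA_NAME(schemaname)
    || '.'       || DBMS_ASSERT.SIMPLE_SQL_NAME(tablename)
    || ' SET status = ''PROCESSED'' WHERE status = ''NEW''';
  -- No COMMIT here: the procedure runs inside the AFTER CREATE trigger,
  -- in the same transaction as the DDL generated by the wrapper.
END;
/
```

Because only the call is replicated, the procedure must exist with the same definition at both the source and the target database.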

Restrictions  on Procedural Replication:     

Notes:

 

Detailed Instructions

This section contains step-by-step instructions to implement procedural replication.  A worked example (SQL scripts and associated log files) that demonstrates this capability follows this section.

1.  Create user STRPROCADM and required database objects

CREATE USER strprocadm IDENTIFIED BY strprocadm;
GRANT CONNECT, RESOURCE TO strprocadm;
GRANT create procedure TO strprocadm;         

-- Connected as STRPROCADM, create the table that stores information on the user procedure to be replicated.

CREATE TABLE proc_rep_tab (anon_block VARCHAR2(4000));

-- for security reasons, this should be a definer's rights package
CREATE OR REPLACE PACKAGE streams_procedural_replication AS

  -- body of DDL trigger created by system
  PROCEDURE trigger_body;

  -- does local and remote invocation of anonymous block
  PROCEDURE execute_call(anonymous_block IN VARCHAR2);

END;
/

CREATE OR REPLACE PACKAGE BODY STREAMS_PROCEDURAL_REPLICATION as

  -- private procedure to store the procedure call in the table
  -- only one stored procedure can be run at a time, otherwise
  -- an error is raised.
  PROCEDURE store_call(my_proc_call IN VARCHAR2) AS
  cnt NUMBER;
  BEGIN
    -- The proc_rep_tab table is shared by all users in the database for
    -- procedure replication. This workaround allows only one user at a
    -- time to do procedure replication. The table is locked for
    -- synchronization purposes.
    LOCK TABLE strprocadm.proc_rep_tab IN EXCLUSIVE MODE;

    SELECT count(*) into cnt FROM proc_rep_tab;
    IF cnt >0 THEN
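      -- User-defined error numbers must be in the range -20000 to -20999
      RAISE_APPLICATION_ERROR(-20001, 'PROC_REP_TAB is not empty; another replicated call is in progress');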
    END IF;

    -- Populate the proc_rep_tab table; the procedure info is retrieved
    -- during trigger execution by the procedure call trigger_body().
    INSERT INTO strprocadm.proc_rep_tab(anon_block) values (my_proc_call);
    COMMIT;
  EXCEPTION WHEN OTHERS THEN
    -- If errors occur, make sure to clean up proc_rep_tab
    DELETE from strprocadm.proc_rep_tab;
    COMMIT;
    RAISE;
  END;

  PROCEDURE trigger_body AS
  my_anon_block VARCHAR2(4000);
  BEGIN
    -- Before executing the user procedure, turn off capture so that
    -- DMLs on the local table are not captured.
    dbms_streams.set_tag('FF');

    -- Execute the user procedure locally
    select anon_block into my_anon_block from strprocadm.proc_rep_tab;
    execute immediate my_anon_block;
    -- Turn on capture.
    dbms_streams.set_tag;

  EXCEPTION WHEN OTHERS THEN

    -- If errors occur, make sure the capture is back on.
    dbms_streams.set_tag;
    RAISE;
  END;

  PROCEDURE execute_call(anonymous_block IN VARCHAR2) AS
  procedure_sql  varchar2(4000);
  BEGIN

   -- Populate the table for storing procedure info.
   store_call(anonymous_block);

   -- Create the well-known procedure to simulate procedure replication.
   procedure_sql := 'create or replace procedure streams_replicate_procedures as'
          ||' begin '
            ||anonymous_block
          ||' end;';
   execute immediate procedure_sql;

   -- clean-up proc_rep_tab table
   DELETE from strprocadm.proc_rep_tab;
   commit;
  EXCEPTION WHEN OTHERS THEN

    -- If errors occur, make sure to clean up proc_rep_tab
    DELETE from strprocadm.proc_rep_tab;
    commit;
    RAISE;
  END;
END STREAMS_PROCEDURAL_REPLICATION;
/

GRANT EXECUTE ON STRPROCADM.STREAMS_PROCEDURAL_REPLICATION TO SYSTEM;

2.  Configure Streams for STRPROCADM.

As the Streams Administrator (STRADM), add a Streams rule for the schema STRPROCADM to the capture, propagation, and apply processes for DDL replication only.  In the example below, inst1 and site1 both refer to the source database, while inst2 and site2 refer to the target database.  Note that INCLUDE_DML is set to FALSE and INCLUDE_DDL is set to TRUE; both are non-default values.  site1 and site2 should reflect the global_name of the respective databases.  Modify the PL/SQL below to identify the correct streams_name and queue_name for your configuration.
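
The blocks in this step reference the SQL*Plus bind variables :site1, :site2, and :scn, which must be declared in the session before the blocks are run.  A minimal sketch of that preparation follows; the two global names are placeholders, not values from the worked example.

```sql
-- SQL*Plus bind variables used by the blocks in this step
VARIABLE site1 VARCHAR2(128)
VARIABLE site2 VARCHAR2(128)
VARIABLE scn   NUMBER

-- Hypothetical global names; replace them with the value returned by
--   SELECT global_name FROM global_name;
-- at the source and target database respectively
EXEC :site1 := 'SITE1.EXAMPLE.COM';
EXEC :site2 := 'SITE2.EXAMPLE.COM';
```

SQL*Plus bind variables keep their values across a CONNECT within the same SQL*Plus session, so :site1 and :scn remain available after connecting to the target database.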

BEGIN
 DBMS_STREAMS_ADM.ADD_SCHEMA_RULES(
   SCHEMA_NAME     => 'strprocadm',
   STREAMS_TYPE    => 'APPLY',
   STREAMS_NAME    => 'apply_from_inst1',
   QUEUE_NAME      => 'stradm.STREAMS_QUEUE',
   INCLUDE_DML     =>  FALSE,
   INCLUDE_DDL     =>  TRUE,
   SOURCE_DATABASE =>  :site1);
END;
/
BEGIN
 DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES(
   SCHEMA_NAME             => 'strprocadm',
   STREAMS_NAME            => 'INST1_TO_INST2',
   SOURCE_QUEUE_NAME       => 'stradm.STREAMS_QUEUE',
   DESTINATION_QUEUE_NAME  => 'stradm.STREAMS_QUEUE@'||:site2,
   INCLUDE_DML             =>  FALSE,
   INCLUDE_DDL             =>  TRUE,
   SOURCE_DATABASE         =>  :site1);
END;
/

BEGIN
 DBMS_STREAMS_ADM.ADD_SCHEMA_RULES(
   SCHEMA_NAME   => 'strprocadm',
   STREAMS_TYPE  => 'capture',
   STREAMS_NAME  => 'capture_user1',
   QUEUE_NAME    => 'stradm.STREAMS_QUEUE',
   INCLUDE_DML   =>  FALSE,
   INCLUDE_DDL   =>  TRUE);
END;
/
EXEC :scn:= DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER;

connect stradm/stradm@inst2

BEGIN
     DBMS_APPLY_ADM.SET_SCHEMA_INSTANTIATION_SCN (
               'strprocadm',:site1,:scn);
END;
/
CREATE OR REPLACE procedure exec_ddl(in_any IN SYS.ANYDATA)
IS
    lcr SYS.LCR$_DDL_RECORD;
    rc PLS_INTEGER;
    ddl_text CLOB;
    raw_tag RAW(2000);
    replace_string varchar2(100) := 'create or replace procedure streams_replicate_procedures as';
    compare_str varchar2(100);
    amount  number;
    sql_txt   VARCHAR2(4000);
    sql_len   NUMBER;
BEGIN
    -- Access LCR
    rc := in_any.GETOBJECT(lcr);
    DBMS_LOB.CREATETEMPORARY(ddl_text, TRUE);
    lcr.GET_DDL_TEXT(ddl_text);
    
    -- convert the DDL to an anonymous pl/sql block
    IF lcr.GET_OBJECT_NAME = 'STREAMS_REPLICATE_PROCEDURES' AND 
       lcr.GET_OBJECT_OWNER = 'STRPROCADM' THEN
      SELECT SUBSTR(ddl_text, 1, LENGTH(replace_string)) INTO compare_str
       FROM DUAL;
      IF (compare_str = replace_string) THEN
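         -- Overwrite the start of the DDL with 'declare' and blank out
         -- the remainder of the CREATE PROCEDURE header
         DBMS_LOB.WRITE(ddl_text, LENGTH('declare'), 1,  'declare');
         amount := length(replace_string)-length('declare');
         DBMS_LOB.ERASE(ddl_text, amount, length('declare')+1);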

      END IF;
      sql_len := DBMS_LOB.GETLENGTH(ddl_text);
      DBMS_LOB.READ(ddl_text, sql_len, 1, sql_txt);
      
      EXECUTE IMMEDIATE sql_txt;

    ELSE
      -- Execute DDL LCR
       lcr.EXECUTE();
    END IF;
    
    -- Free temporary LOB space
    DBMS_LOB.FREETEMPORARY(ddl_text);
EXCEPTION WHEN OTHERS THEN
  RAISE;
END;
/
BEGIN
    DBMS_APPLY_ADM.ALTER_APPLY(
        apply_name => 'apply_from_inst1'
        ,ddl_handler => 'stradm.exec_ddl'
    );
END;
/
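
To make the rewrite performed by exec_ddl easier to follow, the following standalone block applies the same DBMS_LOB calls to a sample DDL statement and prints the result.  It is a sketch for experimentation only: the sample statement is the one execute_call would generate for the USER1.MYBULKDML call shown in the overview, and nothing is executed against that procedure.

```sql
SET SERVEROUTPUT ON
DECLARE
  ddl_text       CLOB;
  replace_string VARCHAR2(100) :=
    'create or replace procedure streams_replicate_procedures as';
  sample_ddl     VARCHAR2(4000);
  amount         INTEGER;
BEGIN
  -- Build the DDL text the same way execute_call does
  sample_ddl := replace_string
             || ' begin '
             || 'begin user1.mybulkdml(''USER1'', ''CUST'');end;'
             || ' end;';

  DBMS_LOB.CREATETEMPORARY(ddl_text, TRUE);
  DBMS_LOB.WRITE(ddl_text, LENGTH(sample_ddl), 1, sample_ddl);

  -- Same rewrite as exec_ddl: put 'declare' at the start, then erase
  -- the rest of the CREATE PROCEDURE header (ERASE pads a CLOB with spaces)
  DBMS_LOB.WRITE(ddl_text, LENGTH('declare'), 1, 'declare');
  amount := LENGTH(replace_string) - LENGTH('declare');
  DBMS_LOB.ERASE(ddl_text, amount, LENGTH('declare') + 1);

  -- Print the anonymous block that the handler would execute
  DBMS_OUTPUT.PUT_LINE(DBMS_LOB.SUBSTR(ddl_text, 4000, 1));

  DBMS_LOB.FREETEMPORARY(ddl_text);
END;
/
```

The printed text should start with the keyword declare, followed by blanks where the procedure header used to be, and then the original begin ... end; body.  This is why the handler can pass the text directly to EXECUTE IMMEDIATE.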

3.  Create the database AFTER CREATE trigger

CREATE OR REPLACE TRIGGER ddl_trg AFTER CREATE
  ON DATABASE
DECLARE
BEGIN

  IF (ora_dict_obj_name = 'STREAMS_REPLICATE_PROCEDURES' AND
      ora_dict_obj_type = 'PROCEDURE' AND
      ora_dict_obj_owner = 'STRPROCADM') THEN
    -- This trigger is created by SYSTEM. If STRPROCADM is untrusted,
    -- check that the procedure to be executed is a definer's rights
    -- procedure before calling it.

    strprocadm.streams_procedural_replication.trigger_body;

  END IF;
end;
/

4.  Grant privilege to STRPROCADM to execute user procedure

GRANT EXECUTE ON user1.mybulkdml TO strprocadm;

5.  Grant privilege to USER1 to execute procedural replication 

GRANT EXECUTE ON strprocadm.streams_procedural_replication TO user1;
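
As a quick check that both grants from steps 4 and 5 are in place, a query along the following lines can be run by a user with access to the DBA views.  This is only a sketch; the object names match the USER1.MYBULKDML example used throughout this page.

```sql
-- List the EXECUTE grants needed for procedural replication
SELECT grantee, owner, table_name, privilege
  FROM dba_tab_privs
 WHERE privilege = 'EXECUTE'
   AND (   (owner = 'USER1'      AND table_name = 'MYBULKDML')
        OR (owner = 'STRPROCADM' AND table_name = 'STREAMS_PROCEDURAL_REPLICATION'))
 ORDER BY owner, table_name, grantee;
```

The result should include a row granting USER1.MYBULKDML to STRPROCADM and a row granting the package to USER1.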


Procedural replication example  


A worked example demonstrating procedural replication with Streams is available for download.  The zip file provides a complete solution for simulating procedural replication with Oracle Streams, including the complete Streams setup to replicate DML for a particular schema, USER1, and procedural replication for STRPROCADM.  The example code can be executed with any release of Oracle Streams.

To prepare for running the SQL scripts, configure two Oracle databases (9iR2 or 10g).  The source database (where changes are captured) must be Enterprise Edition and must run in ARCHIVELOG mode.
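
Both prerequisites can be confirmed at the source database before running the scripts.  The queries below are a small sketch and are not part of the worked example; they need a user that can query the V$ views.

```sql
-- Should report ARCHIVELOG for the source database
SELECT log_mode FROM v$database;

-- The banner should identify an Enterprise Edition release
SELECT banner FROM v$version;
```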