The ability to replicate the invocation or call of a pl/sql procedure to another database, rather than the actual DML changes the procedure performed, is known as procedural replication. This page describes a method of accomplishing procedural replication in a Streams environment. Procedural replication is simulated in Streams by creating a PL/SQL wrapper procedure and identifying the procedure to be run in an anonymous block. The wrapper procedure performs the user procedure to be replicated and generates a DDL statement that will execute the procedure at the target database. The DDL statement is replicated to the target database rather than the DML generated by the user procedure. This technique is most useful for performing large batch updates in Streams using stored procedures.
Some initial setup of the source and target databases is required to prepare the participating databases for procedural replication. After this initial configuration is complete, it is a simple matter of granting users privilege to perform procedural replication. After the initial setup is complete and the appropriate privileges have been granted to the user, the user runs the procedure by invoking the procedure replication wrapper, streams_procedure_replication.execute_call
Assume that the schema USER1 exists and that the procedure mybulkdml exists with the signature MYBULKDML(schemaname,tablename). Further assume that Streams replicates DML for the USER1 schema.
exec strprocadm.streams_procedural_replication.execute_call('BEGIN mybulkdml(''USER1'',''CUST''); END;');
or
declare
sql_txt varchar2(4000);
begin
sql_txt := 'begin user1.mybulkdml(''USER1'', ''CUST'');end;';
strprocadm.streams_procedural_replication.execute_call(sql_txt);
end;
/
Restrictions on Procedural Replication:
The user procedure (anonymous block) to be replicated must not perform any COMMIT.
Procedure calls with OUT variables are not supported
During the window of procedure replication, the user needs to implement a mechanism to prevent concurrent DMLs on the user tables modified by the replicated procedure. For example, Lock the tables modified by the replicated procedure.
Notes:
The replicated procedure is handled as DDL at the destination site.
The execution of the transaction at the source and destination are in the same transaction.
The package streams_procedural_replication should be a definer's rights package, since a procedure is invoked from this package by a trigger in the SYSTEM schema.
This section contains step-by-step instructions to implement procedural replication. Following this section is a worked example (sql scripts and associated logfiles) that can be used to demonstrate this capability.
At the source database, create the procedural replication administration user (STRPROCADM)
CREATE USER strprocadm IDENTIFIED BY strprocadm;
GRANT CONNECT, RESOURCE TO strprocadm;
GRANT create procedure TO strprocadm;
At the target database, create the procedural replication administration user (STRPROCADM)
CREATE USER strprocadm IDENTIFIED BY strprocadm;
GRANT CONNECT, RESOURCE TO strprocadm;
At both source and target databases, connect as STRPROCADM to do the following:
-- This table is created to store information on the user procedure to be replicated.
CREATE TABLE proc_rep_tab (anon_block VARCHAR2(4000));
-- for security reasons, this should be a definer's rights package CREATE OR REPLACE PACKAGE streams_procedural_replication AS -- body of DDL trigger created by system PROCEDURE trigger_body; -- does local and remote invokation of anonymous block PROCEDURE execute_call(anonymous_block IN VARCHAR2); END; / CREATE OR REPLACE PACKAGE BODY STREAMS_PROCEDURAL_REPLICATION as -- private procedure to store procedure in table -- only one stored procedure can be run at a time, otherwise -- and error is raised. PROCEDURE store_call(my_proc_call IN VARCHAR2) AS cnt NUMBER; BEGIN -- This proc_rep_tab table is shared by the users in the database for -- procedure replication. This workaround only allows one user at a -- time to do procedure replication. The table is locked for -- synconization propose. LOCK TABLE strprocadm.proc_rep_tab IN EXCLUSIVE MODE; SELECT count(*) into cnt FROM proc_rep_tab; IF cnt >0 THEN RAISE_APPLICATION_ERROR(-1001, 'More than one row in PROC_REP_TAB'); END IF; -- Populate the proc_rep_tab table, procedure info is retrived -- during trigger execution by procedure call: trigger_body(). INSERT INTO strprocadm.proc_rep_tab(anon_block) values (my_proc_call); COMMIT; EXCEPTION WHEN OTHERS THEN -- If errors occur, make sure to clean up proc_rep_tab DELETE from strprocadm.proc_rep_tab; COMMIT; RAISE; END; PROCEDURE trigger_body AS my_anon_block VARCHAR2(4000); BEGIN -- Before executing the user procedure, turn off capture so that -- DMLs on the local table are not captured. dbms_streams.set_tag('FF'); -- Execute the user procedure locally select anon_block into my_anon_block from strprocadm.proc_rep_tab; execute immediate my_anon_block; -- Turn on capture. dbms_streams.set_tag; EXCEPTION WHEN OTHERS THEN -- IF errors occur, make sure the capture is back on. dbms_streams.set_tag; RAISE; END; PROCEDURE execute_call(anonymous_block IN VARCHAR2) AS procedure_sql varchar2(4000); BEGIN -- Populate the table for storing proceudre info. store_call(anonymous_block); -- Create the well-known procedure to simulate procedure replication. procedure_sql := 'create or replace procedure streams_replicate_procedures as' ||' begin ' ||anonymous_block ||' end;'; execute immediate procedure_sql; -- clean-up proc_rep_tab table DELETE from strprocadm.proc_rep_tab; commit; EXCEPTION WHEN OTHERS THEN -- If errors occur, make sure to clean up proc_rep_tab DELETE from strprocadm.proc_rep_tab; commit; RAISE; END; END STREAMS_PROCEDURAL_REPLICATION; /
- At the source database, connect to STRPROCADM to grant SYSTEM execute privilege on the streams_procedural_replication package. The trigger body procedure of this package is performed by the SYSTEM schema whenever procedural replication is invoked.
.
GRANT EXECUTE ON STRPROCADM.STREAMS_PROCEDURAL_REPLICATION TO SYSTEM;
As the Streams Administrator (STRADM), add a Streams rule for the schema STRPROCADM to capture/propagation/apply processes for DDL replication only. In the example below, inst1 and site1 both refer to the source database while inst2 and site2 refer to the target database. Note that the INCLUDE_DML is set to FALSE and INCLUDE_DDL is set to TRUE. Both of these are set to non-default values. Site1 and Site2 should reflect the global_name of the individual databases. Modify the PL/SQL below to identify the correct streams_name and queue_name for your configuration
BEGIN DBMS_STREAMS_ADM.ADD_SCHEMA_RULES( SCHEMA_NAME => 'strprocadm', STREAMS_TYPE => 'APPLY', STREAMS_NAME => 'apply_from_inst1', QUEUE_NAME => 'stradm.STREAMS_QUEUE', INCLUDE_DML => FALSE, INCLUDE_DDL => TRUE, SOURCE_DATABASE => :site1); END; /
BEGIN DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES( SCHEMA_NAME => 'strprocadm', STREAMS_NAME => 'INST1_TO_INST2', SOURCE_QUEUE_NAME => 'stradm.STREAMS_QUEUE', DESTINATION_QUEUE_NAME => 'stradm.STREAMS_QUEUE@'||:site2, INCLUDE_DML => FALSE, INCLUDE_DDL => TRUE, SOURCE_DATABASE => :site1); END; / BEGIN DBMS_STREAMS_ADM.ADD_SCHEMA_RULES( SCHEMA_NAME => 'stprocadm', STREAMS_TYPE => 'capture', STREAMS_NAME => 'capture_user1', QUEUE_NAME => 'stradm.STREAMS_QUEUE', INCLUDE_DML => FALSE, INCLUDE_DDL => TRUE); END; /
EXEC :scn:= DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER; connect stradm/stradm@inst2 BEGIN DBMS_APPLY_ADM.SET_SCHEMA_INSTANTIATION_SCN ( 'strprocadm',:site1,:scn); END; /
CREATE OR REPLACE procedure exec_ddl(in_any IN SYS.ANYDATA) IS lcr SYS.LCR$_DDL_RECORD; rc PLS_INTEGER; ddl_text CLOB; raw_tag RAW(2000); replace_string varchar2(100) := 'create or replace procedure streams_replicate_procedures as'; compare_str varchar2(100); amount number; sql_txt VARCHAR2(4000); sql_len NUMBER; BEGIN -- Access LCR rc := in_any.GETOBJECT(lcr); DBMS_LOB.CREATETEMPORARY(ddl_text, TRUE); lcr.GET_DDL_TEXT(ddl_text); -- convert the DDL to an anonymous pl/sql block IF lcr.GET_OBJECT_NAME = 'STREAMS_REPLICATE_PROCEDURES' AND lcr.GET_OBJECT_OWNER = 'STRPROCADM' THEN SELECT SUBSTR(ddl_text, 1, LENGTH(replace_string)) INTO compare_str FROM DUAL; IF (compare_str = replace_string) THEN DBMS_LOB.WRITE(ddl_text, LENGTH('declare'), 1, 'declare'); amount := length(replace_string)-length('declare'); DBMS_LOB.ERASE(ddl_text, amount, length('declare')+1); END IF; sql_len := DBMS_LOB.GETLENGTH(ddl_text); DBMS_LOB.READ(ddl_text, sql_len, 1, sql_txt); EXECUTE IMMEDIATE sql_txt; ELSE -- Execute DDL LCR lcr.EXECUTE(); END IF; -- Free temporary LOB space DBMS_LOB.FREETEMPORARY(ddl_text); EXCEPTION WHEN OTHERS THEN RAISE; END; /
BEGIN DBMS_APPLY_ADM.ALTER_APPLY( apply_name => 'apply_from_inst1' ,ddl_handler => 'stradm.exec_ddl' ); END; /
CREATE OR REPLACE TRIGGER ddl_trg AFTER CREATE ON DATABASE DECLARE BEGIN IF (ora_dict_obj_name = 'STREAMS_REPLICATE_PROCEDURES' AND ora_dict_obj_type = 'PROCEDURE' AND ora_dict_obj_owner = 'STRPROCADM') THEN -- check that the procedure to be executed is definer's right, -- if strprocadm is untrusted, since the database trigger is -- created by SYSTEM. strprocadm.streams_procedural_replication.trigger_body; END IF; end; /
GRANT EXECUTE to strprocadm on user1.mybulkdml;
GRANT EXECUTE on strprocadm.streams_procedural_replcation to user1;
A worked example demonstrating
procedural replication with Streams is available for download. This zip
file provides a complete solution for
simulating procedural replication with Oracle Streams, including the
complete setup of Streams to replication DML for a particular schema, USER1 and
procedural replication for STRPROCADM. The example code can be
executed with any release of Oracle Streams.
To prepare for running this sql script, configure two(2) Oracle databases(9iR2 or 10g). The source database (where changes are captured) must be Enterprise Edition and enabled for ARCHIVE LOGGING.
strproc.sql - set up procedural replication
(logfile)
stradm.sql - set up Streams (logfile)
usr.sql - set up USER1 schema/objects and test user's call.
(logfile)