Applies to:
Oracle Server – Enterprise Edition – Version: 10.1.0.4 to 10.2.0.2
Information in this document applies to any platform.
Purpose
This is a complete example of configuring Streams and extending it for auto correction.
To prepare for running this SQL script, configure two (2) Oracle databases (9iR2 or 10g). The source database (where changes are captured) must be Enterprise Edition and must run in ARCHIVELOG mode.
Software Requirements/Prerequisites
Applicable with SQL, SQL*Plus and iSQL*Plus Oracle10g
Configuring the Sample Code
- Modify the init.ora or spfile of both databases to configure the size of the Streams pool (10gR1 only) and to enable job queue processing.
- The job_queue_processes parameter should be set to a positive non-zero integer. Typically setting job_queue_processes=4 is sufficient.
- Create the streams administrator user STRADM.
- Grant DBA privilege to the STRADM user.
- Create tnsnames.ora entries at the source database for dbs1.net and dbs2.net, for the source and target databases respectively. Alternatively, modify the script so that dbs1.net and dbs2.net reflect the correct service names for each database.
- The script assumes that the HR sample schema exists.
Be sure to review the script before running it. Comments embedded in the script describe its actions.
Note: This script modifies the GLOBAL_NAME of each database to match dbs1.net and dbs2.net. If this is not desired, please modify the script accordingly.
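The preparation steps listed in this section might look like the following when run as SYSDBA on each database. This is only a sketch: the 200M Streams pool size and the use of SCOPE = SPFILE (which assumes an spfile and takes effect after a restart) are assumptions, not values from the note. The STRADM password simply matches the one the script connects with.

```sql
-- Run as SYSDBA on both databases (sizes are illustrative assumptions)
ALTER SYSTEM SET job_queue_processes = 4 SCOPE = SPFILE;
ALTER SYSTEM SET streams_pool_size = 200M SCOPE = SPFILE;

-- Streams administrator; password matches "connect stradm/stradm" in the script
CREATE USER stradm IDENTIFIED BY stradm;
GRANT DBA TO stradm;

-- On the source database, confirm that it runs in ARCHIVELOG mode
SELECT log_mode FROM v$database;
```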
Running the Sample Code
See the script below.
Caution
Sample Code
SQL SCRIPT
===========
Rem
Rem Copyright (c) 2002, 2004, Oracle. All rights reserved.
Rem
Rem NAME
Rem Best Practices Examples - Auto Correction
Rem
Rem DESCRIPTION
Rem Auto-correction with control at source
Rem control table is created and replicated to all sites
Rem Setting auto_correct to 'Y' makes the handler correct errors raised during apply
Rem
Rem NOTES
Rem We recommend that PKs are not modified when autocorrection is
Rem being used.
Rem
SET ECHO ON
SET FEEDBACK 1
SET NUMWIDTH 10
SET LINESIZE 80
SET TRIMSPOOL ON
SET TAB OFF
SET PAGESIZE 100
variable site1 varchar2(80);
variable site2 varchar2(80);
variable scn number;
set echo on
-- Set database name and Create database links
connect sys/change_on_install@dbs1.net as sysdba
ALTER DATABASE RENAME GLOBAL_NAME TO DBS1.NET;
connect sys/change_on_install@dbs2.net as sysdba
ALTER DATABASE RENAME GLOBAL_NAME TO DBS2.NET;
connect sys/change_on_install@dbs1.net as sysdba
create public database link dbs2.net using 'dbs2.net';
connect sys/change_on_install@dbs2.net as sysdba
create public database link dbs1.net using 'dbs1.net';
connect stradm/stradm@dbs1.net;
create database link dbs2.net connect to stradm identified by stradm
using 'dbs2.net';
connect stradm/stradm@dbs2.net;
create database link dbs1.net connect to stradm identified by stradm
using 'dbs1.net';
connect hr/hr@dbs1.net;
create database link dbs2.net connect to hr identified by hr
using 'dbs2.net';
connect hr/hr@dbs2.net;
create database link dbs1.net connect to hr identified by hr
using 'dbs1.net';
connect stradm/stradm@dbs1.net;
execute select global_name into :site1 from global_name;
execute select global_name into :site2 from global_name@dbs2.net;
print site1;
print site2;
------------------------------------------------------
-- - Create Streams Queues
-- - Verify if they are secure Queues.
------------------------------------------------------
connect stradm/stradm@dbs2.net;
exec dbms_streams_adm.set_up_queue( ) ;
select owner , queue_table , secure from dba_queue_tables
where queue_table = 'STREAMS_QUEUE_TABLE' order by owner , queue_table;
connect stradm/stradm@dbs1.net;
exec dbms_streams_adm.set_up_queue( ) ;
select owner , queue_table , secure from dba_queue_tables
where queue_table = 'STREAMS_QUEUE_TABLE' order by owner , queue_table;
------------------------------------------------------
-- Create propagation rules to dbs2.net
------------------------------------------------------
connect stradm/stradm@dbs1.net;
begin
dbms_streams_adm.add_schema_propagation_rules(
schema_name => 'hr',
streams_name => 'dbs1net_to_dbs2net',
source_queue_name => 'stradm.streams_queue',
destination_queue_name => 'stradm.streams_queue@'||:site2,
include_dml => true,
include_ddl => true,
source_database => :site1);
end;
/
select PROPAGATION_NAME , SOURCE_QUEUE_NAME , DESTINATION_QUEUE_NAME
from all_propagation order by propagation_name ;
connect hr/hr@dbs1.net
-------------------------------------------------------------------------
-- Create control table for control at source
-- auto_correct is set to 'Y' if error needs to
-- be handled
-------------------------------------------------------------------------
CREATE TABLE control_table (sname varchar2(30),
oname varchar2(30) ,
auto_correct varchar2(2) );
-- Add supplemental logging
alter table control_table add SUPPLEMENTAL LOG GROUP
control_table_log_group (sname,oname,auto_correct);
insert into regions values ( 1002,'YL');
insert into regions values ( 1003,'YM');
commit;
connect hr/hr@dbs2.net
-------------------------------------------------------------------------
-- Create control table for control at source at target database
-- auto_correct is set to 'Y' if error needs to
-- be handled
-------------------------------------------------------------------------
CREATE TABLE control_table (sname varchar2(30),
oname varchar2(30) ,
auto_correct varchar2(2) );
alter table control_table add SUPPLEMENTAL LOG GROUP
control_table_log_group (sname,oname,auto_correct);
insert into regions values ( 1002,'YL');
insert into regions values ( 1003,'YM');
commit;
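-- Optional step, not part of the original note. The error handler package
-- created later in the STRADM schema is a definer's-rights unit, and
-- privileges held only through the DBA role are not active inside it.
-- If the package fails to compile with ORA-00942, direct grants such as
-- the following (an assumption about the minimum needed) may help.
-- Run as HR at dbs2.net, which is the current connection.
grant select on control_table to stradm;
grant select, insert, update, delete on regions to stradm;
grant select, delete on countries to stradm;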
------------------------------------------------------
-- - Create Capture Process at dbs1.net
-- - and set table inst scn for hr.regions
------------------------------------------------------
connect stradm/stradm@dbs1.net
begin
dbms_streams_adm.add_table_rules(
table_name => 'hr.regions',
streams_type => 'capture',
streams_name => 'capture_hr',
queue_name => 'stradm.streams_queue',
include_dml => true,
include_ddl => true);
end;
/
begin
dbms_streams_adm.add_table_rules(
table_name => 'hr.control_table',
streams_type => 'capture',
streams_name => 'capture_hr',
queue_name => 'stradm.streams_queue',
include_dml => true,
include_ddl => true);
end;
/
select capture_name, queue_name, queue_owner, status
from dba_capture order by 1,2;
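-- Optional check, not part of the original note: list the table rules
-- just created for the capture process. UPPER() is used in case the
-- Streams name was stored in a different case.
select streams_name, streams_type, table_owner, table_name, rule_type
from dba_streams_table_rules
where upper(streams_name) = 'CAPTURE_HR'
order by table_name, rule_type;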
connect stradm/stradm@dbs1.net
exec :scn:= dbms_flashback.get_system_change_number;
------------------------------------------------------
-- - Create apply @ dbs2.net
------------------------------------------------------
connect stradm/stradm@dbs2.net
begin
dbms_streams_adm.add_table_rules(
table_name => 'hr.regions',
streams_type => 'apply',
streams_name => 'apply_from_dbs1net',
queue_name => 'stradm.streams_queue',
include_dml => true,
include_ddl => true,
source_database => :site1);
end;
/
begin
dbms_streams_adm.add_table_rules(
table_name => 'hr.control_table',
streams_type => 'apply',
streams_name => 'apply_from_dbs1net',
queue_name => 'stradm.streams_queue',
include_dml => true,
include_ddl => true,
source_database => :site1);
end;
/
-- Set table instantiation
begin
dbms_apply_adm.set_table_instantiation_scn ('hr.regions',:site1,:scn);
dbms_apply_adm.set_table_instantiation_scn ('hr.control_table',:site1,:scn);
end;
/
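-- Optional check, not part of the original note: confirm that the
-- instantiation SCN was recorded for both source tables.
select source_object_owner, source_object_name, instantiation_scn
from dba_apply_instantiated_objects
where source_database = :site1
order by source_object_owner, source_object_name;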
------------------------------------------------------
-- - Start apply at dbs2.net and capture at dbs1.net
------------------------------------------------------
connect stradm/stradm@dbs2.net
begin
dbms_apply_adm.set_parameter(
apply_name => 'apply_from_dbs1net',
parameter => 'disable_on_error',
value => 'N');
end;
/
begin
dbms_apply_adm.start_apply(
apply_name => 'apply_from_dbs1net');
end;
/
connect stradm/stradm@dbs1.net
begin
dbms_capture_adm.start_capture(
capture_name => 'capture_hr');
end;
/
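-- Optional check, not part of the original note: both processes should
-- report ENABLED. The second query assumes the STRADM database link
-- dbs2.net created earlier in this script.
select capture_name, status from dba_capture;
select apply_name, status from dba_apply@dbs2.net;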
connect stradm/stradm@dbs2.net
create sequence reg_exception_s start with 9000;
-------------------------------------------------------------------------
-- Create error handler package. When an error is raised
-- and control is set to 'Y' at source, the error is handled
-------------------------------------------------------------------------
CREATE OR REPLACE PACKAGE err_audpkg
AS
type emsg_array IS table of VARCHAR2(2000) index by BINARY_INTEGER;
PROCEDURE error_hand
(
lcr_anydata in sys.anydata ,
err_arr_len in number ,
err_num in dbms_utility.number_array,
err_msg in emsg_array
) ;
END err_audpkg ;
/
CREATE OR REPLACE PACKAGE body err_audpkg AS
PROCEDURE error_hand (
lcr_anydata in sys.anydata ,
err_arr_len in number ,
err_num in dbms_utility.number_array,
err_msg in emsg_array )
IS
lcr sys.lcr$_row_record;
ret pls_integer;
vc varchar2(25) ;
ov2 sys.lcr$_row_list;
auto_correct_mode varchar2(20);
cmd_type varchar2(200);
r_id number;
cnt number;
tmp sys.anydata;
BEGIN
-- Check if auto-correct is Enabled.
select upper(auto_correct) into auto_correct_mode
from hr.control_table where oname = 'REGIONS' and sname = 'HR';
-- try to access/manipulate the lcr
ret := lcr_anydata.getObject(lcr);
-- Get command type
cmd_type := lcr.get_command_type();
-- If command_type is INSERT or UPDATE then delete the value
-- at destination and insert the new value
IF cmd_type = 'INSERT' then
ov2 := lcr.get_values ( 'NEW' ) ;
FOR i in 1 .. ov2.COUNT
LOOP
IF lcr.get_object_name() = 'REGIONS' and auto_correct_mode = 'Y' THEN
IF ov2(i).column_name = 'REGION_ID' THEN
ret := ov2(i).DATA.GetNumber(r_id) ;
-- Delete the value at destination
delete from hr.regions where region_id = r_id;
END IF;
ELSIF lcr.get_object_name() = 'REGIONS' and auto_correct_mode = 'N' THEN
-- Fix in different way, Change the value using sequence,
-- append something to region_name
IF ov2(i).column_name = 'REGION_ID' THEN
SELECT stradm.reg_exception_s.nextval INTO r_id FROM DUAL;
ov2(i).data := Sys.AnyData.ConvertNumber( r_id ) ;
ELSIF ov2(i).COLUMN_NAME = 'REGION_NAME' THEN
ret := ov2(i).DATA.GetVarchar2(vc) ;
vc := vc || '_A'||r_id;
ov2(i).DATA := Sys.AnyData.ConvertVarchar2( vc ) ;
END IF;
END IF;
END LOOP;
-- set NEW values in the LCR.
lcr.set_values ( value_type => 'NEW' , value_list => ov2 );
-- Execute the LCR
lcr.execute ( true );
ELSIF cmd_type = 'UPDATE' then
IF lcr.get_object_name() = 'REGIONS' and auto_correct_mode = 'Y' THEN
-- A primary key is changed
IF ( lcr.get_value ( 'NEW','REGION_ID' ) is not NULL ) THEN
-- Delete the value with OLD value of region_id
tmp := lcr.get_value ( 'OLD','REGION_ID' );
ret := tmp.getnumber ( r_id );
-- Delete the old value at destination
delete from hr.regions where region_id = r_id;
-- Insert the new values with new value of region_id, the primary key
IF ( lcr.get_value ( 'NEW','REGION_NAME' ) is not NULL ) THEN
-- if region_name is also updated, use the value
lcr.delete_column('REGION_ID','OLD');
lcr.delete_column('REGION_NAME','OLD');
-- Convert Update to Insert
lcr.set_command_type ( 'INSERT');
-- Execute the LCR
lcr.execute ( true );
ELSE
tmp := lcr.get_value ( 'OLD','REGION_NAME');
ret := tmp.getvarchar2(vc);
lcr.delete_column('REGION_ID','OLD');
lcr.delete_column('REGION_NAME','OLD');
-- use old value of region as new region_name
lcr.add_column ('NEW','REGION_NAME',sys.anydata.convertvarchar2(vc) );
-- Convert Update to Insert
lcr.set_command_type ( 'INSERT');
-- Execute the LCR
lcr.execute ( true );
END IF;
ELSIF ( lcr.get_value ( 'NEW','REGION_ID' ) is null ) THEN
-- Delete the value with OLD value of region_id
tmp := lcr.get_value ( 'OLD','REGION_ID' );
ret := tmp.getnumber ( r_id );
-- Delete the old value at destination
delete from hr.regions where region_id = r_id;
-- Insert with old value of region_id and new value of region_name
lcr.execute ( true );
END IF;
ELSIF lcr.get_object_name() = 'REGIONS' and auto_correct_mode = 'N' THEN
-- Fix in different way, Change the value using sequence,
-- append something to region_name
IF ( lcr.get_value ( 'OLD','REGION_ID' ) is not NULL ) THEN
SELECT stradm.reg_exception_s.nextval INTO r_id FROM DUAL;
lcr.set_value('NEW','REGION_ID',sys.anydata.convertnumber(r_id));
tmp := lcr.get_value ( 'NEW','REGION_NAME' );
ret := tmp.getvarchar2(vc);
vc := vc || '_A'||r_id;
lcr.set_value('NEW','REGION_NAME',sys.anydata.convertvarchar2(vc) );
lcr.execute ( true );
END IF;
END IF;
-- if delete is failing because of some foreign key constraint, handle this
-- if auto_correct is TRUE
ELSIF cmd_type = 'DELETE' and auto_correct_mode = 'Y' THEN
-- Delete the row referencing region_id and delete from regions
IF lcr.get_object_name() = 'REGIONS' THEN
IF ( lcr.get_value ( 'OLD','REGION_ID' ) is not null ) THEN
tmp := lcr.get_value ( 'OLD','REGION_ID' );
ret := tmp.getnumber (r_id );
-- Delete the referencing column first
delete from hr.countries where region_id = r_id;
-- Execute the LCR
lcr.execute ( true );
END IF;
END IF;
END IF;
EXCEPTION
WHEN DUP_VAL_ON_INDEX THEN
tmp := lcr.get_value ( 'NEW','REGION_ID' );
ret := tmp.getnumber ( r_id );
-- delete using new.region_id
delete from hr.regions where region_id = r_id;
-- Execute the LCR
lcr.execute ( true );
END error_hand;
END err_audpkg;
/
show errors;
-- Error handler for regions
begin
dbms_apply_adm.set_dml_handler
( 'hr.regions','TABLE','DEFAULT',TRUE,'STRADM.ERR_AUDPKG.ERROR_HAND' );
end ;
/
-- Verify handler is set
select object_owner , object_name , user_procedure
from dba_apply_dml_handlers order by object_owner , object_name ;
Sample Code Output
---------------------------------------------------------------------
-- Case 1 : Auto-correction for a hr.regions
-- Auto_correct is set to TRUE. The value is replicated and
-- the value is checked inside an error handler and error is
-- handled. Supplemental logging is enabled in hr.regions
---------------------------------------------------------------------
connect hr/hr@dbs1.net;
alter table regions add SUPPLEMENTAL LOG GROUP
regions_log_group (region_id,region_name);
-- Drop the primary key on regions at the source only; the non-NULL tag
-- keeps this DDL from being captured and replicated.
-- This is done only for demonstration purposes
exec dbms_streams.set_tag ( 'FF' );
alter table countries drop constraint countr_reg_fk cascade drop index;
alter table regions drop primary key cascade drop index;
exec dbms_streams.set_tag ;
connect hr/hr@dbs1.net
-- Set auto_correct to 'Y' in the control table so the error is handled for HR.REGIONS.
insert into control_table values('HR','REGIONS','Y');
commit;
insert into hr.regions values(1001,'North');
commit;
-- induce primary key violation
insert into hr.regions values(1001,'South');
commit;
connect system/manager@dbs1.net
set serveroutput on
-- sleep to allow for replication; this time can be adjusted (default=5min)
exec dbms_lock.sleep(300);
-- Primary key violation is handled. region_name should be 'South'
connect hr/hr@dbs2.net
select * from regions
where region_id = 1001
order by region_id, region_name;
connect stradm/stradm@dbs2.net
-- no error message expected
select error_message from dba_apply_error;
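-- Optional check, not part of the original note: the apply coordinator
-- counters give a rough view of how many transactions were applied and
-- how many ended in error. Rows appear only while the apply process runs.
select apply_name, state, total_applied, total_errors
from v$streams_apply_coordinator;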
-------------------------------------------------------------------------
-- Auto_correct is set to FALSE. The value is replicated and
-- the value is checked inside an error handler
-- Error is handled and a new value is inserted instead of the failing value
-- Disable supplemental logging for the hr.regions
-------------------------------------------------------------------------
connect hr/hr@dbs1.net;
alter table regions drop SUPPLEMENTAL LOG GROUP regions_log_group;
delete from control_table;
insert into control_table values('HR','REGIONS','N');
commit;
-- induce error again
insert into hr.regions values(1001,'North_West');
commit;
connect system/manager@dbs1.net
set serveroutput on
----wait for replication - time can be adjusted (default=5min)
exec dbms_lock.sleep(300);
connect hr/hr@dbs2.net
-- one more row is inserted
select * from regions
where region_id = 1001 or region_id >= 9000
order by region_id, region_name;
connect stradm/stradm@dbs2.net
-- No errors reported
select error_message from dba_apply_error;
------------------------------------------------------
-- Test for update
------------------------------------------------------
connect hr/hr@dbs1.net;
delete from control_table;
-- Set auto correct to true
insert into control_table values('HR','REGIONS','Y');
commit;
-- Enable Supplemental logging for hr.regions;
alter table regions add SUPPLEMENTAL LOG GROUP
regions_log_group (region_id,region_name);
-- Update a value in regions
update regions set region_id = 1001,region_name='Central' where
region_id = 1002;
commit;
connect system/manager@dbs1.net
set serveroutput on
-- sleep for 5 min
exec dbms_lock.sleep(300);
connect hr/hr@dbs2.net
select * from regions
where region_id = 1001 or region_id = 1002 or region_id >= 9000
order by region_id, region_name;
connect stradm/stradm@dbs2.net
-- no error message expected
select error_message from dba_apply_error;
------------------------------------------------------
-- - Test for Delete. Violate foreign key constraint
-- - After bug fix 2271626, set an apply parameter.
------------------------------------------------------
connect stradm/stradm@dbs2.net
begin
dbms_apply_adm.set_parameter(
apply_name => 'apply_from_dbs1net',
parameter => '_restrict_all_ref_cons',
value => 'N');
end;
/
connect hr/hr@dbs2.net
insert into countries values ( 'N1','N1land',1003 );
commit;
-------------------------------------------------------------------------
-- Delete region=1003 @ dbs1.net. This will raise an error at dbs2.net
-- since the row is referenced by countries there. The error is handled by
-- the error handler because auto-correct is set to true
-------------------------------------------------------------------------
connect hr/hr@dbs1.net
delete from regions where region_id = 1003;
commit;
connect system/manager@dbs1.net
set serveroutput on
-- sleep for 5 min
exec dbms_lock.sleep(300);
-- rows are deleted;
connect hr/hr@dbs2.net
select * from regions
where region_id = 1003;
select * from countries
where region_id = 1003;
connect stradm/stradm@dbs2.net
select error_message from dba_apply_error;
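-- Optional follow-up, not part of the original note: if any of the
-- dba_apply_error checks returns rows, the queries below show more detail
-- and retry the failed transactions once the cause has been fixed.
select apply_name, source_database, local_transaction_id,
       error_number, error_message
from dba_apply_error
order by local_transaction_id;
-- Re-execute every transaction in the error queue of this apply process
exec dbms_apply_adm.execute_all_errors(apply_name => 'apply_from_dbs1net');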