Script to Prevent Excessive Spill of Message From the Streams Buffer Queue To Disk

作者: Maclean Liu , post on March 30th, 2009 , English Version
【本站文章除注明转载外,均为本站原创编译】
转载请注明:文章转载自: Oracle Clinic – Maclean Liu的个人技术博客 [http://www.oracledatabase12g.com/]
本文标题: Script to Prevent Excessive Spill of Message From the Streams Buffer Queue To Disk
本文永久地址: http://www.oracledatabase12g.com/archives/script-to-prevent-excessive-spill-of-message-from-the-streams-buffer-queue-to-disk.html
PURPOSE
-------
To provide a script which would prevent excessive spill of messages from
streams buffer queue to the Disk.

It works by stopping capture process when the number of outstanding messages
are above a threshold and then restarting capture when the messages come
within the threshold.

SCOPE & APPLICATION
-------------------
This script needs to be run when the rate at which CAPTURE process is enqueing
messages is high and the subscribers are very slow. 

As a workaround the scripts given below are to be run on the capture site to
prevent the capture site from spilling over. Please read the LIMITATIONS
section of this script before running it.

Execute these scripts as SYS user.

---------------------------- Begin README.sql ----------------------------------

REM
REM  $Id: README.sql,v 1.3 2003/06/12 18:42:27 narora Exp $
REM
REM  This file README.sql along with perms.sql and flowc.sql implements
REM  flow control for 9.2 STREAMS.
REM
REM  LIMITATIONS:
REM
REM  1) This script assumes that the database has exactly one capture.
REM
REM  2) The script requires that each queue in the database has only one
REM     enqueuer (either capture or propagation from one source). Otherwise
REM     this script could cause a deadlock by stopping capture and never
REM     starting it up.
REM     For example, if a site is capturing messages and receiving messages
REM     from two other sites it would need to have 3 queues.
REM
REM  3) When submitting the flowc_run procedure to dbms_job please be sure
REM     to set the max_messages parameter larger than the size of your
REM     largest transaction. Otherwise, this script could cause a
REM     deadlock.
REM
REM  DESCRIPTION:
REM
REM  This script needs to be run on each site where a capture is created. It
REM  works by stopping capture when the number of outstanding messages are
REM  above a threshold and then restarting capture when the messages come
REM  within the threshold.
REM
REM  The following file illustrates with an example how to setup the
REM  flow control. If you run this file as SYS it'll create a flow control
REM  user named flowc with password flowc and will create the tables and
REM  packages needed for flow control.
REM
REM  EXAMPLE:
REM
REM  Step 1)
REM
REM  the flow control user must be created first

     create user flowc identified by flowc;

REM  Step 2)
REM
REM  the user must be given basic privileges

     grant connect, resource, create procedure to flowc;

REM  Step 3)
REM
REM  grant some STREAMS privileges to the flow control user

     @perms flowc

REM  Step 4)
REM
REM  create the flow control tables and procedures

     connect flowc/flowc
     @flowc

REM  Step 5)
REM
REM  submit the flow control job
REM
REM  [To modify the message threshold for flow control and to change how
REM   often the script will run change the job submission below]
REM

     variable jobno number
     execute dbms_job.submit(:jobno, 'begin flowc_run; end;', sysdate, 'sysdate+1/1440');
     commit;

REM  ======================================================================
REM  Monitoring:
REM  ======================================================================

REM  The tables flowc_state and flowc_trace explain what the flow control
REM  job is doing
REM

select * from flowc.flowc_state;
select * from flowc.flowc_trace;

---------------------------- End README.sql ----------------------------------

---------------------------- Begin perms.sql ----------------------------------

REM
REM $Id: perms.sql,v 1.3 2003/05/20 23:09:52 narora Exp $
REM
REM execute this script as SYS
REM pass the name of the user to grant the privileges as the first argument
REM

REM views needed to check the state of the system
grant select on dba_capture to &1;
grant select on dba_queues to &1;
create or replace view v_$bufqm as select * from x$bufqm;
grant select on v_$bufqm to &1;
grant select on system.logmnr_restart_ckpt$ to &1;

REM packages to modify the state of the system
grant execute on dbms_capture_adm to &1;
grant execute on dbms_job to &1;
grant execute on dbms_lock to &1;

---------------------------- End perms.sql ----------------------------------

---------------------------- Begin flowc.sql ----------------------------------

REM
REM $Id: flowc.sql,v 1.5 2003/05/20 23:09:29 narora Exp $
REM
REM This version only supports one capture.
REM Run this script on the capture site to prevent the capture site from
REM spilling over.
REM This version doesn't check for spillover downstream.
REM

drop table flowc_trace;
create table flowc_trace
(
  time         date,
  capture_name varchar2(30),
  action       varchar2(30),
  reason       varchar2(100)
);

drop table flowc_state;
create table flowc_state
(
  time         date,
  capture_name varchar2(30) unique,
  state        varchar2(30)
);

create or replace procedure flowc_run
(
  max_messages number default 1000,
--  max_sga_pct  number default 9,
--  max_spill    number default 0,
  tracing      boolean default TRUE
)
is
  capture_name     varchar2(30);
  status           varchar2(30);
  lastaction       varchar2(30);
  queue_owner      varchar2(30);
  queue_name       varchar2(30);
  queue_table      varchar2(30);
  qid              number;
  nmsg             number;
  nckpt            number;
  ckptscn          number;
begin
  -- get the capture name assuming that there is exactly one capture
  select capture_name, status into capture_name, status from dba_capture;

  -- get the last action of the flow control
  begin
    select state into lastaction from flowc_state
    where capture_name = flowc_run.capture_name;
  exception when no_data_found then
    if (status = 'ENABLED') then
      insert into flowc_state(time, capture_name, state) values
      (sysdate, capture_name, 'EXTERNALLY STARTED');
      commit;
      lastaction := 'EXTERNALLY STARTED';
    else
      insert into flowc_state(time, capture_name, state) values
      (sysdate, capture_name, 'EXTERNALLY STOPPED');
      commit;
      lastaction := 'EXTERNALLY STOPPED';
    end if;
  end;

  -- check if some external entity has disabled or aborted capture
  if ((((status = 'DISABLED') and (lastaction <> 'STOPPED')) OR
       (status = 'ABORTED')) and (lastaction <> 'EXTERNALLY STOPPED')) then
    delete from flowc_state where capture_name = flowc_run.capture_name;
    insert into flowc_state(time, capture_name, state) values
    (sysdate, capture_name, 'EXTERNALLY STOPPED');
    commit;
    lastaction := 'EXTERNALLY STOPPED';
  end if;

  -- check if some external entity has enabled capture
  if ((status = 'ENABLED') and (lastaction <> 'STARTED') and
      (lastaction <> 'EXTERNALLY STARTED')) then
    delete from flowc_state where capture_name = flowc_run.capture_name;
    insert into flowc_state(time, capture_name, state) values
    (sysdate, capture_name, 'EXTERNALLY STARTED');
    commit;
    lastaction := 'EXTERNALLY STARTED';
  end if;

  -- there is nothing to be done if capture has been stopped by an
  -- external entity
  if (lastaction = 'EXTERNALLY STOPPED') then
    return;
  end if;

  -- at this point the valid combinations of state and lastaction are
  --
  -- state          lastaction
  -- -----          ----------
  -- ENABLED        STARTED
  -- ENABLED        EXTERNALLY STARTED
  -- DISABLED       STOPPED
  --

  -- get the queue owner and name for this capture
  select queue_name, queue_owner into queue_name, queue_owner
  from dba_capture where capture_name = flowc_run.capture_name;

  -- get the qid and queue_table for this queue
  select queue_table, qid into queue_table, qid from dba_queues
  where owner = flowc_run.queue_owner and name = flowc_run.queue_name;

  -- get the number of outstanding messages for this capture
  select max(bufqm_nmsg) into nmsg from sys.v_$bufqm
  where bufqm_qid=flowc_run.qid;

  -- have we exceeded max messages and should therefore stop capture?
  if ((nmsg > max_messages) and (status = 'ENABLED'))
  then
    if (tracing)
    then
    insert into flowc_trace(time, capture_name, action, reason) values
      (sysdate, capture_name, 'PREPARING TO STOP',
      'max outstanding messages exceeded: ' || nmsg ||' messages outstanding');
    select max(ckpt_scn) into ckptscn from system.logmnr_restart_ckpt$
    where valid=1;
    select count(*) into nckpt from system.logmnr_restart_ckpt$ where valid=1;
    insert into flowc_trace(time, capture_name, action, reason) values
       (sysdate, capture_name, 'PRE CHECKPOINT',
       'Prior '||nckpt||' checkpoints upto SCN '||ckptscn);
    commit;
    end if;
    -- before stopping capture we must make sure that a checkpoint gets
    -- taken
    dbms_capture_adm.set_parameter(capture_name, '_CHECKPOINT_FORCE', 'Y');
    -- sleep 5 seconds
    dbms_lock.sleep(5);
    if (tracing)
    then
    select max(ckpt_scn) into ckptscn from system.logmnr_restart_ckpt$
    where valid=1;
    select count(*) into nckpt from system.logmnr_restart_ckpt$ where valid=1;
    insert into flowc_trace(time, capture_name, action, reason) values
       (sysdate, capture_name, 'POST CHECKPOINT',
       'Prior '||nckpt||' checkpoints upto SCN '||ckptscn);
    commit;
    end if;
    -- update the state in the flowc_state table
    delete from flowc_state where capture_name = flowc_run.capture_name;
    insert into flowc_state(time, capture_name, state) values
    (sysdate, capture_name, 'STOPPED');
    commit;
    dbms_capture_adm.stop_capture(capture_name);
    if (tracing)
    then
      insert into flowc_trace(time, capture_name, action) values
      (sysdate, capture_name, 'STOPPED');
    commit;
    end if;
  elsif ((nmsg <= max_messages) and (status = 'DISABLED')) then
    if (tracing)
    then
      insert into flowc_trace(time, capture_name, action, reason) values
      (sysdate, capture_name, 'STARTED',
      'max outstanding messages within limit');
    end if;
    delete from flowc_state where capture_name = flowc_run.capture_name;
    insert into flowc_state(time, capture_name, state) values
    (sysdate, capture_name, 'STARTED');
    commit;
    dbms_capture_adm.start_capture(capture_name);
  end if;

end;
/

show errors;

-- schedule the job for running once a minute
-- variable jobno number
-- execute dbms_job.submit(:jobno, 'begin flowc_run; end;', sysdate+1/1440, 'sysdate+1/1440');
-- commit;

---------------------------- End flowc.sql ----------------------------------

© 2009 – 2010, www.oracledatabase12g.com. 版权所有.文章允许转载,但必须以链接方式注明源地址,否则追究法律责任.

相关文章 | Related posts:

  1. Streams Message Tracking in 11g
  2. Streams Combined Capture and Apply in 11g
  3. Oracle Streams Performance Advisor
  4. Scripts for Oracle Streams Performance Tuning Best Practice: Oracle 10g Release 10.2
  5. Split and Merge of a Streams Destination
  6. Troubleshooting Streams Capture when status is Paused For Flow Control
  7. How To Troubleshoot the Streams Apply Process
  8. 9i Best Practices For Streams RAC Setup
  9. 11g R1 Streams New Features
  10. Streams Specific Patches

Leave a Reply

  

  

  

You can use these HTML tags

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>