Oracle 10g CDC
?
??? 從Oracle9i開始,Oracle引入了CDC技術(shù)來實(shí)現(xiàn)對(duì)變化數(shù)據(jù)的捕獲。在Oracle9i中CDC只支持同步的數(shù)據(jù)捕獲(synchronous change capture),源數(shù)據(jù)的變化被實(shí)時(shí)的捕獲,捕獲的過程和源數(shù)據(jù)是同一個(gè)事務(wù)。它的實(shí)現(xiàn)需要源數(shù)據(jù)支持trigger,所以這種同步的技術(shù)會(huì)給數(shù)據(jù)源帶來性能的問題。這是CDC在Oracle9i的一個(gè)缺陷(在Oracle10g中已經(jīng)改進(jìn))。
??? 在CDC中變化的數(shù)據(jù)被保存在一個(gè)變化表(change table)中,使用者通過訂閱的方式(subscribe)生成一個(gè)視圖(subscribe view)并且通過這個(gè)視圖來得到這些變化的數(shù)據(jù),一個(gè)change table可以有多個(gè)訂閱者,也就可以有多個(gè)subscribe view。每個(gè)subscribe view可以從change table提取自己所關(guān)心的不同的columns 和rows。
??? 在Oracle10g中,CDC開始支持異步方式來捕獲變化的數(shù)據(jù)。在Oracle10g中利用redo log來實(shí)現(xiàn)CDC。Redo log是一個(gè)記錄所有數(shù)據(jù)庫變化的獨(dú)立的文件。CDC的異步方式是非入侵方式(noninvasive)的實(shí)現(xiàn),對(duì)于變化數(shù)據(jù)的捕獲就不需要數(shù)據(jù)庫加入trigger了。
?
?
Oracle的redo log有兩種類型:
?
● On-line redo log
● Archive redo log
這種從redo log中讀取數(shù)據(jù)變化的方式叫做“日志挖掘”(mining the logs),這種方式使捕獲數(shù)據(jù)的操作從源數(shù)據(jù)的事務(wù)中分離出來,減輕了trigger技術(shù)給數(shù)據(jù)源帶來的性能問題(是減輕不是消除!)
?
在Oracle10g中,異步CDC有兩種實(shí)現(xiàn)方式:
?
● HOTLOG
?
這種方式利用On-line redo log。變化的數(shù)據(jù)從on-line redo log中獲取,然后通過(Oracle Streams技術(shù))把獲取的變化保存在本地的變化表中(變化表還是在數(shù)據(jù)源上)。然后還需要其他方式把這些數(shù)據(jù)移到數(shù)據(jù)倉庫中。
● AUTOLOG
?
這種方式與HOTLOG不同的是數(shù)據(jù)獲取的位置不同。AUTOLOG方式利用Log Transport Services技術(shù)在不同數(shù)據(jù)庫之間轉(zhuǎn)換數(shù)據(jù)時(shí)進(jìn)行變化數(shù)據(jù)獲取操作的。如果目標(biāo)數(shù)據(jù)庫正好是數(shù)據(jù)倉庫的話,這種方式就比HOTLOG方式減少了一次數(shù)據(jù)遷移的環(huán)節(jié)。
?
Log Transport Services技術(shù)是標(biāo)準(zhǔn)的日志遷移技術(shù),比如Oracle的Oracle Data Guard。
?
Latency of change:
?
從發(fā)生變化的數(shù)據(jù)源事務(wù)提交到利用CDC發(fā)現(xiàn)變化的數(shù)據(jù),并且把變化的數(shù)據(jù)移植到變化表之間的時(shí)間間隔。HOTLOG方式的時(shí)間滯后要比AUTOLOG方式小。
?
Oracle Streams
Oracle Streams技術(shù)可以在多個(gè)數(shù)據(jù)庫之間進(jìn)行數(shù)據(jù)備份,它通過在數(shù)據(jù)隊(duì)列里面記錄變化的數(shù)據(jù)。Oracle Streams技術(shù)也是利用數(shù)據(jù)庫的redo log來解析變化的數(shù)據(jù)的。Oracle Streams除了進(jìn)行數(shù)據(jù)備份外,還可以用來作為數(shù)據(jù)倉庫的增量抽取。
?
Oracle Streams與CDC技術(shù)比較,可以用一個(gè)形象的比喻來形容:
?
把Oracle Streams比作磚、瓦;
把CDC看作是用磚、瓦蓋成的大廈;
?
Oracle Streams可以用來實(shí)現(xiàn)CDC,它們是輔助的關(guān)系,不是真正競(jìng)爭(zhēng)的關(guān)系。
?
?
?
?
?
?
?
上面是CDC的簡(jiǎn)單介紹,下面轉(zhuǎn)一篇應(yīng)用(但是怎么看都不太防舒服)
*****************************************************************************************
Oracle 10g CDC Test
?
?
怎樣使用oracle 10G CDC(Capturing Changed Data)
1)create the user on the source DB and target DB.
sqlplus "/? as sysdba"
connect system/manager
set serveroutput on size 100000
set linesize 2000
create or replace package etl_util AUTHID CURRENT_USER
IS
? FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
? FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
? FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW)? RETURN VARCHAR2;
END etl_util;
/
show errors
?
create or replace package body etl_util
IS
-- ******************************************************************************
-- PUBLIC FUNCTION get_col_names
-- ******************************************************************************
? FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2???
??? IS
? BEGIN
??? DECLARE
????? v_col_nm VARCHAR2(100);
????? v_ret VARCHAR2(32767);
????? v_work_str VARCHAR2(32767);
?
????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
??????????? SELECT column_name FROM all_tab_columns
??????????? WHERE owner = UPPER(TRIM(p_schm))
??????????? AND table_name = UPPER(TRIM(p_tbl_nm))
??????????? ORDER BY column_name;
?
? BEGIN
? OPEN c_tab_col(p_table_name, p_schema);
? v_ret := '';
? v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
?
? LOOP
????? FETCH c_tab_col INTO v_col_nm;
????? EXIT WHEN c_tab_col%NOTFOUND;
??? IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
??????? v_ret := v_ret || ',' || p_pref ||LOWER(v_col_nm);
??? END IF;
? END LOOP;
? CLOSE c_tab_col;
? IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2);
? END IF;
? IF (v_ret IS NULL) THEN
????? raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
??????? || ' Does not exist or has no columns');
? END IF;
?
? RETURN v_ret;
? END;
? END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_col_names
-- ******************************************************************************
?
-- ******************************************************************************
-- PUBLIC FUNCTION get_cols_definition
-- ******************************************************************************
? FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2???
??? IS
? BEGIN
??? DECLARE
????? v_col_nm VARCHAR2(100);
????? v_col_def VARCHAR2(300);
????? v_ret VARCHAR2(32767);
????? v_work_str VARCHAR2(32767);
????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
??????? SELECT column_name,
??????????? RPAD(column_name,53) ||
??????? decode(data_type
??????????? ,'NUMBER','NUMBER('||TO_CHAR(data_precision)||
??????????????? decode(data_scale,0,'',','||data_scale)||')'
??????????? ,'CHAR','CHAR('||TO_CHAR(DATA_LENGTH)||')'
??????????? ,'VARCHAR2','VARCHAR2('||TO_CHAR(DATA_LENGTH)||')'
??????????? ,'DATE','DATE'
??????????? ,data_type) def
??????? from all_tab_columns
??????? where table_name = upper(trim(p_tbl_nm))
??????????? AND owner = upper(trim(p_schm))
??????? order by owner, table_name, column_name
??????? ;
? BEGIN
? OPEN c_tab_col(p_table_name, p_schema);
? v_ret := '';
? v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
?
? LOOP
????? FETCH c_tab_col INTO v_col_nm, v_col_def;
????? EXIT WHEN c_tab_col%NOTFOUND;
??? IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
??????? v_ret := v_ret || ', ' || p_pref ||LOWER(v_col_def);
??? END IF;
? END LOOP;
? CLOSE c_tab_col;
? IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2); END IF;
? IF (v_ret IS NULL) THEN
????? raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
??????? || ' Does not exist or has no columns');
? END IF;
?
? RETURN v_ret;
? exception
????????? when others
??????????? then dbms_output.put_line(SUBSTR('Error in get_cols_definition ' || sqlerrm,1,220));
? return 'error ' || sqlerrm ;
? END;
? END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_cols_definition
-- ******************************************************************************
?
-- ******************************************************************************
-- PUBLIC FUNCTION get_modified_col_names
-- ******************************************************************************
? FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW) RETURN VARCHAR2
??? IS
? BEGIN
??? DECLARE
????? v_col_nm VARCHAR2(100);
????? v_col_pos number;
????? v_ret VARCHAR2(32767);
????? v_work_str VARCHAR2(32767);
??? v_byte_nr INTEGER;
??? v_col_map VARCHAR2(32767);
??? i INTEGER; j INTEGER;k INTEGER;
??? v_val VARCHAR2(5);
????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
??????? SELECT column_name, POWER(2,MOD(column_id,8)) col_pos, 1 + floor((column_id)/8) byte_nr
??????????? FROM all_tab_cols
??????????? WHERE table_name=UPPER(p_tbl_nm)
??????????????? AND OWNER=UPPER(p_schm)
??????????? ORDER BY column_id;???????
? BEGIN
????? SELECT dump(source_colmap$,10) INTO v_col_map FROM DUAL;
????? i:=INSTR(v_col_map,':');
????? v_col_map:=','||SUBSTR(v_col_map,i + 2)||',';
--??? dbms_output.put_line('v_col_map='||v_col_map);
????? OPEN c_tab_col(table_name, schema);
????? v_ret := '';
????? LOOP
--??? dbms_output.put_line('--1');
????????? FETCH c_tab_col INTO v_col_nm, v_col_pos, v_byte_nr;
????????? EXIT WHEN c_tab_col%NOTFOUND;
--??? dbms_output.put_line('byte_nr='||to_char(v_byte_nr));
????????? i:=INSTR(v_col_map,',',1,v_byte_nr);
????????? j:=INSTR(v_col_map,',',1,v_byte_nr + 1);
????????? IF ((i=0) OR (j = 0)) THEN
????????????? EXIT;
????????? END IF;
????????? v_val := SUBSTR(v_col_map, i + 1, j - i - 1 );
????????? k := BITAND(TO_NUMBER(v_val),v_col_pos);
????????? IF (k>0) THEN
????????????? v_ret := v_ret || ',' || v_col_nm;
????????? END IF;
--??? DBMS_OUTPUT.put_line('col='||v_col_nm ||' byte='||TO_CHAR(v_byte_nr)
--????????? || ' i=' || TO_CHAR(i) || ' j='||TO_CHAR(j)
--????????? ||' val='||v_val||' col_pos='||TO_CHAR(v_col_pos)||'res='||TO_CHAR(k));
????? END LOOP;
????? CLOSE c_tab_col;
????? IF (v_ret IS NOT NULL) THEN
????????? v_ret := SUBSTR(v_ret,2);
????? END IF;
????? RETURN v_ret;
? EXCEPTION
??????? WHEN OTHERS
??????????? THEN dbms_output.put_line(SUBSTR('Error in get_modifed_col_names: ',1,220));
??????????????? dbms_output.put_line(SUBSTR(sqlerrm,1,240));
????? return '';
? END;
? END;
-- ******************************************************************************
-- END OF PUBLIC FUNCTION get_modified_col_names
-- ******************************************************************************
?
END etl_util;
/
show errors
grant execute on etl_util to public;
DROP PUBLIC SYNONYM etl_util;
CREATE PUBLIC SYNONYM etl_util FOR etl_util;
--select etl_util.get_modified_col_names(schema1 => 'SCOTT'
--,table_name => 'EMP'
--,source_colmap$ => HEXTORAW('C000')) FROM dual;
--
BEGIN
declare v_res VARCHAR2(1000);
BEGIN
??? v_res := etl_util.get_modified_col_names('test', 'philip_test', HEXTORAW('0601'));
??? dbms_output.put_line('res='||v_res);
END;
END;
/
2)create the user
connect system/manager
DROP USER cdc_publisher CASCADE;
CREATE USER cdc_publisher IDENTIFIED BY pass;
GRANT EXECUTE_CATALOG_ROLE to cdc_publisher;
GRANT SELECT_CATALOG_ROLE to cdc_publisher;
?
3)grant the priviledge
connect test/pass
GRANT SELECT on PHILIP_TEST to cdc_publisher;
GRANT SELECT on PHILIP_TEST2 to cdc_publisher;
--the PHILIP_TEST2? is under? user?? "test"
4)Create Change Tables
?
/* Login as a publisher to run this */
BEGIN
??? DECLARE work_sql VARCHAR2(32767);
??????? v_publisher_id VARCHAR2(20)? := 'cdc_publisher';
??????? v_source_schema VARCHAR2(20) := 'test';
??????? v_source_table VARCHAR2(100);
??????? v_cdc_table VARCHAR2(100);
??????? CURSOR c_tables IS
??????????? SELECT 'PHILIP_TEST' table_name FROM dual
??????????????? UNION
??????????? SELECT 'PHILIP_TEST2' table_name FROM dual
??????? ;
BEGIN
FOR tables IN c_tables LOOP
? v_source_table := tables.table_name;
? v_cdc_table := 'rep_'||v_source_table;
? work_sql := etl_util.get_cols_definition(
??????? p_schema=>v_source_schema,
??????? p_table_name =>v_source_table,
??????? p_skip_columns => NULL,
??????? p_pref => NULL);
??? DBMS_output.put_line('work_sql'||work_sql);
? DBMS_LOGMNR_CDC_PUBLISH.CREATE_CHANGE_TABLE (
??? OWNER => v_publisher_id, SOURCE_SCHEMA => v_source_schema
??? ,CHANGE_SET_NAME => 'SYNC_SET', CAPTURE_VALUES => 'new'--only Captures the changed values from the source table
??? ,RS_ID => 'n', ROW_ID => 'n', USER_ID => 'y', TIMESTAMP => 'y'
??? ,OBJECT_ID => 'n' -- leave it as 'N' or you will have "table has no columns" error
??? ,SOURCE_COLMAP => 'n', TARGET_COLMAP => 'n', OPTIONS_STRING => null
??? ,SOURCE_TABLE => v_source_table, CHANGE_TABLE_NAME => v_cdc_table
??? ,COLUMN_TYPE_LIST => work_sql);
? DBMS_output.put_line('Change table '||v_cdc_table ||' was created successfully');
END LOOP;
?
EXCEPTION
??? WHEN OTHERS THEN
??? DBMS_output.put_line('Error start ********************************************');
??? DBMS_output.put_line('Error during change table ' || v_cdc_table || ' creation:');
??? DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
??? DBMS_output.put_line('Error end? ********************************************');
END;
END;
/
5)when we change the "test.PHILIP_TEST" and "test.PHILIP_TEST2" ,under cdc_publisher , the table "cdc_philip_test will recorde this operation and the value at once.
?
?
*****************************************************************************************
?
?
?
?
?
另附一篇比較詳細(xì)的CDC原理研究:
*****************************************************************************************
oracle學(xué)習(xí)--CDC 研究(1)
?
oracle學(xué)習(xí)--CDC 研究(2)
?
?
?