Oracle 11g CDC Integration

Installing Kafka Connect

You can start the services with docker-compose. The contents of docker-compose.yml are as follows:

yaml
version: '2'
services:
    kafka-connect:
        image: nexus3.tineco.com/kafka-connect:2.8.1
        ports:
            - '8083:8083'
        environment:
            - 'BOOTSTRAP_SERVERS=10.108.5.41:9092'
    kafka-connect-ui:
        image: nexus3.tineco.com/kafka-connect-ui:2.8.1
        ports:
            - '8000:8000'
        environment:
            - CONNECT_URL=http://10.107.8.21:8083

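Note: CONNECT_URL must be set to the HTTP address of the local host's IP, that is, the machine on which the kafka-connect container is running.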
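
Once the containers are up, it can help to confirm that the Kafka Connect REST API is reachable and that the Debezium Oracle connector plugin is available. The sketch below uses Kafka Connect's `GET /connector-plugins` endpoint. The base URL reuses the address from the compose file, the helper names `fetch_plugins` and `has_plugin` are illustrative, and whether this particular image bundles the Debezium Oracle plugin is an assumption to verify.

```python
import json
import urllib.request

# Same address as CONNECT_URL in docker-compose.yml; adjust for your host.
CONNECT_URL = "http://10.107.8.21:8083"
ORACLE_CONNECTOR = "io.debezium.connector.oracle.OracleConnector"


def fetch_plugins(base_url: str, timeout: float = 5.0) -> list:
    """Return the plugin list reported by Kafka Connect (GET /connector-plugins)."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins", timeout=timeout) as resp:
        return json.load(resp)


def has_plugin(plugins: list, class_name: str) -> bool:
    """Check whether a connector class appears in the plugin list."""
    return any(plugin.get("class") == class_name for plugin in plugins)
```

Calling `has_plugin(fetch_plugins(CONNECT_URL), ORACLE_CONNECTOR)` should return `True` when the plugin is installed; a connection error usually means the port mapping or the IP address is wrong.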

Required Oracle Database Configuration

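The database's recovery area and archive logging (ARCHIVELOG mode) must be enabled. Run the following in SQL*Plus as a reference: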

sql
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
archive log list

Creating the Synchronization User

A synchronization account named debezium must be created. A reference script follows:

sql
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

CREATE TABLESPACE TEST0 DATAFILE '/opt/oracle/oradata/ORCLCDB/test0.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

-- A password that starts with a digit must be double-quoted in Oracle
CREATE USER debezium IDENTIFIED BY "123456"
    DEFAULT TABLESPACE TEST0
    QUOTA UNLIMITED ON TEST0;

  GRANT CREATE SESSION TO debezium ;
  -- SET CONTAINER exists only in Oracle 12c and later (multitenant); omit this grant on 11g
  GRANT SET CONTAINER TO debezium ;
  GRANT SELECT ON V_$DATABASE to debezium ;
  GRANT FLASHBACK ANY TABLE TO debezium ;
  GRANT SELECT ANY TABLE TO debezium ;
  GRANT SELECT_CATALOG_ROLE TO debezium ;
  GRANT EXECUTE_CATALOG_ROLE TO debezium ;
  GRANT SELECT ANY TRANSACTION TO debezium ;
  -- LOGMINING exists only in Oracle 12c and later, so it stays commented out for 11g
  --GRANT LOGMINING TO debezium ;

  GRANT CREATE TABLE TO debezium ;
  GRANT LOCK ANY TABLE TO debezium ;
  GRANT ALTER ANY TABLE TO debezium ;
  GRANT CREATE SEQUENCE TO debezium ;

  GRANT EXECUTE ON DBMS_LOGMNR TO debezium ;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO debezium ;

  GRANT SELECT ON V_$LOG TO debezium ;
  GRANT SELECT ON V_$LOG_HISTORY TO debezium ;
  GRANT SELECT ON V_$LOGMNR_LOGS TO debezium ;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO debezium ;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO debezium ;
  GRANT SELECT ON V_$LOGFILE TO debezium ;
  GRANT SELECT ON V_$ARCHIVED_LOG TO debezium ;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO debezium ;

Configuring the Tables to Capture

sql
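-- Enable minimal supplemental logging for the whole database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Log all columns for each captured table; replace <schema> and <table> with real names
ALTER TABLE <schema>.<table> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;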
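
When many tables are captured, the per-table statement can be generated rather than typed by hand. The following is a minimal sketch that assumes the tables are given as plain comma-separated `SCHEMA.TABLE` names (no regular expressions); the function name `supplemental_log_statements` and the table `TEST.ORDERS` used below are hypothetical.

```python
def supplemental_log_statements(table_list: str) -> list:
    """Build one ALTER TABLE statement per SCHEMA.TABLE entry in a comma-separated list."""
    statements = []
    for entry in table_list.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and empty items
        schema, sep, table = entry.partition(".")
        if not sep or not schema or not table:
            raise ValueError(f"expected SCHEMA.TABLE, got {entry!r}")
        statements.append(
            f"ALTER TABLE {schema}.{table} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;"
        )
    return statements


# Example: print the statements for two tables (TEST.ORDERS is a made-up name).
for statement in supplemental_log_statements("TEST.TEST, TEST.ORDERS"):
    print(statement)
```

The output can be pasted into SQL*Plus after the database-level `ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;` statement has been run.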

Configuring the Kafka Connector

Adjust the configuration below for your environment, and list the tables to synchronize under table.include.list.

properties
connector.class=io.debezium.connector.oracle.OracleConnector
database.user=debezium
database.dbname=DEBEZIUM
tasks.max=1
database.history.kafka.bootstrap.servers=10.108.5.41:9092
database.history.kafka.topic=schema-changes.inventory
database.url=jdbc:oracle:thin:@10.107.8.21:1521:xe
database.server.name=oracle-service
database.hostname=10.107.8.21
database.password=123456
table.include.list=TEST.TEST
snapshot.mode=initial
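
Besides pasting these properties into the UI, the connector can also be registered through Kafka Connect's REST API, which accepts a JSON body of the form `{"name": ..., "config": {...}}` on `POST /connectors`. The sketch below converts the properties text into that body. The connector name `oracle-cdc` and the helper names are hypothetical, and the Connect address is assumed to be the one from the compose file.

```python
import json
import urllib.request


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")  # split on the first '=' only
        config[key.strip()] = value.strip()
    return config


def build_payload(name: str, config: dict) -> bytes:
    """Wrap the config in the JSON body that POST /connectors expects."""
    return json.dumps({"name": name, "config": config}).encode("utf-8")


def register_connector(connect_url: str, name: str, config: dict) -> int:
    """POST the connector definition to Kafka Connect and return the HTTP status."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=build_payload(name, config),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as resp:
        return resp.status
```

For example, `register_connector("http://10.107.8.21:8083", "oracle-cdc", parse_properties(text))`, where `text` holds the properties above, should create the connector; Kafka Connect rejects the request if a connector with the same name already exists.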

The Kafka Connect UI looks like this: (image)

The data is ultimately pushed into Kafka topics: (image)
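
To find the captured data in Kafka, it helps to know which topic to look at. Debezium by default names each change topic `<database.server.name>.<schema>.<table>`. The sketch below derives those names from the configuration values used above, assuming the default naming is in effect and the table list contains plain names; the function name `change_topics` is illustrative.

```python
def change_topics(server_name: str, table_include_list: str) -> list:
    """Derive default Debezium topic names: <server name>.<schema>.<table>."""
    topics = []
    for entry in table_include_list.split(","):
        entry = entry.strip()
        if entry:
            topics.append(f"{server_name}.{entry}")
    return topics


# Values taken from the connector configuration above.
print(change_topics("oracle-service", "TEST.TEST"))
```

With the configuration in this document, change events for `TEST.TEST` would therefore be expected on the topic `oracle-service.TEST.TEST`.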