Appearance
Oracle-11g CDC 集成
安装Kafka Connect
你可以使用docker-compose来启动服务,docker-compose.yml的内容如下:
yaml
# Docker Compose stack: a Kafka Connect worker plus the Kafka Connect UI.
version: '2'
services:
  # Kafka Connect worker, published on host port 8083.
  kafka-connect:
    image: nexus3.tineco.com/kafka-connect:2.8.1
    ports:
      - '8083:8083'
    environment:
      # Kafka bootstrap server address handed to the worker.
      - 'BOOTSTRAP_SERVERS=10.108.5.41:9092'
  # Web UI for Kafka Connect, published on host port 8000.
  kafka-connect-ui:
    image: nexus3.tineco.com/kafka-connect-ui:2.8.1
    ports:
      - '8000:8000'
    environment:
      # Must be the HTTP address of this host's own IP (port 8083 above).
      - CONNECT_URL=http://10.107.8.21:8083

注意:
CONNECT_URL需要设置为本机ip对应的http地址。
Oracle数据库需要的配置
需要打开数据库的recovery日志功能,参考如下:
sql
-- Size and locate the recovery area (the location change is written to the spfile).
ALTER SYSTEM SET db_recovery_file_dest_size = 10G;
ALTER SYSTEM SET db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' SCOPE=SPFILE;

-- Restart into MOUNT state, switch the database to ARCHIVELOG mode, then reopen it.
SHUTDOWN IMMEDIATE
STARTUP MOUNT
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
archive log list

新建同步用户
需要新建同步账号debezium,脚本参考如下:
sql
-- Tablespaces created for the synchronization setup.
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE TABLESPACE TEST0 DATAFILE '/opt/oracle/oradata/ORCLCDB/test0.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

-- Synchronization account. The statement must end with ';' so that it is not
-- parsed together with the first GRANT that follows.
CREATE USER debezium IDENTIFIED BY 123456
    DEFAULT TABLESPACE TEST0
    QUOTA UNLIMITED ON TEST0;

-- Session and general privileges.
GRANT CREATE SESSION TO debezium ;
-- NOTE(review): SET CONTAINER is a multitenant (12c+) privilege; on 11g this
-- statement is expected to be rejected -- confirm and drop it if so.
GRANT SET CONTAINER TO debezium ;
GRANT SELECT ON V_$DATABASE to debezium ;
GRANT FLASHBACK ANY TABLE TO debezium ;
GRANT SELECT ANY TABLE TO debezium ;
GRANT SELECT_CATALOG_ROLE TO debezium ;
GRANT EXECUTE_CATALOG_ROLE TO debezium ;
GRANT SELECT ANY TRANSACTION TO debezium ;
-- Left disabled by the original author.
--GRANT LOGMINING TO debezium ;
GRANT CREATE TABLE TO debezium ;
GRANT LOCK ANY TABLE TO debezium ;
GRANT ALTER ANY TABLE TO debezium ;
GRANT CREATE SEQUENCE TO debezium ;

-- LogMiner packages.
GRANT EXECUTE ON DBMS_LOGMNR TO debezium ;
GRANT EXECUTE ON DBMS_LOGMNR_D TO debezium ;

-- Read access to the log-related V$ views.
GRANT SELECT ON V_$LOG TO debezium ;
GRANT SELECT ON V_$LOG_HISTORY TO debezium ;
GRANT SELECT ON V_$LOGMNR_LOGS TO debezium ;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO debezium ;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO debezium ;
GRANT SELECT ON V_$LOGFILE TO debezium ;
GRANT SELECT ON V_$ARCHIVED_LOG TO debezium ;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO debezium ;

配置需要采集的表
sql
-- Turn on supplemental logging at the database level.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE 【库】.【表】 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

配置Kafka Connector
修改对应的配置,并且把需要同步的表配置到table.include.list下面
properties
# Connector identity and task count
connector.class=io.debezium.connector.oracle.OracleConnector
database.server.name=oracle-service
tasks.max=1

# Oracle connection
database.hostname=10.107.8.21
database.url=jdbc:oracle:thin:@10.107.8.21:1521:xe
database.dbname=DEBEZIUM
database.user=debezium
database.password=123456

# Kafka-backed database history
database.history.kafka.bootstrap.servers=10.108.5.41:9092
database.history.kafka.topic=schema-changes.inventory

# Tables to synchronize
table.include.list=TEST.TEST
snapshot.mode=initial

kafka-ui界面如下
数据最终推送进入kafka表 
