
Using OGG Microservices to Replicate Oracle to Kafka (Full Load + Incremental)

DB宝 2022-07-25


Environment Preparation

Oracle Environment

# Create a dedicated network
docker network create --subnet=172.72.7.0/24 ora-network

# Oracle load-testing tool
docker pull lhrbest/lhrdbbench:1.0

docker rm -f lhrdbbench
docker run -d --name lhrdbbench -h lhrdbbench \
  --net=ora-network --ip 172.72.7.33 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/lhrdbbench:1.0 \
  /usr/sbin/init

# Oracle 12c
docker rm -f lhrora1221
docker run -itd --name lhrora1221 -h lhrora1221 \
  --net=ora-network --ip 172.72.7.34 \
  -p 1526:1521 -p 3396:3389 \
  --privileged=true \
  lhrbest/oracle_12cr2_ee_lhr_12.2.0.1:2.0 init

-- Oracle database configuration
-- 1. Enable archive log mode, if it is not already enabled
-- 2. Enable database-level supplemental logging, if minimal supplemental logging is not already enabled
-- 3. Enable force logging, if it is not already enabled
-- 4. Set the ENABLE_GOLDENGATE_REPLICATION parameter to TRUE
-- 5. Create the OGG users: the source-side user, the target-side user, and the OGG extract user

alter database add supplemental log data;
alter database add supplemental log data (all) columns;
alter database force logging;
alter system set enable_goldengate_replication=TRUE;

select name,supplemental_log_data_min , force_logging, log_mode from v$database;

-- The SGA settings are written to the spfile and take effect after the restart below
alter system set streams_pool_size = 128M;
alter system set sga_max_size = 2g scope=spfile;
alter system set sga_target = 2g scope=spfile;
alter system set pga_aggregate_target=1g;
startup force

-- OGG administration user
CREATE USER ogg identified by lhr;
GRANT DBA to ogg;
grant SELECT ANY DICTIONARY to ogg;
GRANT EXECUTE ON SYS.DBMS_LOCK TO ogg;
grant select any transaction to ogg;
grant select any table to ogg;
grant flashback any table to ogg;
grant alter any table to ogg;

exec dbms_goldengate_auth.grant_admin_privilege('OGG','*',TRUE);

-- Application user
CREATE USER lhr identified by lhr;
alter user lhr identified by lhr;
GRANT DBA to lhr ;
grant SELECT ANY DICTIONARY to lhr;
GRANT EXECUTE ON SYS.DBMS_LOCK TO lhr;

-- Start the listener (run from the OS shell, not from SQL*Plus)
vi /u01/app/oracle/product/12.2.0.1/dbhome_1/network/admin/listener.ora
lsnrctl start
lsnrctl status
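
Step 1 of the configuration list (enabling archive log mode) has no commands in the listing. A minimal sketch of that step follows, assuming a single-instance database and OS authentication as the oracle user inside the lhrora1221 container. The function name archivelog_sql is made up for illustration. It only prints the SQL*Plus statements, so they can be reviewed before being piped into sqlplus. Note that the sequence restarts the instance.

```shell
#!/usr/bin/env bash
# Print the SQL*Plus statements that switch a single-instance database
# into ARCHIVELOG mode. Nothing is executed against the database here.
archivelog_sql() {
  cat <<'EOF'
shutdown immediate
startup mount
alter database archivelog;
alter database open;
archive log list
EOF
}

# Review the statements first.
archivelog_sql

# Assumed usage, run as the oracle OS user on the database host:
#   archivelog_sql | sqlplus -S / as sysdba
```

Afterwards, the log_mode column of the v$database query shown earlier should report ARCHIVELOG.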

Oracle Data Initialization

-- Initialize the source data (run from the OS shell on the lhrdbbench container)
/usr/local/swingbench/bin/oewizard  -s -create -c /usr/local/swingbench/wizardconfigs/oewizard.xml \
-version 2.0  -cs //172.72.7.34/lhrsdb  -dba "sys as sysdba" -dbap lhr -dt thin \
-ts users -u lhr -p lhr -allindexes  -scale 0.0001  -tc 16 -v -cl

col TABLE_NAME format a30
SELECT a.table_name,a.num_rows FROM dba_tables a where a.OWNER='LHR' ;
select object_type,count(*) from dba_objects where owner='LHR' group by object_type;
select object_type,status,count(*) from dba_objects where owner='LHR' group by object_type,status;
select sum(bytes)/1024/1024 from dba_segments where owner='LHR';

-- Check that the keys are valid: https://www.xmmup.com/ogg-01296-biaoyouzhujianhuoweiyijiandanshirengranshiyongquanbulielaijiexixing.html
-- Otherwise, after OGG starts it reports errors: OGG-01296, OGG-06439, OGG-01169 Encountered an update where all key columns for target table LHR.ORDER_ITEMS are not present.
select owner, constraint_name, constraint_type, status, validated
from dba_constraints
where owner='LHR'
and VALIDATED='NOT VALIDATED';

-- Generate the statements that validate those constraints
select 'alter table lhr.'||TABLE_NAME||' enable validate constraint '||CONSTRAINT_NAME||';'
from dba_constraints
where owner='LHR'
and VALIDATED='NOT VALIDATED';

-- Generate the statements that drop the foreign keys
SELECT 'ALTER TABLE  LHR.'|| D.TABLE_NAME ||' DROP constraint '|| D.CONSTRAINT_NAME||';'
FROM DBA_constraints d where  owner='LHR' and d.CONSTRAINT_TYPE='R';

sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb

@/oggoracle/demo_ora_create.sql
@/oggoracle/demo_ora_insert.sql

SQL> select * from tcustmer;

CUST NAME                           CITY                 ST
---- ------------------------------ -------------------- --
WILL BG SOFTWARE CO.                SEATTLE              WA
JANE ROCKY FLYER INC.               DENVER               CO

-- Create two tables with CLOB and BLOB columns
sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb  @/oggoracle/demo_ora_lob_create.sql
exec testing_lobs;
select * from lhr.TSRSLOB;

drop table IMAGE_LOB;
CREATE TABLE IMAGE_LOB (
   T_ID VARCHAR2 (5) NOT NULL,
   T_IMAGE BLOB,
   T_CLOB  CLOB
   );

-- Insert BLOB files (1.jpg and 2.jpg must exist in /home/oracle/)
CREATE OR REPLACE DIRECTORY D1 AS '/home/oracle/';
grant all on DIRECTORY  D1 TO PUBLIC;
CREATE OR REPLACE NONEDITIONABLE PROCEDURE IMG_INSERT(TID      VARCHAR2,
                                                      FILENAME VARCHAR2,
                                                      name VARCHAR2) AS
    F_LOB BFILE;
    B_LOB BLOB;
BEGIN
    -- Insert an empty BLOB and keep its locator so the file can be loaded into it
    INSERT INTO IMAGE_LOB
        (T_ID, T_IMAGE,T_CLOB)
    VALUES
        (TID, EMPTY_BLOB(),name) RETURN T_IMAGE INTO B_LOB;
    F_LOB := BFILENAME('D1', FILENAME);
    DBMS_LOB.FILEOPEN(F_LOB, DBMS_LOB.FILE_READONLY);
    DBMS_LOB.LOADFROMFILE(B_LOB, F_LOB, DBMS_LOB.GETLENGTH(F_LOB));
    DBMS_LOB.FILECLOSE(F_LOB);
    COMMIT;
END;
/

BEGIN
    IMG_INSERT('1','1.jpg','xmmup.com');
    IMG_INSERT('2','2.jpg','www.xmmup.com');
 END;
/

select * from IMAGE_LOB;

-- All tables on the Oracle side
SQL> select * from tab;

TNAME                          TABTYPE  CLUSTERID
------------------------------ ------- ----------
ADDRESSES                      TABLE
CARD_DETAILS                   TABLE
CUSTOMERS                      TABLE
IMAGE_LOB                      TABLE
INVENTORIES                    TABLE
LOGON                          TABLE
ORDERENTRY_METADATA            TABLE
ORDERS                         TABLE
ORDER_ITEMS                    TABLE
PRODUCTS                       VIEW
PRODUCT_DESCRIPTIONS           TABLE
PRODUCT_INFORMATION            TABLE
PRODUCT_PRICES                 VIEW
TCUSTMER                       TABLE
TCUSTORD                       TABLE
TSRSLOB                        TABLE
TTRGVAR                        TABLE
WAREHOUSES                     TABLE

18 rows selected.

SELECT COUNT(*) FROM LHR.ADDRESSES                      UNION ALL
SELECT COUNT(*) FROM LHR.CARD_DETAILS                   UNION ALL
SELECT COUNT(*) FROM LHR.CUSTOMERS                      UNION ALL
SELECT COUNT(*) FROM LHR.IMAGE_LOB                      UNION ALL
SELECT COUNT(*) FROM LHR.INVENTORIES                    UNION ALL
SELECT COUNT(*) FROM LHR.LOGON                          UNION ALL
SELECT COUNT(*) FROM LHR.ORDERENTRY_METADATA            UNION ALL
SELECT COUNT(*) FROM LHR.ORDERS                         UNION ALL
SELECT COUNT(*) FROM LHR.ORDER_ITEMS                    UNION ALL
SELECT COUNT(*) FROM LHR.PRODUCT_DESCRIPTIONS           UNION ALL
SELECT COUNT(*) FROM LHR.PRODUCT_INFORMATION            UNION ALL
SELECT COUNT(*) FROM LHR.TCUSTMER                       UNION ALL
SELECT COUNT(*) FROM LHR.TCUSTORD                       UNION ALL
SELECT COUNT(*) FROM LHR.TSRSLOB                        UNION ALL
SELECT COUNT(*) FROM LHR.TTRGVAR                        UNION ALL
SELECT COUNT(*) FROM LHR.WAREHOUSES
;

  COUNT(*)
----------
       150
       150
       100
         2
    900724
       239
         4
       143
       773
      1000
      1000
         2
         2
         1
         0
      1000

16 rows selected.

In the end, the Oracle side contains 16 tables and 2 views. Two of the tables, TSRSLOB and IMAGE_LOB, contain BLOB and CLOB columns.
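
The row-count query above returns bare numbers, so each count has to be matched to its table by position. One possible alternative is to generate a query that labels every count. The sketch below is illustrative only: the function name count_query and the column aliases table_name and row_count are not from the original article.

```shell
#!/usr/bin/env bash
# Build one UNION ALL query that returns (table_name, row_count) pairs.
# Arguments: the schema name followed by one or more table names.
count_query() {
  local schema=$1; shift
  local sep=""
  local t
  for t in "$@"; do
    printf "%sSELECT '%s' AS table_name, COUNT(*) AS row_count FROM %s.%s" \
      "$sep" "$t" "$schema" "$t"
    sep=$' UNION ALL\n'
  done
  printf ';\n'
}

# Example with three of the LHR tables; pass all 16 names for the full check.
count_query LHR ADDRESSES CARD_DETAILS CUSTOMERS
```

The generated text can be pasted into SQL*Plus, and the same table list can later be reused when checking the Kafka topics.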

Target Kafka Environment

docker pull lhrbest/kafka:3.2.0

docker rm -f lhrkafka
docker run -itd --name lhrkafka -h lhrkafka \
  --net=ora-network --ip 172.72.7.44 \
  -p 9092:9092 -p 2181:2181 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/kafka:3.2.0 \
  /usr/sbin/init

docker exec -it lhrkafka bash

# Start the services (they are already started by default)
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

[root@lhrkafka /]# jps
161 QuorumPeerMain
162 Kafka
1127 Jps
[root@lhrkafka /]# ps -ef|grep java
root         161       1  7 14:20 ?        00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp [jar list under /usr/local/kafka/bin/../libs/ omitted] org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties
root         162       1 30 14:20 ?        00:00:14 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999 -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp [jar list under /usr/local/kafka/bin/../libs/ omitted] kafka.Kafka /usr/local/kafka/config/server.properties
root        1167     961  0 14:20 pts/1    00:00:00 grep --color=auto java
[root@lhrkafka /]# netstat -tulnp | grep java
tcp        0      0 0.0.0.0:2181            0.0.0.0:*               LISTEN      161/java
tcp        0      0 0.0.0.0:9999            0.0.0.0:*               LISTEN      162/java
tcp        0      0 0.0.0.0:37691           0.0.0.0:*               LISTEN      161/java
tcp        0      0 0.0.0.0:40831           0.0.0.0:*               LISTEN      162/java
tcp        0      0 0.0.0.0:38977           0.0.0.0:*               LISTEN      162/java
tcp        0      0 0.0.0.0:9092            0.0.0.0:*               LISTEN      162/java

By default, Kafka listens on port 9092 and ZooKeeper listens on port 2181.
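
Checking those two ports by eye in the netstat output works, but the check can also be scripted. The following is a small sketch, not part of the original setup: the helper names listening_ports and ports_are_listening are hypothetical, and the parsing assumes the Linux `netstat -tulnp` column layout shown above (local address in column 4, state in column 6).

```shell
#!/usr/bin/env bash
# Read `netstat -tulnp` output on stdin and print the distinct TCP ports
# that are in LISTEN state, one per line, sorted numerically.
listening_ports() {
  awk '$1 ~ /^tcp/ && $6 == "LISTEN" { n = split($4, parts, ":"); print parts[n] }' |
    sort -nu
}

# Succeed only if every port given as an argument is listed on stdin.
ports_are_listening() {
  local found p
  found=$(listening_ports)
  for p in "$@"; do
    grep -qx "$p" <<<"$found" || return 1
  done
}

# Assumed usage inside the lhrkafka container:
#   netstat -tulnp | ports_are_listening 9092 2181 && echo "Kafka and ZooKeeper are listening"
```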

Kafka log:

tail -f /usr/local/kafka/logs/server.log

As a test, create a topic named test on the server and produce a few messages:

# Producer
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>hello
>world

# In another terminal (the command connects to 127.0.0.1), start a console consumer on the test topic; the messages are received
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic test --from-beginning
hello
world

# List all topics on the current server
/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092

Source OGG for Oracle Microservices Environment

# OGG host
docker pull lhrbest/ogg213maoracle:v1.0

docker rm -f lhrogg213maoracle
docker run -d --name lhrogg213maoracle -h lhrogg213maoracle \
  --net=ora-network --ip 172.72.7.100 \
  -p 9391:3389 -p 29000-29005:9000-9005 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/ogg213maoracle:v1.0 \
  /usr/sbin/init

docker exec -it lhrogg213maoracle bash

su - oracle
adminclient
CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr

Access: http://192.168.1.35:29001 , username: oggadmin, password: lhr

Create the credential and add trandata

ogg@172.72.7.34/lhrsdb
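
The article performs this step in the web UI, using the user ID above. A rough Admin Client equivalent might look like the sketch below. It assumes the alias ora12c and the domain OGGMA (both appear in the parameter files later in the article) and the password lhr from the CREATE USER statement. The function name trandata_commands and the file path in the usage comment are hypothetical.

```shell
#!/usr/bin/env bash
# Print Admin Client commands that store the database credential and
# enable table-level supplemental logging for every table in schema LHR.
# Nothing is sent to GoldenGate here; the commands are only printed.
trandata_commands() {
  cat <<'EOF'
CONNECT http://127.0.0.1:9000 deployment deploy213 as oggadmin password lhr
ALTER CREDENTIALSTORE ADD USER ogg@172.72.7.34/lhrsdb PASSWORD lhr ALIAS ora12c DOMAIN OGGMA
DBLOGIN USERIDALIAS ora12c DOMAIN OGGMA
ADD TRANDATA LHR.*
INFO TRANDATA LHR.*
EOF
}

trandata_commands

# Assumed usage: save the output and run it from adminclient with OBEY, e.g.
#   trandata_commands > /tmp/trandata.oby     (hypothetical path)
#   OBEY /tmp/trandata.oby
```

INFO TRANDATA is included only as a check that logging was enabled for each table.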

Target OGG for Big Data Microservices Environment

docker pull lhrbest/ogg214mabigdata:v1.0

docker rm -f lhrogg214mabigdata
docker run -d --name lhrogg214mabigdata -h lhrogg214mabigdata \
  --net=ora-network --ip 172.72.7.101 \
  -p 9191:3389 -p 9000-9005:9000-9005 \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/ogg214mabigdata:v1.0 \
  /usr/sbin/init

docker exec -it lhrogg214mabigdata bash

# Configure the Kafka parameters: set the following property in each file
vi /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
gg.handler.kafkahandler.schemaTopicName=LHR_OGG

vi  /ogg214c/ogg_deploy/etc/conf/ogg/custom_kafka_producer.properties
bootstrap.servers=172.72.7.44:9092
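
Only one property per file is shown above; the image presumably ships the rest. For orientation, the sketch below writes what a minimal pair of files could look like. The property names are documented Kafka Handler and Kafka producer settings, but every value other than schemaTopicName and bootstrap.servers is an assumption chosen to match the results later in the article (one topic per table, binary-looking messages), not a copy of the author's files. The function name write_kafka_conf is hypothetical, and gg.classpath (which must point to the Kafka client jars) is deliberately left out because its path is environment-specific.

```shell
#!/usr/bin/env bash
# Write illustrative Kafka Handler settings into a directory (default: current dir).
# Values are assumptions, except schemaTopicName and bootstrap.servers,
# which come from the article.
write_kafka_conf() {
  local dir=${1:-.}
  cat > "$dir/kafka.props" <<'EOF'
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.kafkaProducerConfigFile=custom_kafka_producer.properties
# One topic per source table (assumed)
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
# Operation-level Avro output (assumed)
gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.schemaTopicName=LHR_OGG
gg.handler.kafkahandler.mode=op
EOF
  cat > "$dir/custom_kafka_producer.properties" <<'EOF'
bootstrap.servers=172.72.7.44:9092
acks=1
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
EOF
}

# Assumed usage: write into a scratch directory and compare with the shipped files.
#   scratch=$(mktemp -d) && write_kafka_conf "$scratch" && diff "$scratch/kafka.props" /ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
```

The heredocs are quoted so that the shell leaves the `${tableName}` and `${primaryKeys}` template keywords untouched.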

Access: http://192.168.1.35:9001 , username: oggadmin, password: lhr

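Full Synchronization

Note: during this phase the application on the source side must be stopped so that no new data is generated.

Create the Initial Load on the Source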

EXTRACT ext0
USERIDALIAS ora12c  domain OGGMA
rmthost 172.72.7.101,mgrport 9003
rmtfile ./dirdat/e0
tableexclude LHR.PRODUCTS;
tableexclude LHR.PRODUCT_PRICES;
TABLE LHR.*;

The extract report shows that the data has been transferred to the target, as follows:

Check at the OS level:

[root@lhrogg214mabigdata dirdat]# pwd
/ogg214c/ogg_deploy/var/lib/data/dirdat
[root@lhrogg214mabigdata dirdat]# ll
total 84236
-rw-r----- 1 oracle oinstall 86256166 Jul 22 12:52 e0000000
[root@lhrogg214mabigdata dirdat]#
[root@lhrogg214mabigdata dirdat]# ll -h
total 83M
-rw-r----- 1 oracle oinstall 83M Jul 22 12:52 e0000000

Full Initial Load into Kafka on the Target

REPLICAT rep0
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
end runtime
map LHR.*, target LHR.*;

When the run finishes, the replicat stops automatically:

Checking the Full Synchronization Result

# View all historical data
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic LHR_OGG --from-beginning

# List all topics on the current server
/usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092

# Topic details
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server  localhost:9092 --describe --topic LHR_OGG

Each table gets its own topic. The first listing below was taken before the load and the second after it:

[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
__consumer_offsets
test
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092
ADDRESSES
CARD_DETAILS
CUSTOMERS
EMP
IMAGE_LOB
INVENTORIES
LHR_OGG
LOGON
ORDERENTRY_METADATA
ORDERS
ORDER_ITEMS
PRODUCT_DESCRIPTIONS
PRODUCT_INFORMATION
TCUSTMER
TCUSTORD
TSRSLOB
WAREHOUSES
__consumer_offsets
test
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --list  --bootstrap-server  localhost:9092 | wc -l
19
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server  localhost:9092 --describe --topic WAREHOUSES
Topic: WAREHOUSES       TopicId: HR3273rMTK6JsQt8OTjKNA PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: WAREHOUSES       Partition: 0    Leader: 0       Replicas: 0     Isr: 0
[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic WAREHOUSES --from-beginning | wc -l
^CProcessed a total of 1000 messages

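The full data synchronization is complete: the WAREHOUSES topic holds 1000 messages, matching the 1000 rows in the source table.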
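
The check above needs Ctrl-C to stop the consumer and covers a single topic. A possible way to automate it is sketched below. It relies on the console consumer's `--timeout-ms` option, which makes it exit when no message arrives within the given time, and on the "Processed a total of N messages" summary that the consumer prints on stderr. The function name count_messages, the 10-second timeout, and the CONSUMER and BOOTSTRAP variables are illustrative choices, not part of the original article.

```shell
#!/usr/bin/env bash
# Count the messages currently in a topic without pressing Ctrl-C.
# CONSUMER and BOOTSTRAP can be overridden from the environment.
CONSUMER=${CONSUMER:-/usr/local/kafka/bin/kafka-console-consumer.sh}
BOOTSTRAP=${BOOTSTRAP:-127.0.0.1:9092}

count_messages() {
  local topic=$1
  # Send stderr into the pipe and discard the message payloads on stdout,
  # then extract N from "Processed a total of N messages".
  "$CONSUMER" --bootstrap-server "$BOOTSTRAP" --topic "$topic" \
      --from-beginning --timeout-ms 10000 2>&1 >/dev/null |
    sed -n 's/.*Processed a total of \([0-9][0-9]*\) messages.*/\1/p'
}

# Assumed usage inside the lhrkafka container:
#   for t in ADDRESSES ORDERS WAREHOUSES; do echo "$t $(count_messages "$t")"; done
```

Reading the summary line rather than counting output lines avoids miscounting when a binary payload happens to contain newline bytes.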

Incremental Synchronization

Oracle-Side Configuration

Data directory: /ogg213c/ogg_deploy/var/lib/data/dirdat

EXTRACT ext1
USERIDALIAS ora12c DOMAIN OGGMA
DDL INCLUDE MAPPED
DDLOPTIONS REPORT
EXTTRAIL ./dirdat/e1
table LHR.*;

Configure the Distribution Service on the Source

Log in: http://192.168.1.35:29002

trail://172.72.7.100:9002/services/v2/sources?trail=./dirdat/e1
ogg://172.72.7.101:9003/services/v2/targets?trail=./dirdat/e1
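
The two URIs follow a fixed pattern: the source names the local Distribution Service and the trail it reads, and the target names the remote Receiver Service and the trail it writes. A small sketch that assembles them from host, port, and trail name is shown below. The function names distpath_source and distpath_target are hypothetical, and the pattern is inferred only from the two lines above.

```shell
#!/usr/bin/env bash
# Build the source and target URIs of a distribution path in the form
# entered into the Distribution Service UI.
distpath_source() {  # args: distribution-host distribution-port trail
  printf 'trail://%s:%s/services/v2/sources?trail=%s\n' "$1" "$2" "$3"
}

distpath_target() {  # args: receiver-host receiver-port trail
  printf 'ogg://%s:%s/services/v2/targets?trail=%s\n' "$1" "$2" "$3"
}

# Values from this article: distribution on 172.72.7.100:9002,
# receiver on 172.72.7.101:9003, trail ./dirdat/e1 on both sides.
distpath_source 172.72.7.100 9002 ./dirdat/e1
distpath_target 172.72.7.101 9003 ./dirdat/e1
```

Keeping the trail name as a single argument per side makes it harder to mistype the dirdat location on one of them.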

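At this point, the Big Data deployment automatically adds the corresponding path in its Receiver Service:

The trail file has been transferred to the target: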

[root@lhrogg214mabigdata dirdat]# ll
total 84252
-rw-r----- 1 oracle oinstall 86256166 Jul 22 12:52 e0000000
-rw-r----- 1 oracle oinstall    13994 Jul 22 15:37 e1000000000
[root@lhrogg214mabigdata dirdat]#

Replicat Configuration on the Kafka Side

The target side offers many options, including Warehouse, Cassandra, HBase, HDFS, JDBC, Kafka, and Kafka Connect.

REPLICAT rep1
targetdb libfile libggjava.so set property=/ogg214c/ogg_deploy/etc/conf/ogg/kafka.props
map LHR.*, target LHR.*;

Incremental Test

LHR@lhrsdb> delete from ADDRESSES where rownum<=1;

1 row deleted.

LHR@lhrsdb> commit;

Commit complete.

Source side:

Distribution Service:

Kafka side:

Command-line consumer:

[root@lhrkafka /]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092  --topic ADDRESSES
LHR.ADDRESSESD42022-07-22 15:47:58.00481242022-07-22 15:48:01.593000(00000000000000026625&2022-06-28 16:15:0025Street NameTalgarthBerkshireV

As shown, the delete on the source arrives in the ADDRESSES topic, so changes are synchronized incrementally.

Viewing Kafka Data with Kafka Manager

Reference: https://www.xmmup.com/kafkatuxingguanligongjucmakkafka-manageranzhuangjishiyong.html

docker pull registry.cn-hangzhou.aliyuncs.com/lhrbest/kafkamanager_cmak:3.0.0.6

docker rm -f lhrkafkamanager
docker run -itd --name lhrkafkamanager -h lhrkafkamanager \
  --net=ora-network --ip 172.72.7.45 \
  -p 9100:9000  \
  -v /sys/fs/cgroup:/sys/fs/cgroup \
  --privileged=true lhrbest/kafkamanager_cmak:3.0.0.6 \
  /usr/sbin/init

docker exec -it lhrkafkamanager bash

# Web login URL: http://192.168.1.35:9100/

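Summary

1. When configuring the Distribution Service, pay attention to the location of dirdat.

2. The Distribution Service uses port 9002 and the Receiver Service uses port 9003.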

