
OGG (GoldenGate) Data Transfer Case Study: Kafka Edition

小陈Teacher 2022-01-20

    Sending Oracle data to Kafka


I. Confirming the Requirement Scenario

After confirming the requirement with the business side, we need to deliver real-time data from an Oracle database to Kafka for message subscription, so that the business can run data analysis on it.


To meet this requirement, we use GoldenGate to capture real-time transaction changes from the Oracle database and deliver them to the Kafka message queue cluster maintained by the big-data team; users then subscribe to a topic to perform real-time business analysis.
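
At a glance, the pipeline built in this article looks like this (process names and paths are the ones configured below):

Oracle redo logs -> EXTJMS (source extract) -> local trail /data/oggsrc/dirdat/jm
  -> EDPKK (pump) -> remote trail /data/ogg_for_bigdata/dirdat/kk
  -> repkk (user exit + Kafka handler) -> Kafka topic zqtest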


II. Installation Packages Required for This Synchronization

 All of the following configuration is done on Linux.

OGG ADAPTER FOR KAFKA

Required Kafka packages:

Kafka 0.8.2.1 
kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar

III. Configuring Data Extraction and Transfer at the Database Layer

1. Configure database supplemental logging

select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK,
       SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING
from v$database;

SUPPLEME SUP SUP FOR
-------- --- --- ---
YES      YES YES YES

The query above must return the results shown before proceeding.

The statements to enable supplemental logging are as follows:

-- Commonly used: add minimal supplemental logging
alter database add supplemental log data;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
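
The query above also checks FORCE_LOGGING; if it does not yet show YES, it can be enabled with the standard statement:

ALTER DATABASE FORCE LOGGING;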

2. Configure the OGG extract on the source database

The detailed commands on the primary database are as follows:

dblogin userid goldengate, password oggpassword
add extract EXTJMS, tranlog, threads 3, begin now
add exttrail /data/oggsrc/dirdat/jm, extract EXTJMS, megabytes 200
add trandata testKAFKA.*
--add schematrandata testKAFKA
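
After registering the extract, the setup can be verified from GGSCI with the standard status commands:

info extract EXTJMS
info trandata testKAFKA.*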

Extract process parameter file:

edit param extjms
EXTRACT EXTJMS
SETENV (ORACLE_SID = "rac3")
SETENV (ORACLE_HOME=/data/app/oracle/product/10.2.0/db_1)
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS ALLOWUNUSEDCOLUMN, FETCHBATCHSIZE 1500
userid goldengate, password oggpassword
EXTTRAIL /data/oggsrc/dirdat/jm, FORMAT RELEASE 9.5
DISCARDFILE /data/oggsrc/dirtmp/EXTJMS.dsc, APPEND, MEGABYTES 500
tranlogoptions asmuser SYS@rac_asm, ASMPASSWORD oracle_123
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
WARNLONGTRANS 30MIN, CHECKINTERVAL 3MIN
CHECKPOINTSECS 5
FLUSHCSECS 80
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
--DDL INCLUDE ALL
DDL INCLUDE MAPPED &
exclude objname testKAFKA.PK_CATEGORY_RANKLIST &
exclude objtype 'PACKAGE' &
exclude objtype 'PACKAGE BODY' &
exclude INSTR 'REPLACE SYNONYM' &
exclude INSTR 'CREATE OR REPLACE PACKAGE' &
exclude objtype 'PROCEDURE' &
exclude objtype 'FUNCTION' &
exclude objtype 'TYPE' &
exclude objtype 'TRIGGER' &
exclude objtype 'GRANT' &
exclude instr 'GRANT' &
exclude objtype 'DATABASE LINK' &
exclude objtype 'CONSTRAINT' &
exclude objtype 'JOB' &
exclude instr 'ALTER SESSION' &
exclude instr 'MATERIALIZED VIEW' &
exclude INSTR 'AS SELECT' &
exclude INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_UNCMP"
--GETUPDATEBEFORES
--ddloptions addtrandata,REPORT
FETCHOPTIONS, USESNAPSHOT, NOUSELATESTVERSION, MISSINGROW REPORT
dynamicresolution
EOFDELAYCSECS 5
TABLEEXCLUDE testKAFKA.ACTION_LOG;
TABLE testKAFKA.* ;
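
With the parameter file saved, the extract is started and checked from GGSCI:

start extract EXTJMS
info all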


3. Add a new pump process on the source


Add a new pump process for the testKAFKA source database:

add extract EDPKK,exttrailsource /data/oggsrc/dirdat/jm, begin now

4. Edit the pump parameter file


edit param EDPKK
EXTRACT EDPKK
setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
RecoveryOptions OverwriteMode
RMTHOST 192.168.0.3, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kk
RMTTRAIL /data/ogg_for_kafka/dirdat/kk
DISCARDFILE ./dirrpt/EDPKK.dsc,APPEND,MEGABYTES 5
TABLE testKAFKA.* ;


5. Add the remote trail for the pump process


add rmttrail /data/ogg_for_bigdata/dirdat/kk, extract EDPKK megabytes 200
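
Once the target manager (step 6) is listening on port 7839, the pump can be started and its remote trail confirmed with standard GGSCI commands:

start extract EDPKK
info extract EDPKK, detail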

Edit the heterogeneous table-definition (defgen) parameter file and transfer it to the target:

userid goldengate, password oggpassword
defsfile dirdef/testKAFKA.def
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;
TABLE testKAFKA.*;

Generate the definitions file:

./defgen paramfile ./dirprm/defgen.prm

Copy the file to the target server:

scp  dirdef/testKAFKA.def oracle@192.168.0.3:/data/ogg_for_bigdata/dirdef


6. Configure the MGR manager process on the target


edit param mgr
PORT 7839
DYNAMICPORTLIST 7840-7850
--AUTOSTART replicat *
--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
PURGEOLDEXTRACTS /data/ogg_for_kafka1/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

The general-purpose manager configuration above is enough for most requirements. Whether to enable auto-start of the replicat processes, or auto-restart of all processes at a fixed interval, can be adjusted to your own needs; the PURGEOLDEXTRACTS parameter controls purging of trail files from the listed directories.
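
The manager is then started and checked from GGSCI on the target:

start mgr
info mgr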

7. Configure the DATA PUMP delivery process on the target

Version used:

Version 12.1.2.1.4 20470586 
OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

Add the pump process:

ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kk
edit param repkk
EXTRACT repkk
SETENV (GGS_USEREXIT_CONF ="dirprm/repkk.props")
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
SourceDefs dirdef/testKAFKA.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GetUpdateBefores
NoCompressDeletes
NoCompressUpdates
NoTcpSourceTimer
TABLEEXCLUDE testKAFKA.MV*;
TABLE testKAFKA.*;
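
Once the repkk.props file from the next step is in place and Kafka is running, this process is started the same way:

start extract repkk
info extract repkk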


8. Configuration file details

The repkk.props configuration file:

[oracle@repvm dirprm]$ cat repkk.props 
gg.handlerlist =kafkahandler
#gg.handler.kafkahandler.type=oracle.goldengate.handler.kafka.KafkaHandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=core_kafka_producer.properties
gg.handler.kafkahandler.TopicName =zqtest
gg.handler.kafkahandler.format =avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
#gg.log.level=INFO
gg.log.level=DEBUG
gg.report.time=30sec
#gg.classpath=dirprm/:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar:/data/ogg_for_kafka/dirprm/kafka_jar/*:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
gg.classpath=dirprm:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
javawriter.bootoptions=-Xmx4096m -Xms4096m -Djava.class.path=/data/ogg_for_kafka/ggjava/ggjava.jar:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
[oracle@repvm dirprm]$

javawriter.bootoptions must include the OGG for Kafka lib packages.
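
A quick sanity check before starting the process is to confirm that the jars referenced by gg.classpath and javawriter.bootoptions actually exist (paths are the ones used in this environment):

ls /data/ogg_for_kafka/ggjava/ggjava.jar
ls /data/ogg_for_kafka/ggjava/resources/lib/*.jar
ls /data/kafka_2.10-0.8.2.2/libs/*.jar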


Producer configuration for accessing the Kafka message queue.
The Kafka properties file (core_kafka_producer.properties, referenced in repkk.props):

bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 10000
max.request.size = 5024000
send.buffer.bytes = 5024000

Note: compression.type can be left at its default. The configuration here targets a single-node environment; for a cluster, list multiple broker addresses in bootstrap.servers.
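
For a cluster, bootstrap.servers simply lists several brokers (the addresses below are placeholders):

bootstrap.servers=192.168.0.11:9092,192.168.0.12:9092,192.168.0.13:9092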


IV. Configure ZooKeeper and the Kafka Queue

Configure Kafka:

[oracle@repvm kafka_2.10-0.8.2.2]$ pwd
/data/kafka_2.10-0.8.2.2
[oracle@repvm kafka_2.10-0.8.2.2]$
[oracle@repvm kafka_2.10-0.8.2.2]$ grep -v '^$\|^\s*\#' config/server.properties
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
[oracle@repvm kafka_2.10-0.8.2.2]$
[oracle@repvm libs]$ ll
total 17452
-rw-r--r-- 1 oracle oinstall 53244 Aug 31 2014 jopt-simple-3.2.jar
-rw-r--r-- 1 oracle oinstall 3991269 Sep 3 2015 kafka_2.10-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall 37748 Sep 3 2015 kafka_2.10-0.8.2.2-javadoc.jar
-rw-r--r-- 1 oracle oinstall 2324165 Sep 3 2015 kafka_2.10-0.8.2.2-scaladoc.jar
-rw-r--r-- 1 oracle oinstall 521466 Sep 3 2015 kafka_2.10-0.8.2.2-sources.jar
-rw-r--r-- 1 oracle oinstall 1233391 Sep 3 2015 kafka_2.10-0.8.2.2-test.jar
-rw-r--r-- 1 oracle oinstall 324016 Sep 3 2015 kafka-clients-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall 481535 Aug 31 2014 log4j-1.2.16.jar
-rw-r--r-- 1 oracle oinstall 165505 Aug 31 2014 lz4-1.2.0.jar
-rw-r--r-- 1 oracle oinstall 82123 Aug 31 2014 metrics-core-2.2.0.jar
-rw-r--r-- 1 oracle oinstall 7126372 Nov 25 2014 scala-library-2.10.4.jar
-rw-r--r-- 1 oracle oinstall 28688 Aug 31 2014 slf4j-api-1.7.6.jar
-rw-r--r-- 1 oracle oinstall 9753 Aug 31 2014 slf4j-log4j12-1.6.1.jar
-rw-r--r-- 1 oracle oinstall 594033 May 29 2015 snappy-java-1.1.1.7.jar
-rw-r--r-- 1 oracle oinstall 64009 Aug 31 2014 zkclient-0.3.jar
-rw-r--r-- 1 oracle oinstall 792964 Aug 31 2014 zookeeper-3.4.6.jar
[oracle@repvm libs]$
[oracle@repvm bin]$ pwd
/data/kafka_2.10-0.8.2.2/bin
[oracle@repvm bin]$ nohup sh kafka-server-start.sh ../config/server.properties > /tmp/server.log &

Start ZooKeeper first:

[oracle@repvm kafka_2.10-0.8.2.2]$ nohup  bin/zookeeper-server-start.sh config/zookeeper.properties &
[1] 18645
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$ tail -f nohup.out
[2016-06-02 12:22:42,981] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.version=2.6.32-358.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.name=oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.home=/home/oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.dir=/data/kafka_2.10-0.8.2.2 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,035] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Start Kafka:

[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/kafka-server-start.sh config/server.properties &
[1] 18845
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$

Create the topic:

[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zqtest
Created topic "zqtest".
[oracle@repvm kafka_2.10-0.8.2.2]$

Verify:

[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
zqtest

Note: the topic "zqtest" created here is the same topic specified in the OGG properties file (gg.handler.kafkahandler.TopicName).
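
The topic can be inspected in more detail with the standard Kafka CLI:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic zqtest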


Test sending a message:

[oracle@repvm kafka_2.10-0.8.2.2]$  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zqtest 
[2016-06-02 14:33:50,690] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
ds

Consumer side:

[oracle@repvm kafka_2.10-0.8.2.2]$  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zqtest --from-beginning
ds

The message was received successfully, which shows that both producing and consuming are configured correctly; messages generated from database transaction changes will be consumed the same way. For deeper inspection, refer to the detailed Kafka consumer configuration; consumption records can also be displayed through a browser-based tool.
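
Since the handler is configured with format avro_op and SchemaTopicName mySchemaTopic (see repkk.props above), the published Avro schemas can be inspected with the same console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mySchemaTopic --from-beginning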

Conclusion

This post mainly tested direct delivery into Kafka; detailed tuning will be covered in a follow-up. The same approach applies to Flume, and even to Hadoop, HBase, and Hive. The configuration here has been verified over multiple runs, and it will serve as a reference case for future big-data integration; deeper experiments and tests will follow.

