

Streaming Oracle Data to Kafka

I. Requirement Scenario


Per the requirement discussions, real-time data from the Oracle database needs to be streamed into Kafka for message subscription, so that the business can perform real-time data analysis.
To meet this requirement, we use GoldenGate to capture real-time transaction changes from the Oracle database and deliver the transaction data to the Kafka message-queue cluster operated by the big-data team; users then subscribe to the topic to analyze business data in real time.

II. Packages Required for This Synchronization


All of the configuration below is performed on Linux.
Software: OGG ADAPTER FOR KAFKA
Required Kafka packages (Kafka 0.8.2.1):
kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar
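Before continuing, it is worth confirming these jars are present where the adapter's class path will later point. A quick check, assuming the Kafka installation directory used later in this article (/data/kafka_2.10-0.8.2.2):

ls /data/kafka_2.10-0.8.2.2/libs | egrep 'kafka-clients|lz4|slf4j-api|snappy-java'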

III. Capture and Transport Configuration at the Database Tier

1. Configure database supplemental logging


select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK,
       SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING
from v$database;

SUPPLEME SUP SUP FOR
-------- --- --- ---
YES      YES YES YES
Supplemental logging must match the query result above.
The statements to enable supplemental logging are:
# Commonly used minimal supplemental logging:
alter database add supplemental log data;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
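If FORCE_LOGGING is not yet YES, it can be enabled with standard Oracle DDL; re-run the v$database query above afterwards to confirm all four columns return YES:

ALTER DATABASE FORCE LOGGING;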

2. Configure OGG capture on the source database


Run the following commands in GGSCI on the primary database:
dblogin userid goldengate, password oggpassword
add extract EXTJMS, tranlog, threads 3, begin now
add exttrail /data/oggsrc/dirdat/jm, extract EXTJMS, megabytes 200
add trandata testKAFKA.*
-- add schematrandata testKAFKA
Extract parameter file:
edit param extjms

EXTRACT EXTJMS
SETENV (ORACLE_SID = "rac3")
SETENV (ORACLE_HOME=/data/app/oracle/product/10.2.0/db_1)
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS ALLOWUNUSEDCOLUMN, FETCHBATCHSIZE 1500
userid goldengate, password oggpassword
EXTTRAIL /data/oggsrc/dirdat/jm, FORMAT RELEASE 9.5
DISCARDFILE /data/oggsrc/dirtmp/EXTJMS.dsc, APPEND, MEGABYTES 500
tranlogoptions asmuser SYS@rac_asm, ASMPASSWORD oracle_123
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
WARNLONGTRANS 30MIN, CHECKINTERVAL 3MIN
CHECKPOINTSECS 5
FLUSHCSECS 80
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
--DDL INCLUDE ALL
DDL INCLUDE MAPPED &
exclude objname testKAFKA.PK_CATEGORY_RANKLIST &
exclude objtype 'PACKAGE' &
exclude objtype 'PACKAGE BODY' &
exclude INSTR 'REPLACE SYNONYM' &
exclude INSTR 'CREATE OR REPLACE PACKAGE' &
exclude objtype 'PROCEDURE' &
exclude objtype 'FUNCTION' &
exclude objtype 'TYPE' &
exclude objtype 'TRIGGER' &
exclude objtype 'GRANT' &
exclude instr 'GRANT' &
exclude objtype 'DATABASE LINK' &
exclude objtype 'CONSTRAINT' &
exclude objtype 'JOB' &
exclude instr 'ALTER SESSION' &
exclude instr 'MATERIALIZED VIEW' &
exclude INSTR 'AS SELECT' &
exclude INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_UNCMP"
--GETUPDATEBEFORES
--ddloptions addtrandata, REPORT
FETCHOPTIONS, USESNAPSHOT, NOUSELATESTVERSION, MISSINGROW REPORT
dynamicresolution
EOFDELAYCSECS 5
TABLEEXCLUDE testKAFKA.ACTION_LOG;
TABLE testKAFKA.*;
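After saving the parameter file, the extract can be started and checked with standard GGSCI commands; INFO ... DETAIL shows the redo threads being read and the trail being written:

GGSCI> start extract EXTJMS
GGSCI> info extract EXTJMS, detail
GGSCI> info all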

3. Add a new PUMP process on the source


Add a new pump process on the testKAFKA source database:
add extract EDPKK, exttrailsource /data/oggsrc/dirdat/jm, begin now
4. Edit the PUMP parameter file


edit param EDPKK

EXTRACT EDPKK
setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
RecoveryOptions OverwriteMode
RMTHOST 192.168.0.3, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kk
RMTTRAIL /data/ogg_for_kafka/dirdat/kk
DISCARDFILE ./dirrpt/EDPKK.dsc, APPEND, MEGABYTES 5
TABLE testKAFKA.*;
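With the parameter file in place, start the pump and watch it against the local trail; LAG reports how far the pump is behind the extract (standard GGSCI commands):

GGSCI> start extract EDPKK
GGSCI> info extract EDPKK
GGSCI> lag extract EDPKK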

5. Configure the corresponding remote trail for the PUMP process


add rmttrail /data/ogg_for_bigdata/dirdat/kk, extract EDPKK, megabytes 200
Edit the table-definition file for heterogeneous replication and transfer it to the target. The defgen parameter file (dirprm/defgen.prm):
userid goldengate, password oggpassword
defsfile dirdef/testKAFKA.def
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;
TABLE testKAFKA.*;
Generate the definitions file:
./defgen paramfile ./dirprm/defgen.prm
Copy the file to the target server:
scp dirdef/testKAFKA.def oracle@192.168.0.3:/data/ogg_for_bigdata/dirdef
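As a sanity check before wiring the file into the target process, the generated definitions can be inspected; defgen normally writes one "Definition for table" header per captured table, so the count should match the number of tables:

grep -c 'Definition for table' dirdef/testKAFKA.def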

6. Configure the MGR manager process on the target


edit param mgr

PORT 7839
DYNAMICPORTLIST 7840-7850
--AUTOSTART replicat *
--AUTORESTART replicat *, RETRIES 5, WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
PURGEOLDEXTRACTS /data/ogg_for_kafka1/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
The manager configuration above is a generic baseline that meets most requirements. Whether to enable automatic start of the replicat processes, or automatic restart of all processes at set intervals, can be adjusted to your own needs; the PURGEOLDEXTRACTS parameter controls deletion of trail files in the specified directories.
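Once the parameter file is saved, start the manager and confirm it is listening on port 7839 (standard GGSCI commands):

GGSCI> start mgr
GGSCI> info mgr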

7. Configure the DATA PUMP delivery (Kafka user exit) process on the target


Version used:
Version 12.1.2.1.4 20470586
OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209
Add the pump process:
ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kk

edit param repkk

EXTRACT repkk
SETENV (GGS_USEREXIT_CONF = "dirprm/repkk.props")
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
SourceDefs dirdef/testKAFKA.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GetUpdateBefores
NoCompressDeletes
NoCompressUpdates
NoTcpSourceTimer
TABLEEXCLUDE testKAFKA.MV*;
TABLE testKAFKA.*;
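After the process is added and parameterized, start it and confirm that records are reaching the Kafka handler; STATS with TOTALSONLY summarizes the operations processed per table (standard GGSCI commands):

GGSCI> start extract repkk
GGSCI> info extract repkk
GGSCI> stats extract repkk, totalsonly testKAFKA.*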

8. Configuration files


The repkk.props configuration file:
[oracle@repvm dirprm]$ cat repkk.props
gg.handlerlist=kafkahandler
#gg.handler.kafkahandler.type=oracle.goldengate.handler.kafka.KafkaHandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=core_kafka_producer.properties
gg.handler.kafkahandler.TopicName=zqtest
gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=tx
#gg.handler.kafkahandler.maxGroupSize=100, 1Mb
#gg.handler.kafkahandler.minGroupSize=50, 500Kb

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
#gg.log.level=INFO
gg.log.level=DEBUG
gg.report.time=30sec

#gg.classpath=dirprm/:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar:/data/ogg_for_kafka/dirprm/kafka_jar/*:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
gg.classpath=dirprm:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*

javawriter.bootoptions=-Xmx4096m -Xms4096m -Djava.class.path=/data/ogg_for_kafka/ggjava/ggjava.jar:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
[oracle@repvm dirprm]$
Note that javawriter.bootoptions must include the OGG for Kafka lib jars on its class path.
Configuration for accessing the Kafka message queue, i.e. the core_kafka_producer.properties file referenced above:
bootstrap.servers=localhost:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
max.request.size=5024000
send.buffer.bytes=5024000
Note: compression.type can be left at its default. The configuration here targets a single-node environment; in a cluster environment, configure multiple broker addresses.
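For a cluster, bootstrap.servers simply takes a comma-separated broker list. A sketch with made-up addresses (replace with your brokers' real host:port pairs):

# hypothetical three-node cluster
bootstrap.servers=192.168.0.3:9092,192.168.0.4:9092,192.168.0.5:9092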

IV. Configure the ZooKeeper and Kafka Queue


Configure Kafka:
[oracle@repvm kafka_2.10-0.8.2.2]$ pwd
/data/kafka_2.10-0.8.2.2
[oracle@repvm kafka_2.10-0.8.2.2]$ grep -v '^$\|^\s*\#' config/server.properties
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
[oracle@repvm kafka_2.10-0.8.2.2]$

[oracle@repvm libs]$ ll
total 17452
-rw-r--r-- 1 oracle oinstall   53244 Aug 31  2014 jopt-simple-3.2.jar
-rw-r--r-- 1 oracle oinstall 3991269 Sep  3  2015 kafka_2.10-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall   37748 Sep  3  2015 kafka_2.10-0.8.2.2-javadoc.jar
-rw-r--r-- 1 oracle oinstall 2324165 Sep  3  2015 kafka_2.10-0.8.2.2-scaladoc.jar
-rw-r--r-- 1 oracle oinstall  521466 Sep  3  2015 kafka_2.10-0.8.2.2-sources.jar
-rw-r--r-- 1 oracle oinstall 1233391 Sep  3  2015 kafka_2.10-0.8.2.2-test.jar
-rw-r--r-- 1 oracle oinstall  324016 Sep  3  2015 kafka-clients-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall  481535 Aug 31  2014 log4j-1.2.16.jar
-rw-r--r-- 1 oracle oinstall  165505 Aug 31  2014 lz4-1.2.0.jar
-rw-r--r-- 1 oracle oinstall   82123 Aug 31  2014 metrics-core-2.2.0.jar
-rw-r--r-- 1 oracle oinstall 7126372 Nov 25  2014 scala-library-2.10.4.jar
-rw-r--r-- 1 oracle oinstall   28688 Aug 31  2014 slf4j-api-1.7.6.jar
-rw-r--r-- 1 oracle oinstall    9753 Aug 31  2014 slf4j-log4j12-1.6.1.jar
-rw-r--r-- 1 oracle oinstall  594033 May 29  2015 snappy-java-1.1.1.7.jar
-rw-r--r-- 1 oracle oinstall   64009 Aug 31  2014 zkclient-0.3.jar
-rw-r--r-- 1 oracle oinstall  792964 Aug 31  2014 zookeeper-3.4.6.jar
[oracle@repvm libs]$

[oracle@repvm bin]$ pwd
/data/kafka_2.10-0.8.2.2/bin
[oracle@repvm bin]$ nohup sh kafka-server-start.sh ../config/server.properties > /tmp/server.log &
Start ZooKeeper first:
[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
[1] 18645
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$ tail -f nohup.out
[2016-06-02 12:22:42,981] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.version=2.6.32-358.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.name=oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.home=/home/oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.dir=/data/kafka_2.10-0.8.2.2 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,035] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
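A quick liveness check, assuming nc is installed on the host: ZooKeeper replies imok to the four-letter command ruok when it is serving requests.

[oracle@repvm kafka_2.10-0.8.2.2]$ echo ruok | nc localhost 2181
imok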
Start Kafka:
[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/kafka-server-start.sh config/server.properties &
[1] 18845
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$
Create the topic:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zqtest
Created topic "zqtest".
[oracle@repvm kafka_2.10-0.8.2.2]$
List topics to verify:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
zqtest
Note: the topic "zqtest" created here is the same topic specified in the OGG configuration file (gg.handler.kafkahandler.TopicName).
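The same tool can also describe the topic, which is useful for confirming partition count and replica placement on a cluster:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic zqtest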
Test sending a message:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zqtest
[2016-06-02 14:33:50,690] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
ds
On the consumer side:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zqtest --from-beginning
ds
The message was received successfully, which confirms that sending and receiving are configured correctly; change data captured from the database will be delivered and consumed the same way. For consumption specifics, see the detailed Kafka consumer configuration; consumed records can also be displayed through a browser-based tool.
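As an end-to-end check, commit a change on the source and watch it arrive on the topic. A sketch, where testKAFKA.T_DEMO is a hypothetical table (any table matched by TABLE testKAFKA.* works); note that with format avro_op the payload is binary Avro, so the console consumer will show partially unreadable records:

-- on the source database
SQL> insert into testKAFKA.T_DEMO (id, name) values (1, 'kafka-test');
SQL> commit;

# on the Kafka host
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zqtest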

Conclusion


This exercise focused on testing direct delivery into Kafka; detailed tuning will be refined in follow-up work.
The same approach applies to Flume, and even to Hadoop, HBase, and Hive.
The configuration described here has been validated through repeated experiments.
It can also serve as a template for future big-data integration; deeper testing will follow.





