暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kettle插件开发之MQToSQL

以数据之名 2021-06-08
1736

一、背景介绍

知其然,知其所以然。要开发MQToSQL插件,首先我们要知道什么是MQ?MQ的生产者是谁?MQ的消息体格式是什么类型?

  • MQ(Message Queue)消息队列,是一种“先进先出”的基础数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构;


  • MQ的生产者是Oracle Goldengate For Big Data 12c(OGG实时将事务性数据流式传输到大数据和云系统,而不影响源系统的性能。它将实时数据交付简化为最流行的大数据解决方案,包括Apache Hadoop, Apache HBase, Apache Hive, Confluent Kafka, NoSQL Databases, Elasticsearch, JDBC, Oracle Cloud, Amazon Web Services, Microsoft Azure Cloud, Google Cloud Platform, and Data Warehouses,以提高洞察力和及时行动);

 

  • MQ的消息体格式是Json,采用的产品是Apache Kafka(Apache Kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序)。

二、数据结构

  • 从源端数据库,通过OGG做增量数据捕获,推送增量数据到Kafka

  • 再通过Kettle插件pentaho-kafka-consumer消费消息,获取到增量消息数据流

  • 再通过Kettle插件app-etl-mts(MQToSQL)解析消息数据,根据操作类型及消息字段,动态生成DML语句

  • 把生成后的DML语句,通过kettle插件ExeSqlRow

三、OGG配置


3.1、OGG for MySQL

3.1.1、MySQL数据库准备

A.      开启行模式

数据库必须打开归档模式(OGG重启时需要读取未提交交易开始时的日志)

    mysql> show variables like 'binlog_format';
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW |
    +---------------+-------+
    1 row in set (0.00 sec)

    3.1.2、OGG for MySQL部署

    上传软件包到安装目录/u02/ogg/ggs,解压缩安装包,执行GGSCI进入命令行界面,在GGSCI环境下创建GoldenGate子目录

      $ cd u01/ogg/ggs
      $ unzip ***.zip
      $ ./ggsci
      GGSCI>create subdirs

      该命令会在OGG安装目录下建立若干子目录,其中几个主要目录如下所示:

      dirchk:用于存放各个进程的检查点

      dirdat:用于存放抽取出来的Trail队列文件

      dirprm:用于存放各个进程的参数文件

      dirrpt:用于存放各个进程的运行状况报告

      dirpcs:存放各个正在运行的进程信息

      3.1.3、OGG Trail file

      Local Trail file和Remote Trail file全部放置在新建的/u02/ogg/ggs/dirdat下面

      3.1.4、OGG MGR配置

        --on Source and Target db
        GGSCI>edit params mgr
        PORT 7809
        AUTOSTART ER *
        AUTORESTART ER *, RETRIES 12, WAITMINUTES 5, RESETMINUTES 60
        PURGEOLDEXTRACTS /u02/ogg/ggs/dirdat/*, usecheckpoints, minkeepdays7
        • OGG同步默认端口号7809,还可以设置如下的动态端口

        • DYNAMICPORTLIST 7820-7830, 7833, 7835

        • Manger进程启动时使用AUTOSTART参数自动启动EXTRACT和REPLICAT进程,并使用AUTORESTART参数设置启动失败后的自动重启次数,时间等

        • --使用PURGEOLDEXTRACTS参数设置过期trail文件执行的删除选项

        3.1.5、EXTRACT配置

        Extract进程名称定义为:e_******,最大支持8位

        Pump进程名称定义为:p_******,最大支持8位

        Replicate进程名称定义为:r_******,最大支持8位

          On Source DB
          GGSCI>edit params ekettle
          extract ekettle
          tranlogoptions altlogdest /home/mysql/mysql/mysql-kettle/mysql-bin.index
          sourcedb kettle@127.0.0.1:3306,userid root,password xxxxxxxxxxx
          exttrail /opt/mysql/ogg/ggs/dirdat/kettle3306/aa
          dynamicresolution
          getUpdateBefores
          gettruncates
          table test.a;
          table oggtest.*;

          说明:

          • Altlogdest用来指定binlog的索引文件

          • Sourcedb用来指定连接的数据源:包括database、IP address和Port,Instance account和password

          3.1.6、PUMP配置

            On Source DB
            GGSCI>edit params pkafka02
            extract pkafka02
            rmthost 127.0.0.1, mgrport 7809
            rmttrail /opt/ogg/dirdat/km
            passthru
            table oggtest.*;
            table test.a;

            此处使用PASSTHRU参数不检查源数据库和目标数据库的表定义情况

            3.1.7、自定义表结构defgen

              GGSCI (ogg) 16> edit params defgen
              defsfile /u02/ogg/ggs/dirdef/defgen.prm
              sourcedb sourcedb pythondb@127.0.0.1:3306,userid root, password xxxxxx
              table pythondb.*;
              [20:13:20 root@ms01 ~]# cd $GG_HOME
              [20:13:38 root@ms01 ggs]# ./defgen PARAMFILE ./dirprm/defgen.prm NOEXTATTR

              说明:

              • 为防止版本不兼容,可以在自定义表结构时添加NOEXTATTR参数过滤

              • 自定义文件生成后需要复制到目标端指定目录

              3.2、OGG for Kafka

              3.2.1、REPLICAT配置

                --on Target kafka
                GGSCI (ellen-m1) 2> view params rkafka02

                REPLICAT rkafka02
                TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka2.props
                REPORTCOUNT EVERY 1 MINUTES, RATE
                GROUPTRANSOPS 10000
                MAP oggtest.oggtest, TARGET myoggtest.oggtest;

                • --REPLICAT指定源端到目标端之间表的映射关系,建议要同步的表都定义主键

                • --注意配置环境变量,否则有可能找不到数据库

                3.2.2、kafka.props配置

                 
                  [10:37:08 root@ellen-m1 dirprm]# cat kafka2.props
                  gg.handlerlist =kafkahandler
                  gg.handler.kafkahandler.type=kafka
                  gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
                  gg.handler.kafkahandler.TopicName=oggtest
                  gg.handler.kafkahandler.format=json
                  gg.handler.kafkahandler.SchemaTopicName=fafaschema
                  gg.handler.kafkahandler.BlockingSend=true
                  gg.handler.kafkahandler.includeTokens=true
                  gg.handler.name.format.includePrimaryKeys=true
                  gg.handler.name.format.prettyPrintformat.prettyPrint=true
                  gg.handler.hdfs.format.jsonDelimiter=GAN
                  gg.handler.name.format.lineDelimiter=||
                  gg.handler.kafkahandler.mode=op
                  gg.handler.kafkahandler.maxGroupSize =100,1Mb
                  gg.handler.kafkahandler.minGroupSize =50,500Kb
                  gg.handler.name.format.pkUpdateHandling=delete-insert
                  gg.handler.name.format.includePosition=true
                  goldengate.userexit.timestamp=utc
                  goldengate.userexit.writers=javawriter
                  javawriter.stats.display=TRUE
                  javawriter.stats.full=TRUE
                  gg.log=log4j
                  gg.log.level=INFO
                  gg.report.time=30sec
                  gg.classpath=/opt/ogg/dirprm/:/opt/soft/kafka_2.12-0.10.2.0/libs/*:/opt/soft/kafka_2.12-0.10.2.0/config/
                  javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=/opt/ogg/ggjava/ggjava.jar

                    [10:40:28 root@ellen-m1 dirprm]# cat custom_kafka_producer.properties
                    bootstrap.servers=192.168.2.1:9092,192.168.2.2:9192,192.168.2.3:9092
                    acks=1
                    reconnect.backoff.ms=1000
                    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
                    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
                    100KB per partition
                    batch.size=102400
                    linger.ms=10000

                    • Kafka Handler Configuration 的属性 gg.handler.kafkahandler.ProducerRecordClass 里面提到了,默认使用的是oracle.goldengate.handler.kafka.DefaultProducerRecord这个类对表名进行分区的。

                    • 如果要自定义的话需要实现CreateProducerRecord这个接口

                    3.2.3、增量MQ消息

                      {
                      "table": "oggtest.oggtest",
                      "op_type": "U",
                      "op_ts": "2021-01-23 15:30:19.357882",
                      "current_ts": "2021-01-23T15:30:25.697000",
                      "pos": "00000000000000002266",
                      "primary_keys": ["ID"],
                      "before": {
                      "ID": 4,
                      "NAME": "test"
                      },
                      "after": {
                      "ID": 4,
                      "NAME": "agileq"
                      }
                      }

                      四、代码分析

                      4.1、步骤类

                      负责处理基础数据流数据,执行Mq To SQL转换

                        public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
                        meta = (MqToSqlMeta) smi;
                        data = (MqToSqlData) sdi;
                        Object[] r = getRow(); // get row, blocks when needed!
                        if (r == null) // no more input to be expected...
                        {
                        setOutputDone();
                        return false;
                        }
                        RowMetaInterface inputRowMeta = getInputRowMeta();
                        if (first) {
                        first = false;
                        data.outputRowMeta = getInputRowMeta().clone();
                        meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
                        int numErrors = 0;
                        String jsonStr = environmentSubstitute(meta.getJsonStr());
                        if (isEmpty(jsonStr)) {
                        logError(Messages.getString("MqToSqlStep.Log.JsonStrIsNull", jsonStr));
                        numErrors++;
                        }
                        data.jsonStrNr = inputRowMeta.indexOfValue(jsonStr);
                        if (data.jsonStrNr < 0) {
                        logError(Messages.getString("MqToSqlStep.Log.CouldntFindField", jsonStr));
                        numErrors++;
                        }
                        if (!inputRowMeta.getValueMeta(data.jsonStrNr).isBinary()
                        && !inputRowMeta.getValueMeta(data.jsonStrNr).isString()) {
                        logError(Messages.getString("MqToSqlStep.Log.FieldNotValid", jsonStr));
                        numErrors++;
                        }
                        data.jsonStrMeta = inputRowMeta.getValueMeta(data.jsonStrNr);
                        String tableName = environmentSubstitute(meta.getTableName());
                        if (!isEmpty(tableName)) {
                        logBasic(Messages.getString("MqToSqlStep.Log.TableName", tableName));
                        }
                        String operType = environmentSubstitute(meta.getOperType());
                        if (isEmpty(operType)) {
                        logError(Messages.getString("MqToSqlStep.Log.operTypeIsNull"));
                        numErrors++;
                        }
                        data.operTypeNr = inputRowMeta.indexOfValue(operType);
                        if (data.operTypeNr < 0) {
                        logError(Messages.getString("MqToSqlStep.Log.CouldntFindField", operType));
                        numErrors++;
                        }
                        if (!inputRowMeta.getValueMeta(data.operTypeNr).isBinary()
                        && !inputRowMeta.getValueMeta(data.operTypeNr).isString()) {
                        logError(Messages.getString("MqToSqlStep.Log.FieldNotValid", operType));
                        numErrors++;
                        }
                        data.operTypeMeta = inputRowMeta.getValueMeta(data.operTypeNr);
                        String jsonDefaultstr = environmentSubstitute(meta.getJsonDefaultStr());
                        if (isEmpty(jsonDefaultstr)) {
                        logError(Messages.getString("MqToSqlStep.Log.JsonDefaultstrIsNull"));
                        numErrors++;
                        }
                        data.jsonDefaultStrNr = inputRowMeta.indexOfValue(jsonDefaultstr);
                        if (data.jsonDefaultStrNr < 0) {
                        logError(Messages.getString("MqToSqlStep.Log.CouldntFindField", jsonDefaultstr));
                        numErrors++;
                        }
                        if (!inputRowMeta.getValueMeta(data.jsonDefaultStrNr).isBinary()
                        && !inputRowMeta.getValueMeta(data.jsonDefaultStrNr).isString()) {
                        logError(Messages.getString("MqToSqlStep.Log.FieldNotValid", jsonDefaultstr));
                        numErrors++;
                        }
                        data.jsonDefaultStrMeta = inputRowMeta.getValueMeta(data.jsonDefaultStrNr);
                        if (numErrors > 0) {
                        setErrors(numErrors);
                        stopAll();
                        return false;
                        }
                        logBasic("template step initialized successfully");
                        }
                        int jsonStrIndex = getInputRowMeta().indexOfValue(meta.getJsonStr());
                        int operTypeIndex = getInputRowMeta().indexOfValue(meta.getOperType());
                        int jsonDefaultStrIndex = getInputRowMeta().indexOfValue(meta.getJsonDefaultStr());
                        String sbSql = null;
                        try {
                        sbSql = JsonToSqlUtil.jsonToMapSql(r[jsonStrIndex].toString(),
                        data.jsonKeyStrObj, data.jsonValueStrObj,
                        r[operTypeIndex].toString(), meta.getTableName(),
                        r[jsonDefaultStrIndex].toString(), data.primaryKeys);
                        } catch (Exception e) {
                        logError("Failed to process row[" + r[jsonStrIndex].toString() + "].", e);
                        try {
                        throw e;
                        } catch (Exception e1) {
                        e1.printStackTrace();
                        }
                        }
                        Object[] outputRow = RowDataUtil.addValueData(r, data.outputRowMeta.size() - 1, sbSql);
                        putRow(data.outputRowMeta, outputRow);
                        incrementLinesOutput();
                        if (checkFeedback(getLinesRead())) {
                        logBasic("Linenr " + getLinesRead());
                        }
                        return true;
                        }

                        4.2、plugin配置

                          <?xml version="1.0" encoding="UTF-8"?>
                          <plugin
                          id="MqToSqlPlugin"
                          iconfile="icon.png"
                          description="Mq To Sql"
                          tooltip="Only there for MqToSql purposes"
                          category="游走在数据之间"
                          classname="com.agileq.kettle.mts.plugin.MqToSqlMeta">

                          <libraries>
                          <library name="pentaho-kettle-mqtosql-plugin.jar"/>
                          <library name="lib/fastjson-1.2.29.jar"/>
                          <library name="lib/fel.jar"/>
                          <library name="lib/commons-lang3-3.5.jar" />
                          </libraries>

                          </plugin>


                          • id:在Kettle插件中必须保证全局唯一

                          • iconfile:Spoon UI中的显示图标,尽量使用png格式图片

                          • description:描述该插件的具体作用

                          • tooltip:树形菜单中,鼠标滑过显示的提示信息

                          • category:插件归属目录

                          • classname:元数据类

                          • libraries:插件依赖Jar包列表

                          4.3、表达式解析

                          使用fel表达式解析引擎,来解析配置中需要执行的类型转换和数值聚合操作

                            public static Object jsonValueFormat(JSONObject jsonObject, String keyObj,
                            Object valueObj) {
                            Object jsonValue = valueObj;
                            String value = jsonObject.getString(keyObj);
                            if (value != null) {
                            if (valueObj instanceof String) {
                            jsonValue = fel.eval(value.replace(keyObj, "'"
                            + (String) valueObj + "'"));
                            } else {
                            jsonValue = valueObj != null ? fel.eval(value.replace(keyObj,
                            valueObj.toString())) : null;
                            }
                            }
                            return jsonValue;
                            }

                            4.4、核心代码

                            根据操作类型、输入字段列表、输出字段列表和增量消息Json等参数,通过核心转换逻辑执行消息的dml语句生成。

                                
                              /**
                              * json转换成Map,根据操作类型等参数输出要执行的sql语句
                              *
                              * @param jsonString
                              * @param jsonKeyString
                              * @param jsonValueString
                              * @param changeOper
                              * @param tableName
                              * @param prmKeyName
                              * @return
                              * @throws JSONException
                              */
                              public static String jsonToMapSql(String jsonString,
                              JSONObject jsonKeyObject, JSONObject jsonValueObject,
                              String changeOper, String tableName, String jsonDefString,
                              Map<String, String> pramKeys) throws JSONException {
                              JSONObject jsonObject = JSONObject.parseObject(jsonString);
                              Map<String, Object> resultMapSet = new HashMap<String, Object>();
                              Map<String, Object> resultMapWhere = new HashMap<String, Object>();
                              Map<String, Object> resultMapOnDuplicate = new HashMap<String, Object>();
                              String jsonToSql = null;
                              String jsonToSqlWhere = null;
                              String jsonToSqlOnDuplicate = null;
                              if (null != jsonObject) {
                              Iterator<String> iter = jsonObject.keySet().iterator();
                              while (iter.hasNext()) {
                              String key = iter.next();
                              Object value = jsonObject.get(key);
                              // 标准化value输出
                              if (null != jsonValueObject) {
                              value = JsonFormat.jsonValueFormat(jsonValueObject, key,
                              value);
                              }
                              // 标准化key输出
                              if (null != jsonKeyObject) {
                              key = JsonFormat.jsonKeyFormat(jsonKeyObject, key);
                              }
                              if (null != key && !"".equals(key)) {
                              if ("U".equals(changeOper) || "D".equals(changeOper)) {
                              if (isPrimaryKey(pramKeys, key)) {
                              putResultMap(resultMapWhere, key, value);
                              } else {
                              putResultMap(resultMapSet, key, value);
                              }
                              } else {
                              putResultMap(resultMapSet, key, value);
                              if (!isPrimaryKey(pramKeys, key)) {
                              putResultMap(resultMapOnDuplicate, key, value);
                              }
                              }
                              }
                              }
                              jsonToSql = resultMapSet.toString();
                              jsonToSqlOnDuplicate = resultMapOnDuplicate.toString();
                              jsonToSqlWhere = resultMapWhere.toString();
                              jsonToSql = jsonToSql.substring(1, jsonToSql.length() - 1);
                              jsonToSqlOnDuplicate = jsonToSqlOnDuplicate.substring(1,
                              jsonToSqlOnDuplicate.length() - 1);
                              jsonToSqlWhere = jsonToSqlWhere.substring(1,
                              jsonToSqlWhere.length() - 1).replaceAll(",", " and ");
                              }
                              if (null == jsonToSqlWhere || ("").equals(jsonToSqlWhere)) {
                              jsonToSqlWhere = "0=1";
                              }
                              if (("U").equals(changeOper)) {
                              StringBuilder builder = new StringBuilder("update ")
                              .append(tableName).append(" set ").append(jsonToSql);
                              if (jsonToSql != null && !jsonToSql.equals("")) {
                              builder.append(", ");
                              }
                              if (jsonDefString != null && !jsonDefString.equals("")) {
                              builder.append(jsonDefString);
                              }
                              jsonToSql = builder.append(" where ").append(jsonToSqlWhere)
                              .append(";").toString();
                              } else if (("I").equals(changeOper)) {
                              StringBuilder builder = new StringBuilder("insert into ")
                              .append(tableName).append(" set ").append(jsonToSql);
                              if (jsonDefString != null && jsonDefString != "") {
                              builder.append(", ").append(jsonDefString);
                              }
                              builder.append(" on duplicate key update ").append(
                              jsonToSqlOnDuplicate);
                              if (jsonDefString != null && jsonDefString != "") {
                              builder.append(", ").append(jsonDefString);
                              }
                              jsonToSql = builder.append(";").toString();
                              } else if (("D").equals(changeOper)) {
                              jsonToSql = new StringBuilder("delete from ").append(tableName)
                              .append(" where ").append(jsonToSqlWhere).append(";")
                              .toString();
                              }
                              return jsonToSql;
                              }

                              五、插件说明

                                ①JsonStr name:动态待解析json字段,从上一步骤数据流动态获取(必选)
                                ②JsonKeyStr name:所需Key List列表,此处可做字段名匹配映射(必选)
                                ③JsonValueStr name:所需Value List列表,此处可做字段名对应值类型转换和数值做函数运算操作(必选)
                                ④TableName name:表名(必选)
                                ⑤PrimaryKey name:主键字段,更新必备。分库分表分区键必须配置,多个字段以逗号分隔(必选)
                                ⑥OperType name:动态操作类型字段,从上一步骤数据流动态获取(必选)
                                ⑦JsonDefaultStr name:默认字段追加,必须更新时追加更新时间、更新人员等(可选)
                                ⑧OutputDML name:最终生成的可执行的DML语句(必选)

                                六、总结

                                如果你需要源码或者了解更多自定义插件及集成方式,抑或有开发过程或者使用过程中的任何疑问或建议,请关注小编公众号"游走在数据之间",回复2查看源代码,回复3获取入门视频。加入Kettle交流群"话说Kettle"。https://github.com/Jacksn2014/Kettle-Plugin-Master/tree/master


                                文章转载自以数据之名,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                评论