
Kettle Plugin Development

以数据之名 2021-06-08
2020

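1. Background

As the saying goes, a workman who wants to do good work must first sharpen his tools. If we adopt Kettle as our toolchain for offline or near-real-time ETL, custom plugin development is a step we cannot avoid. For example, we may need to apply special functions (such as encryption or decryption) to the data flowing out of a component; the components in our current version may not meet our requirements for capturing data from the source; or the existing components may lack support for consuming the same data repeatedly.

In short, when a business process has special requirements that Kettle's built-in processing steps cannot meet, or can only partly meet, we have to develop custom processing steps ourselves to handle data management, data validation, data transformation, and extraction from certain special types of data sources.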


2. Basic Framework

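Taking the Splunk query plugin shown in the figure above as an example, we will explain step by step how a Kettle transformation plugin works. Four classes make up a basic Kettle step (node), and each of them exists for a reason: every class plays its own distinct role.

SplunkInput: the step class

Extends the BaseStep parent class and implements the StepInterface interface. When the transformation runs, instances of this class are where the data is actually processed; each execution thread corresponds to one instance of this class.

SplunkInputData: the data class

Extends the BaseStepData parent class and implements the StepDataInterface interface. It stores data while the plugin executes, and each execution thread has its own instance. At run time it mainly holds the custom metadata objects, database connections, caches, file handles and similar objects.

SplunkInputDialog: the dialog class

Extends the BaseStepDialog parent class and implements the StepDialogInterface interface. This class implements the user interface through which the ETL developer configures the step: the developer fills in the predefined input and output options to tailor the step to the ETL job at hand.

SplunkInputMeta: the metadata class

Extends the BaseStepMeta parent class and implements the StepMetaInterface interface. Its job is to store and serialize the configuration of a particular step instance, that is, to keep track of what the developer configured in SplunkInputDialog. In this example it holds the step properties set by the developer, the Splunk connection properties, and the names and types of the output fields.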
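The division of labour between these classes is easiest to see at run time. The following plain-Java sketch has no Kettle dependency. MiniMeta, MiniStep, MiniData and CopyRunner are hypothetical stand-ins for SplunkInputMeta, SplunkInput and SplunkInputData. The sketch shows one shared configuration object handing a fresh step object and a fresh data object to every copy of a step, so that each copy runs on its own thread with private state. In the real plugin, the corresponding factory methods are getStep() and getStepData() on the metadata class.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for SplunkInputData: mutable runtime state owned by one copy.
class MiniData {
    int rowsSeen; // touched by a single thread only, so no locking is needed
}

// Hypothetical stand-in for SplunkInputMeta: configuration shared read-only by all copies.
class MiniMeta {
    final int rowsPerCopy;

    MiniMeta(int rowsPerCopy) {
        this.rowsPerCopy = rowsPerCopy;
    }

    // Plays the role of getStepData(): every copy gets a fresh data object.
    MiniData newData() {
        return new MiniData();
    }

    // Plays the role of getStep(...): every copy gets a fresh step object.
    MiniStep newStep(MiniData data, int copyNr, Map<Integer, Integer> results) {
        return new MiniStep(this, data, copyNr, results);
    }
}

// Hypothetical stand-in for SplunkInput: does the work for one copy on one thread.
class MiniStep implements Runnable {
    private final MiniMeta meta;
    private final MiniData data;
    private final int copyNr;
    private final Map<Integer, Integer> results;

    MiniStep(MiniMeta meta, MiniData data, int copyNr, Map<Integer, Integer> results) {
        this.meta = meta;
        this.data = data;
        this.copyNr = copyNr;
        this.results = results;
    }

    @Override
    public void run() {
        for (int i = 0; i < meta.rowsPerCopy; i++) {
            data.rowsSeen++; // only this copy ever sees this MiniData instance
        }
        results.put(copyNr, data.rowsSeen);
    }
}

class CopyRunner {
    // Starts `copies` copies of the step, one thread each, and returns the rows seen per copy.
    static Map<Integer, Integer> runCopies(MiniMeta meta, int copies) {
        Map<Integer, Integer> results = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(copies);
        for (int copyNr = 0; copyNr < copies; copyNr++) {
            pool.execute(meta.newStep(meta.newData(), copyNr, results));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("step copies did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }
}
```

With CopyRunner.runCopies(new MiniMeta(1000), 4), each of the four copies reports 1000 rows, because no copy ever touches another copy's MiniData.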

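Plugin display configuration

This part defines how the plugin step is displayed in Kettle's visual UI workbench (Spoon). It sets the plugin's unique ID, name and description, tells Kettle which metadata class to load, and lists the dependency JARs that must be loaded in advance. It can be provided in either of the two ways below; this example uses the first.

Option 1: the @Step annotation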

    @Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")


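    id: must be globally unique among Kettle plugins

    name: the plugin's display name in the Spoon UI; it does not have to be unique

    description: what the plugin does, or brief usage notes

    image: the icon shown in the Spoon UI; prefer an SVG image. Icons can be found and downloaded from Alibaba's Iconfont library (https://www.iconfont.cn/)

    categoryDescription: the category (folder) the plugin is listed under

    ……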
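Kettle's plugin registry picks up @Step-annotated metadata classes when it scans plugin JARs, and files them under their id. That is why the id has to be unique while the display name does not. The sketch below imitates that rule with a home-made runtime annotation. StepInfo, MiniRegistry and the Demo* classes are hypothetical. The duplicate check illustrates the uniqueness requirement; it does not reproduce Kettle's own registry logic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical annotation modelled on the attributes of Kettle's @Step.
@Retention(RetentionPolicy.RUNTIME) // RUNTIME so that reflection can read it
@Target(ElementType.TYPE)
@interface StepInfo {
    String id();
    String name();
    String description() default "";
    String image() default "";
    String categoryDescription() default "";
}

@StepInfo(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk",
          image = "splunk.svg", categoryDescription = "Input")
class DemoSplunkMeta { }

// Same display name, different id: allowed.
@StepInfo(id = "AnotherSplunkInput", name = "Splunk Input")
class DemoOtherMeta { }

// Same id as DemoSplunkMeta: must be rejected.
@StepInfo(id = "KettleSplunkInput", name = "Splunk Input (copy)")
class DemoClashingMeta { }

class MiniRegistry {
    private final Map<String, Class<?>> byId = new LinkedHashMap<>();

    // Reads the annotation from the class and registers the class under its id.
    void register(Class<?> metaClass) {
        StepInfo info = metaClass.getAnnotation(StepInfo.class);
        if (info == null) {
            throw new IllegalArgumentException(metaClass.getName() + " has no @StepInfo");
        }
        Class<?> existing = byId.putIfAbsent(info.id(), metaClass);
        if (existing != null && existing != metaClass) {
            throw new IllegalStateException("Duplicate step id '" + info.id() + "'");
        }
    }

    // Category shown in the step tree for a registered id, or null if unknown.
    String categoryOf(String id) {
        Class<?> metaClass = byId.get(id);
        return metaClass == null ? null : metaClass.getAnnotation(StepInfo.class).categoryDescription();
    }

    int size() {
        return byId.size();
    }
}
```

Registering DemoSplunkMeta and DemoOtherMeta succeeds even though both are called "Splunk Input". DemoClashingMeta is refused because its id is already taken.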

Option 2: plugin.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <plugin
        id="KafkaConsumer"
        iconfile="logo.png"
        description="Apache Kafka Consumer"
        tooltip="This plug-in allows reading messages from a specific topic in a Kafka stream"
        category="Input"
        classname="com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerMeta">

      <libraries>
        <library name="pentaho-kafka-consumer.jar"/>
        ……
        <library name="lib/zookeeper-3.4.6.jar"/>
      </libraries>

      <localized_category>
        <category locale="en_US">Input</category>
      </localized_category>
      <localized_description>
        <description locale="en_US">Apache Kafka Consumer</description>
      </localized_description>
      <localized_tooltip>
        <tooltip locale="en_US">This plug-in allows reading messages from a specific topic in a Kafka stream</tooltip>
      </localized_tooltip>
    </plugin>

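    id: must be globally unique among Kettle plugins

    iconfile: the icon shown in the Spoon UI; prefer a PNG image

    description: what the plugin does

    tooltip: the hint shown when the mouse hovers over the plugin in the step tree

    category: the category (folder) the plugin is listed under

    classname: the metadata class

    libraries: the list of JARs the plugin depends on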
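To make clear which attributes and elements of plugin.xml carry this information, the following sketch reads the id, classname, category, icon and library list with the JDK's own DOM parser. PluginDescriptor and PluginXmlReader are hypothetical names. Kettle has its own loader for this file, and this code does not reproduce it.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical holder for the fields Spoon needs from plugin.xml.
class PluginDescriptor {
    String id;
    String classname;
    String category;
    String iconfile;
    final List<String> libraries = new ArrayList<>();
}

class PluginXmlReader {
    // Parses a plugin.xml document; throws IllegalArgumentException if it is unusable.
    static PluginDescriptor read(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Element root = doc.getDocumentElement(); // the <plugin> element
            PluginDescriptor d = new PluginDescriptor();
            d.id = root.getAttribute("id");               // "" when the attribute is missing
            d.classname = root.getAttribute("classname"); // the metadata class to load
            d.category = root.getAttribute("category");
            d.iconfile = root.getAttribute("iconfile");
            NodeList libs = root.getElementsByTagName("library");
            for (int i = 0; i < libs.getLength(); i++) {
                d.libraries.add(((Element) libs.item(i)).getAttribute("name"));
            }
            if (d.id.isEmpty() || d.classname.isEmpty()) {
                throw new IllegalArgumentException("plugin.xml needs both id and classname");
            }
            return d;
        } catch (IllegalArgumentException e) {
            throw e;
        } catch (Exception e) { // parser configuration, I/O or XML syntax errors
            throw new IllegalArgumentException("Cannot parse plugin.xml", e);
        }
    }
}
```

Fed the Kafka example above (with the elided libraries dropped), read() returns id "KafkaConsumer", the KafkaConsumerMeta class name and the two listed JARs.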


3. Implementation

3.1 Step class

    import java.io.IOException;
    import java.util.HashMap;

    import org.apache.commons.lang.StringUtils; // commons-lang 2.x as bundled with PDI (assumed)
    import org.pentaho.di.core.exception.KettleException;
    import org.pentaho.di.core.row.RowDataUtil;
    import org.pentaho.di.core.row.RowMeta;
    import org.pentaho.di.trans.Trans;
    import org.pentaho.di.trans.TransMeta;
    import org.pentaho.di.trans.step.BaseStep;
    import org.pentaho.di.trans.step.StepDataInterface;
    import org.pentaho.di.trans.step.StepInterface;
    import org.pentaho.di.trans.step.StepMeta;
    import org.pentaho.di.trans.step.StepMetaInterface;

    import com.splunk.JobResultsArgs;
    import com.splunk.ResultsReaderXml;
    import com.splunk.Service;

    // SplunkInputMeta, SplunkInputData, SplunkConnection and ReturnValue are classes of the plugin itself.

    public class SplunkInput extends BaseStep implements StepInterface {

        private SplunkInputMeta meta;
        private SplunkInputData data;

        public SplunkInput(StepMeta stepMeta, StepDataInterface stepDataInterface,
                           int copyNr, TransMeta transMeta, Trans trans) {
            super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
        }

        @Override
        public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
            meta = (SplunkInputMeta) smi;
            data = (SplunkInputData) sdi;

            // Is the step getting input?
            // List<StepMeta> steps = getTransMeta().findPreviousSteps(getStepMeta());

            // Validate the Splunk connection settings before connecting
            if (StringUtils.isEmpty(meta.getHost())) {
                log.logError("You need to specify a Splunk connection Host to use in this step");
                return false;
            }
            if (StringUtils.isEmpty(meta.getPort())) {
                log.logError("You need to specify a Splunk connection Port to use in this step");
                return false;
            }
            if (StringUtils.isEmpty(meta.getUsername())) {
                log.logError("You need to specify a Splunk connection Username to use in this step");
                return false;
            }
            if (StringUtils.isEmpty(meta.getPassword())) {
                log.logError("You need to specify a Splunk connection Password to use in this step");
                return false;
            }

            // To correct lazy programmers who built certain PDI steps...
            // System.setProperty("https.protocols", "TLSv1.2,TLSv1.1,SSLv3");
            // Security.setProperty("jdk.tls.disabledAlgorithms", "SSLv3, RC4, MD5withRSA, DH keySize < 768");

            /* Overriding the static method setSslSecurityProtocol to implement the security protocol of choice */
            // HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
            /* end comment for overriding the method setSslSecurityProtocol */

            data.splunkConnection = new SplunkConnection(meta.getName(),
                    meta.getHost(), meta.getPort(), meta.getUsername(),
                    meta.getPassword());
            data.splunkConnection.initializeVariablesFrom(this);
            try {
                data.serviceArgs = data.splunkConnection.getServiceArgs();
                data.service = Service.connect(data.serviceArgs);
            } catch (Exception e) {
                log.logError("Unable to connect to Splunk for connection '"
                        + data.splunkConnection.getName() + "'", e);
                return false;
            }
            return super.init(smi, sdi);
        }

        @Override
        public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
            meta = (SplunkInputMeta) smi;
            data = (SplunkInputData) sdi;
            // Release per-copy resources here before handing over to the parent class
            super.dispose(smi, sdi);
        }

        @Override
        public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
                throws KettleException {
            meta = (SplunkInputMeta) smi;
            data = (SplunkInputData) sdi;

            if (first) {
                first = false;
                // Get the output fields...
                data.outputRowMeta = new RowMeta();
                meta.getFields(data.outputRowMeta, getStepname(), null,
                        getStepMeta(), this, repository, data.metaStore);

                // Run a one-shot search in blocking mode
                JobResultsArgs args = new JobResultsArgs();
                // count = 0: no limit on the number of results, i.e. return everything
                args.setCount(0);
                args.setOutputMode(JobResultsArgs.OutputMode.XML);
                data.eventsStream = data.service.oneshotSearch(
                        getTransMeta().environmentSubstitute(meta.getQuery()), args);
            }

            try {
                ResultsReaderXml resultsReader = new ResultsReaderXml(data.eventsStream);
                HashMap<String, String> event;
                while ((event = resultsReader.getNextEvent()) != null) {
                    Object[] outputRow = RowDataUtil.allocateRowData(data.outputRowMeta.size());
                    for (int i = 0; i < meta.getReturnValues().size(); i++) {
                        ReturnValue returnValue = meta.getReturnValues().get(i);
                        String value = event.get(returnValue.getSplunkName());
                        outputRow[i] = value;
                    }
                    incrementLinesInput();
                    putRow(data.outputRowMeta, outputRow);
                }
            } catch (Exception e) {
                throw new KettleException("Error reading from Splunk events stream", e);
            } finally {
                try {
                    data.eventsStream.close();
                } catch (IOException e) {
                    throw new KettleException("Unable to close events stream", e);
                }
            }

            // An input step reads everything in one call, then signals the end of its output
            setOutputDone();
            return false;
        }
    }

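init(): called before the transformation runs. The transformation only actually starts once every step has initialized successfully. In this example, init() checks that the Splunk host, port and the other connection properties are configured, and that the Splunk connection can be set up.

dispose(): called after the step has finished executing, to release resources such as caches and file handles.

run(): active while the records of the data stream are actually being processed. SplunkInput does not override it; its work happens in processRow().

processRow(): processes the data stream. A step usually calls getRow() to fetch the next single record to process. getRow() blocks when necessary, for example while the previous step has not delivered any rows yet. In the same way, putRow() blocks while the next step's buffer is full, which holds a fast step back to the pace of a slower one. After processing a record, processRow() calls putRow() to pass it on to the downstream steps. An input step such as SplunkInput has no upstream rows, so it skips getRow() and simply calls putRow() for every Splunk event.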
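The call order described above can be imitated without Kettle. init() runs once. processRow() is then called repeatedly until it returns false, and dispose() runs last. getRow() and putRow() are backed by bounded blocking queues, so a full output buffer makes the step wait. Everything here is a hypothetical simplification: MiniRowStep, UpperCaseStep, MiniPipeline, the END marker and the buffer size of 2. Kettle's real row buffers and step threads add stop flags, error handling and more.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, heavily simplified model of a step (not Kettle's BaseStep).
abstract class MiniRowStep {
    // End-of-stream marker; sending it plays the role of setOutputDone().
    static final Object[] END = new Object[0];

    private final BlockingQueue<Object[]> input;
    private final BlockingQueue<Object[]> output;

    MiniRowStep(BlockingQueue<Object[]> input, BlockingQueue<Object[]> output) {
        this.input = input;
        this.output = output;
    }

    boolean init() { return true; } // check settings, open connections
    void dispose() { }              // release connections, caches, file handles

    // Returns true to be called again, false once the step has finished.
    abstract boolean processRow() throws InterruptedException;

    // Blocks while the upstream buffer is empty; returns null at the end of the stream.
    Object[] getRow() throws InterruptedException {
        Object[] row = input.take();
        return row == END ? null : row;
    }

    // Blocks while the downstream buffer is full, i.e. while the next step is slower.
    void putRow(Object[] row) throws InterruptedException { output.put(row); }

    void setOutputDone() throws InterruptedException { output.put(END); }

    // What the runner thread does once init() has succeeded.
    final void runStep() throws InterruptedException {
        try {
            while (processRow()) {
                // keep calling processRow() until it reports that it is done
            }
        } finally {
            dispose();
        }
    }
}

// Example step: upper-cases the single field of every row.
class UpperCaseStep extends MiniRowStep {
    UpperCaseStep(BlockingQueue<Object[]> in, BlockingQueue<Object[]> out) { super(in, out); }

    @Override
    boolean processRow() throws InterruptedException {
        Object[] row = getRow();
        if (row == null) { // no more input: tell downstream and stop
            setOutputDone();
            return false;
        }
        putRow(new Object[] { String.valueOf(row[0]).toUpperCase() });
        return true;
    }
}

class MiniPipeline {
    // Feeds values through UpperCaseStep on its own thread and collects the output rows.
    static List<String> run(List<String> values) {
        BlockingQueue<Object[]> in = new ArrayBlockingQueue<>(2); // tiny buffers so put() really blocks
        BlockingQueue<Object[]> out = new ArrayBlockingQueue<>(2);
        UpperCaseStep step = new UpperCaseStep(in, out);
        if (!step.init()) { // nothing runs unless init() succeeds
            throw new IllegalStateException("init failed");
        }
        Thread worker = new Thread(() -> {
            try { step.runStep(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        Thread producer = new Thread(() -> {
            try {
                for (String v : values) { in.put(new Object[] { v }); }
                in.put(MiniRowStep.END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        producer.start();
        List<String> result = new ArrayList<>();
        try {
            for (Object[] row = out.take(); row != MiniRowStep.END; row = out.take()) {
                result.add((String) row[0]);
            }
            producer.join();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```

Because both buffers hold only two rows, the producer, the step and the consumer regularly block on each other. This is the back-pressure that getRow() and putRow() provide.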



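3.2 Data class

During development, most steps need temporary buffers or temporary data, and the data class is the right place to keep them. Each execution thread gets its own instance of the data class.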

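    import java.io.InputStream;

    import org.pentaho.di.core.row.RowMetaInterface;
    import org.pentaho.di.trans.step.BaseStepData;
    import org.pentaho.di.trans.step.StepDataInterface;
    import org.pentaho.metastore.api.IMetaStore;

    import com.splunk.Service;
    import com.splunk.ServiceArgs;

    public class SplunkInputData extends BaseStepData implements StepDataInterface {

        public RowMetaInterface outputRowMeta;
        public SplunkConnection splunkConnection; // Splunk connection
        public int[] fieldIndexes;                // array of field indexes
        public String query;                      // Splunk SPL query (must start with "search")
        public IMetaStore metaStore;              // metastore object

        public ServiceArgs serviceArgs;           // arguments for the Splunk query service
        public Service service;                   // Splunk query service
        public InputStream eventsStream;          // stream of result events
    }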
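The comment on query notes that the SPL statement must start with search. Splunk's search endpoints generally expect a search string to begin with the search command, or with a pipe into a generating command such as | makeresults. The query can therefore be normalised before it reaches oneshotSearch(). SplQueries.normalize is a hypothetical helper; it is not part of the Splunk SDK or of the original plugin.

```java
// Hypothetical helper, not part of the Splunk SDK or the original plugin.
final class SplQueries {
    private SplQueries() { }

    // Returns the trimmed query, prefixed with "search " unless it already starts
    // with the search command or with a pipe (e.g. "| makeresults").
    static String normalize(String query) {
        if (query == null || query.trim().isEmpty()) {
            throw new IllegalArgumentException("Splunk query is empty");
        }
        String q = query.trim();
        if (q.startsWith("|") || startsWithWord(q, "search")) {
            return q;
        }
        return "search " + q;
    }

    // True if q begins with `word` (any case) followed by whitespace or the end of the string.
    private static boolean startsWithWord(String q, String word) {
        if (!q.regionMatches(true, 0, word, 0, word.length())) {
            return false;
        }
        return q.length() == word.length() || Character.isWhitespace(q.charAt(word.length()));
    }
}
```

In processRow() it could wrap the substituted query, for example SplQueries.normalize(getTransMeta().environmentSubstitute(meta.getQuery())). A query such as "index=main | head 100" would then be sent as "search index=main | head 100".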


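3.3 Dialog class

The dialog mainly implements listeners for the input properties, initialization of the configured values and constraint checks on them, validation of the Splunk connection, and a preview of the fields and properties of the returned result set.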

    private static Class<?> PKG = SplunkInputMeta.class; // for i18n purposes, needed by Translator2!!

    private Text wStepname;
    private Text wHost;         // Splunk host name or IP
    private Text wPort;         // Splunk port
    private Text wUsername;     // Splunk user name
    private Text wPassword;     // Splunk password
    private Text wQuery;        // Splunk SPL query
    private TableView wReturns; // output fields of the Splunk step and their properties
    private SplunkInputMeta input;

    ……
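The constraint checks are easiest to test when they live in a plain method. The dialog's OK handler would call it before copying the widget values into SplunkInputMeta. The sketch below collects all problems at once instead of stopping at the first one. ConnectionSettingsValidator, its messages and the SPLUNK_PORT variable name are hypothetical. The SWT wiring (reading wHost.getText() and so on) is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator for the values typed into wHost, wPort, wUsername, wPassword and wQuery.
final class ConnectionSettingsValidator {
    private ConnectionSettingsValidator() { }

    // Returns every problem found; an empty list means the settings can be saved.
    static List<String> validate(String host, String port, String username,
                                 String password, String query) {
        List<String> errors = new ArrayList<>();
        if (isBlank(host)) errors.add("Host is required");
        if (isBlank(port)) {
            errors.add("Port is required");
        } else if (!port.contains("${")) { // Kettle variables are only resolved at run time
            try {
                int p = Integer.parseInt(port.trim());
                if (p < 1 || p > 65535) errors.add("Port must be between 1 and 65535");
            } catch (NumberFormatException e) {
                errors.add("Port must be a number or a variable such as ${SPLUNK_PORT}");
            }
        }
        if (isBlank(username)) errors.add("Username is required");
        if (isBlank(password)) errors.add("Password is required");
        if (isBlank(query)) errors.add("Query is required");
        return errors;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```

The default settings from setDefault() pass without errors. A missing host combined with a port of "abc" yields two messages, which the dialog could show together in one message box.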

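3.4 Metadata class

Below are several key methods of the metadata class of the Splunk query step. The private member variable outputField (read and written through getOutputField()/setOutputField()) holds the output-stream field for the next step; in the Splunk step itself, the output fields are kept in returnValues.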

    import java.util.ArrayList;
    import java.util.List;

    import org.pentaho.di.core.annotations.Step;
    import org.pentaho.di.core.injection.Injection;
    import org.pentaho.di.core.injection.InjectionDeep;
    import org.pentaho.di.core.injection.InjectionSupported;
    import org.pentaho.di.trans.step.BaseStepMeta;
    import org.pentaho.di.trans.step.StepMetaInterface;

    @Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk",
          image = "splunk.svg", categoryDescription = "Input")
    @InjectionSupported(localizationPrefix = "Cypher.Injection.", groups = { "PARAMETERS", "RETURNS" })
    public class SplunkInputMeta extends BaseStepMeta implements StepMetaInterface {

        public static final String HOST = "host";
        public static final String PORT = "port";
        public static final String USERNAME = "username";
        public static final String PASSWORD = "password";
        public static final String QUERY = "query";
        public static final String RETURNS = "returns";
        public static final String RETURN = "return";
        public static final String RETURN_NAME = "return_name";
        public static final String RETURN_SPLUNK_NAME = "return_splunk_name";
        public static final String RETURN_TYPE = "return_type";
        public static final String RETURN_LENGTH = "return_length";
        public static final String RETURN_FORMAT = "return_format";

        @Injection(name = HOST)
        private String host;

        @Injection(name = PORT)
        private String port;

        @Injection(name = USERNAME)
        private String username;

        @Injection(name = PASSWORD)
        private String password;

        @Injection(name = QUERY)
        private String query;

        @InjectionDeep
        private List<ReturnValue> returnValues;

        public SplunkInputMeta() {
            super();
            returnValues = new ArrayList<>();
        }

        ……
    }

    // Track the step's input and output settings

    public String getOutputField()

    public void setOutputField(…)

    public void setDefault()  // initialize the configuration parameters with default values

    @Override
    public void setDefault() {
        host = "127.0.0.1";
        port = "8089";
        username = "query";
        password = "query";
        query = "search * | head 100";
    }

    // Serialize the step settings to XML and load them back from XML

    public String getXML()

    @Override
    public String getXML() {
        StringBuilder xml = new StringBuilder();
        xml.append(XMLHandler.addTagValue(HOST, host));
        xml.append(XMLHandler.addTagValue(PORT, port));
        xml.append(XMLHandler.addTagValue(USERNAME, username));
        xml.append(XMLHandler.addTagValue(PASSWORD, password));
        xml.append(XMLHandler.addTagValue(QUERY, query));
        xml.append(XMLHandler.openTag(RETURNS));
        for (ReturnValue returnValue : returnValues) {
            xml.append(XMLHandler.openTag(RETURN));
            xml.append(XMLHandler.addTagValue(RETURN_NAME, returnValue.getName()));
            xml.append(XMLHandler.addTagValue(RETURN_SPLUNK_NAME, returnValue.getSplunkName()));
            xml.append(XMLHandler.addTagValue(RETURN_TYPE, returnValue.getType()));
            xml.append(XMLHandler.addTagValue(RETURN_LENGTH, returnValue.getLength()));
            xml.append(XMLHandler.addTagValue(RETURN_FORMAT, returnValue.getFormat()));
            xml.append(XMLHandler.closeTag(RETURN));
        }
        xml.append(XMLHandler.closeTag(RETURNS));
        return xml.toString();
    }

    public void loadXML(…)
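loadXML() is the mirror image of getXML(): it has to read back exactly the tags that getXML() wrote. In a real plugin this is normally done with Kettle's XMLHandler helpers against the step node. To keep the sketch runnable without Kettle on the classpath, it uses the JDK DOM parser instead. The StepXml and ReturnRow classes and the wrapping <step> element are hypothetical stand-ins.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Hypothetical, Kettle-free mirror of loadXML(): reads back the tags written by getXML().
final class StepXml {
    static final class ReturnRow {
        String name, splunkName, type, format;
        int length;
    }

    String host, port, username, password, query;
    final List<ReturnRow> returns = new ArrayList<>();

    // Parses the getXML() fragment wrapped in a hypothetical <step> element
    // (in Kettle, loadXML() receives the step node directly).
    static StepXml load(String fragment) {
        Element step;
        try {
            byte[] bytes = ("<step>" + fragment + "</step>").getBytes(StandardCharsets.UTF_8);
            step = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(bytes)).getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid step XML", e);
        }
        StepXml s = new StepXml();
        s.host = childText(step, "host");
        s.port = childText(step, "port");
        s.username = childText(step, "username");
        s.password = childText(step, "password");
        s.query = childText(step, "query");
        NodeList nodes = step.getElementsByTagName("return"); // exact name: does not match <returns>
        for (int i = 0; i < nodes.getLength(); i++) {
            Element r = (Element) nodes.item(i);
            ReturnRow row = new ReturnRow();
            row.name = childText(r, "return_name");
            row.splunkName = childText(r, "return_splunk_name");
            row.type = childText(r, "return_type");
            row.length = toInt(childText(r, "return_length"), -1);
            row.format = childText(r, "return_format");
            s.returns.add(row);
        }
        return s;
    }

    // Text of the first direct child element called tag, or null if there is none.
    // An empty element such as <return_format/> yields "" rather than null.
    private static String childText(Element parent, String tag) {
        for (Node n = parent.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n.getNodeType() == Node.ELEMENT_NODE && tag.equals(n.getNodeName())) {
                return n.getTextContent();
            }
        }
        return null;
    }

    private static int toInt(String text, int def) {
        try {
            return Integer.parseInt(text.trim());
        } catch (RuntimeException e) { // NumberFormatException or NullPointerException
            return def;
        }
    }
}
```

As written, getXML() stores the password in plain text. Kettle plugins commonly write it with Encr.encryptPasswordIfNotUsingVariables() and read it back with Encr.decryptPasswordOptionallyEncrypted(), and loadXML() would then need the matching decryption call.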

    // Read the step settings from, and save them to, a repository

    public void readRep(…)

    @Override
    public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId,
                        List<DatabaseMeta> databases) throws KettleException {
        host = rep.getStepAttributeString(stepId, HOST);
        port = rep.getStepAttributeString(stepId, PORT);
        username = rep.getStepAttributeString(stepId, USERNAME);
        password = rep.getStepAttributeString(stepId, PASSWORD);
        query = rep.getStepAttributeString(stepId, QUERY);

        returnValues = new ArrayList<>();
        int nrReturns = rep.countNrStepAttributes(stepId, RETURN_NAME);
        for (int i = 0; i < nrReturns; i++) {
            String name = rep.getStepAttributeString(stepId, i, RETURN_NAME);
            String splunkName = rep.getStepAttributeString(stepId, i, RETURN_SPLUNK_NAME);
            String type = rep.getStepAttributeString(stepId, i, RETURN_TYPE);
            int length = (int) rep.getStepAttributeInteger(stepId, i, RETURN_LENGTH);
            String format = rep.getStepAttributeString(stepId, i, RETURN_FORMAT);
            returnValues.add(new ReturnValue(name, splunkName, type, length, format));
        }
    }

    public void saveRep(…)
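saveRep() has to write the same attribute codes that readRep() reads. The per-field values are stored with an index, so that countNrStepAttributes(stepId, RETURN_NAME) can tell readRep() how many fields there are. The in-memory MiniRepository below is a hypothetical model of that indexed key/value layout, not Kettle's Repository interface. In the real saveRep() the writes would go through the Repository's saveStepAttribute(...) overloads, which take the transformation id, the step id and, for per-field values, the index. Only two of the five field properties are shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of step attributes keyed by (code, nr).
final class MiniRepository {
    private final Map<String, String> attributes = new HashMap<>();

    private static String key(String code, int nr) {
        return code + "#" + nr;
    }

    void saveStepAttribute(int nr, String code, String value) {
        attributes.put(key(code, nr), value);
    }

    String getStepAttributeString(int nr, String code) {
        return attributes.get(key(code, nr));
    }

    // Counts the consecutive indexes 0, 1, 2, ... that hold a value for this code.
    int countNrStepAttributes(String code) {
        int n = 0;
        while (attributes.containsKey(key(code, n))) {
            n++;
        }
        return n;
    }
}

final class ReturnFieldsRepo {
    static final String RETURN_NAME = "return_name";
    static final String RETURN_TYPE = "return_type";

    // saveRep() side: one indexed attribute per field and per property.
    static void save(MiniRepository rep, List<String[]> fields) {
        for (int i = 0; i < fields.size(); i++) {
            rep.saveStepAttribute(i, RETURN_NAME, fields.get(i)[0]);
            rep.saveStepAttribute(i, RETURN_TYPE, fields.get(i)[1]);
        }
    }

    // readRep() side: count the fields by RETURN_NAME, then read each index back.
    static List<String[]> read(MiniRepository rep) {
        List<String[]> fields = new ArrayList<>();
        int nr = rep.countNrStepAttributes(RETURN_NAME);
        for (int i = 0; i < nr; i++) {
            fields.add(new String[] {
                    rep.getStepAttributeString(i, RETURN_NAME),
                    rep.getStepAttributeString(i, RETURN_TYPE) });
        }
        return fields;
    }
}
```

If saveRep() wrote RETURN_NAME under a different code than readRep() counts, the count would be zero and every output field would silently disappear on reload. Keeping the two methods strictly symmetric avoids that.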

    // Describe how the step affects the field structure of the rows in the data stream

    public void getFields(…)

    @Override
    public void getFields(RowMetaInterface rowMeta, String name,
                          RowMetaInterface[] info, StepMeta nextStep, VariableSpace space,
                          Repository repository, IMetaStore metaStore)
            throws KettleStepException {
        for (ReturnValue returnValue : returnValues) {
            try {
                int type = ValueMetaFactory.getIdForValueMeta(returnValue.getType());
                ValueMetaInterface valueMeta = ValueMetaFactory.createValueMeta(returnValue.getName(), type);
                valueMeta.setLength(returnValue.getLength());
                valueMeta.setOrigin(name);
                rowMeta.addValueMeta(valueMeta);
            } catch (KettlePluginException e) {
                throw new KettleStepException("Unknown data type '"
                        + returnValue.getType() + "' for value named '"
                        + returnValue.getName() + "'", e);
            }
        }
    }
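getFields() turns each ReturnValue's type name into a value-metadata object. A related detail matters at run time: processRow() puts the raw String from Splunk into every output slot, even for a field declared as Integer or Number, which would likely cause type errors further downstream. The sketch below models both halves in plain Java. OutputField and RowConverter are hypothetical, and the type table covers only a few of Kettle's type names. In the plugin itself, the conversion would be done with Kettle's value-metadata classes rather than by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical description of one output field, modelled on ReturnValue.
final class OutputField {
    final String name;
    final String splunkName;
    final String type;

    OutputField(String name, String splunkName, String type) {
        this.name = name;
        this.splunkName = splunkName;
        this.type = type;
    }
}

// Hypothetical converter: validates type names up front and converts Splunk strings per row.
final class RowConverter {
    // Only a subset of Kettle's type names, for illustration.
    private static final List<String> KNOWN_TYPES = Arrays.asList("String", "Integer", "Number", "Boolean");

    private final List<OutputField> fields;

    // getFields() analogue: reject unknown type names before any row is produced.
    RowConverter(List<OutputField> fields) {
        for (OutputField f : fields) {
            if (!KNOWN_TYPES.contains(f.type)) {
                throw new IllegalArgumentException(
                        "Unknown data type '" + f.type + "' for value named '" + f.name + "'");
            }
        }
        this.fields = new ArrayList<>(fields);
    }

    // processRow() analogue: one output row per Splunk event; missing keys become null.
    Object[] toRow(Map<String, String> event) {
        Object[] row = new Object[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            OutputField f = fields.get(i);
            String raw = event.get(f.splunkName);
            row[i] = raw == null ? null : convert(raw, f.type);
        }
        return row;
    }

    // Throws NumberFormatException for non-numeric text in numeric fields.
    private static Object convert(String raw, String type) {
        switch (type) {
            case "Integer":
                return Long.valueOf(raw.trim());    // Kettle's Integer type holds java.lang.Long
            case "Number":
                return Double.valueOf(raw.trim());  // Kettle's Number type holds java.lang.Double
            case "Boolean":
                return Boolean.valueOf(raw.trim()); // simplified: only "true" (any case) is true
            default:
                return raw;
        }
    }
}
```

With a String field "host" and an Integer field "bytes", an event {host=web01, bytes=1024} becomes the row ["web01", 1024L]. A field declared with an unsupported type such as "Money" is rejected when the converter is built, before any data flows.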

    // Perform extended validation checks on the step

    public void check(…)

    // Provide Kettle with instances of the step, data and dialog classes

    public StepInterface getStep(…)

    @Override
    public StepInterface getStep(StepMeta stepMeta,
                                 StepDataInterface stepDataInterface, int i, TransMeta transMeta,
                                 Trans trans) {
        return new SplunkInput(stepMeta, stepDataInterface, i, transMeta, trans);
    }

    public StepDataInterface getStepData()

    @Override
    public StepDataInterface getStepData() {
        return new SplunkInputData();
    }

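3.5 Helper classes

The plugin also needs a SplunkConnection class to build the Splunk connection, and a ReturnValue class that describes the fields of the Splunk output stream in a standard form.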

    // In SplunkConnection: builds the arguments passed to Service.connect()
    public ServiceArgs getServiceArgs() {
        ServiceArgs args = new ServiceArgs();
        args.setUsername(getRealUsername());
        args.setPassword(getRealPassword());
        args.setHost(getRealHostname());
        args.setPort(Const.toInt(getRealPort(), 8089));
        // Which protocols are actually supported depends on the server configuration
        args.setSSLSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
        return args;
    }
    import java.util.Objects;

    import org.pentaho.di.core.injection.Injection;

    public class ReturnValue {

        @Injection(name = "RETURN_NAME", group = "RETURNS")
        private String name;

        @Injection(name = "RETURN_SPLUNK_NAME", group = "RETURNS")
        private String splunkName;

        @Injection(name = "RETURN_TYPE", group = "RETURNS")
        private String type;

        @Injection(name = "RETURN_LENGTH", group = "RETURNS")
        private int length;

        @Injection(name = "RETURN_FORMAT", group = "RETURNS")
        private String format;

        public ReturnValue(String name, String splunkName, String type, int length,
                           String format) {
            this.name = name;
            this.splunkName = splunkName;
            this.type = type;
            this.length = length;
            this.format = format;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ReturnValue that = (ReturnValue) o;
            return Objects.equals(name, that.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name);
        }

        @Override
        public String toString() {
            return "ReturnValue{" + "name='" + name + '\'' + '}';
        }

        /**
         * Gets name
         *
         * @return value of name
         */
        public String getName() {
            return name;
        }

        /**
         * @param name The name to set
         */
        public void setName(String name) {
            this.name = name;
        }

        /**
         * Gets splunkName
         *
         * @return value of splunkName
         */
        public String getSplunkName() {
            return splunkName;
        }

        /**
         * @param splunkName The splunkName to set
         */
        public void setSplunkName(String splunkName) {
            this.splunkName = splunkName;
        }

        /**
         * Gets type
         *
         * @return value of type
         */
        public String getType() {
            return type;
        }

        /**
         * @param type The type to set
         */
        public void setType(String type) {
            this.type = type;
        }

        /**
         * Gets length
         *
         * @return value of length
         */
        public int getLength() {
            return length;
        }

        /**
         * @param length The length to set
         */
        public void setLength(int length) {
            this.length = length;
        }

        /**
         * Gets format
         *
         * @return value of format
         */
        public String getFormat() {
            return format;
        }

        /**
         * @param format The format to set
         */
        public void setFormat(String format) {
            this.format = format;
        }
    }
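getServiceArgs() takes its values from getRealHostname(), getRealPort() and similar getters. The step calls initializeVariablesFrom(this) on the connection first, which suggests that these getters resolve Kettle variables such as ${SPLUNK_HOST}. That is an assumption, and SPLUNK_HOST and SPLUNK_PORT are made-up names. Const.toInt(getRealPort(), 8089) then falls back to 8089 when the resolved port is not a valid integer. The sketch below reproduces that behaviour in plain Java; VariableResolver is a hypothetical simplification of Kettle's variable substitution.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified model of variable substitution plus the port fallback.
final class VariableResolver {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    private final Map<String, String> variables;

    VariableResolver(Map<String, String> variables) {
        this.variables = variables;
    }

    // Replaces every ${NAME} with its value; unknown variables are left untouched.
    String substitute(String text) {
        if (text == null) {
            return null;
        }
        Matcher m = VAR.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = variables.get(m.group(1));
            m.appendReplacement(out, Matcher.quoteReplacement(value != null ? value : m.group()));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Same idea as Const.toInt(str, def): use def when the text is not an integer.
    static int toInt(String text, int def) {
        try {
            return Integer.parseInt(text.trim());
        } catch (RuntimeException e) { // NumberFormatException or NullPointerException
            return def;
        }
    }

    // Port resolution as in getServiceArgs(): substitute variables, then parse with a default.
    int resolvePort(String configuredPort) {
        return toInt(substitute(configuredPort), 8089);
    }
}
```

With SPLUNK_PORT set to 9089, a port configured as ${SPLUNK_PORT} resolves to 9089. An undefined variable or a non-numeric value silently becomes 8089, which is convenient but can hide a typo in the configuration.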

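4. Summary

With the walkthrough above, you should have a basic grasp of the end-to-end process of developing custom Kettle plugins. If you need the source code, want to learn about more custom plugins and integration approaches, or have questions or suggestions about developing or using them, follow the author's WeChat official account "游走在数据之间" or join the Kettle QQ group 976329836.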


This article was reprinted from 以数据之名 on 墨天轮 (modb.pro).
