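1. Development Background
A craftsman who wants to do good work must first sharpen his tools. If we adopt Kettle as our toolchain for offline or near-real-time ETL, custom plugin development is unavoidable. For example: we may need to apply a special function (such as encryption or decryption) to the data flowing out of a component; the components in the current version may not meet our requirements for capturing source data; or the existing components may lack support for repeated consumption.
In short, when the business flow is unusual and Kettle's built-in processing components cannot meet our data processing requirements, or can meet them only partially, we need to develop custom components for data management, data validation, data transformation, and extraction from special types of data sources.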
2. Basic Framework

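Taking the Splunk query plugin in the figure above as an example, we explain step by step how a Kettle transformation plugin works. Four classes make up a basic Kettle step (node), and each of them plays a distinct role with its own purpose.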
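SplunkInput: step class
Extends the BaseStep parent class and implements the StepInterface interface. While the transformation runs, its instance is where the data is actually processed; each execution thread corresponds to one instance of this class.

SplunkInputData: data class
Extends the BaseStepData parent class and implements the StepDataInterface interface. It stores runtime data and is unique to each execution thread while the plugin runs. It mainly holds custom metadata objects, database connections, caches, file handles, and similar objects.

SplunkInputDialog: dialog class
Extends the BaseStepDialog parent class and implements the StepDialogInterface interface. This class implements the configuration UI through which the ETL developer interacts with the step; the developer fills in the predefined input and output options to build a customized ETL flow.

SplunkInputMeta: metadata class
Extends the BaseStepMeta parent class and implements the StepMetaInterface interface. Its job is to hold and serialize the configuration of a particular step instance, that is, to track what the developer configured in SplunkInputDialog. In this example it stores the step properties set by the developer, the Splunk connection properties, and the names and types of the output fields.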
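To make the division of labor between the four classes more concrete, here is a minimal sketch that uses only the JDK. The names DemoMeta, DemoData, DemoStep and DemoDialog are hypothetical stand-ins, not Kettle classes; the point is only that one shared meta object holds the configuration, the dialog writes into it, and every step copy gets its own data object.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the four roles; these are NOT the Kettle classes.
class FourRolesSketch {

    /** Meta: holds the configuration and acts as a factory for step and data. */
    static class DemoMeta {
        String query = "search * | head 100";

        DemoData newData() {
            return new DemoData();
        }

        DemoStep newStep(DemoData data, int copyNr) {
            return new DemoStep(this, data, copyNr);
        }
    }

    /** Data: runtime state that belongs to exactly one step copy. */
    static class DemoData {
        int rowsWritten;
    }

    /** Step: the place where rows are actually processed. */
    static class DemoStep {
        final DemoMeta meta;
        final DemoData data;
        final int copyNr;

        DemoStep(DemoMeta meta, DemoData data, int copyNr) {
            this.meta = meta;
            this.data = data;
            this.copyNr = copyNr;
        }

        String describe() {
            return "copy " + copyNr + " runs: " + meta.query;
        }
    }

    /** Dialog: in Spoon this is a UI window; here it only writes into the meta. */
    static class DemoDialog {
        void apply(DemoMeta meta, String query) {
            meta.query = query;
        }
    }

    /** Creates the requested number of step copies, each with its own data object. */
    static List<DemoStep> start(DemoMeta meta, int copies) {
        List<DemoStep> steps = new ArrayList<>();
        for (int i = 0; i < copies; i++) {
            steps.add(meta.newStep(meta.newData(), i));
        }
        return steps;
    }

    public static void main(String[] args) {
        DemoMeta meta = new DemoMeta();
        new DemoDialog().apply(meta, "search index=main | head 10");
        for (DemoStep step : start(meta, 2)) {
            System.out.println(step.describe());
        }
    }
}
```

In the real plugin the same relationship shows up in getStep() and getStepData() of the metadata class, which hand new step and data instances to the engine.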
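Plugin display configuration
This defines how the plugin step appears in Kettle's visual UI workbench. It sets the plugin's unique ID, name, and description, tells Kettle which metadata class to load, and lists the dependency JARs that must be loaded in advance. There are two ways to implement it; this example uses the first.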
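Option 1: via the @Step annotation
@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")
id: must be globally unique among Kettle plugins
name: the display name of the plugin in the Spoon UI; it does not have to be unique
description: what the plugin does, or brief usage notes
image: the icon shown in the Spoon UI; prefer SVG images, which can be found and downloaded from the Alibaba icon library (https://www.iconfont.cn/)
categoryDescription: the category (folder) the plugin belongs to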
……
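As a rough illustration of how annotation-based registration can work, the sketch below defines a hypothetical @DemoStep annotation (a stand-in for @Step that carries only the attributes discussed above) and reads it back through reflection. How Kettle itself scans and registers plugins is not shown here; this only demonstrates the general mechanism under that assumption.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AnnotationScanSketch {

    /** Hypothetical stand-in for @Step; it must be retained at runtime to be readable. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoStep {
        String id();
        String name();
        String description() default "";
        String image() default "";
        String categoryDescription() default "";
    }

    /** Hypothetical metadata class carrying the same values as the example above. */
    @DemoStep(id = "KettleSplunkInput", name = "Splunk Input",
            description = "Read data from Splunk", image = "splunk.svg",
            categoryDescription = "Input")
    static class DemoSplunkInputMeta {
    }

    /** Returns "category/name [id]" for an annotated class, or null otherwise. */
    static String describe(Class<?> candidate) {
        DemoStep step = candidate.getAnnotation(DemoStep.class);
        if (step == null) {
            return null;
        }
        return step.categoryDescription() + "/" + step.name() + " [" + step.id() + "]";
    }

    public static void main(String[] args) {
        System.out.println(describe(DemoSplunkInputMeta.class));
    }
}
```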
Option 2: via plugin.xml
<?xml version="1.0" encoding="UTF-8"?>
<plugin
    id="KafkaConsumer"
    iconfile="logo.png"
    description="Apache Kafka Consumer"
    tooltip="This plug-in allows reading messages from a specific topic in a Kafka stream"
    category="Input"
    classname="com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerMeta">
    <libraries>
        <library name="pentaho-kafka-consumer.jar"/>
        ……
        <library name="lib/zookeeper-3.4.6.jar"/>
    </libraries>
    <localized_category>
        <category locale="en_US">Input</category>
    </localized_category>
    <localized_description>
        <description locale="en_US">Apache Kafka Consumer</description>
    </localized_description>
    <localized_tooltip>
        <tooltip locale="en_US">This plug-in allows reading messages from a specific topic in a Kafka stream</tooltip>
    </localized_tooltip>
</plugin>
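id: must be globally unique among Kettle plugins
iconfile: the icon shown in the Spoon UI; prefer PNG images
description: what the plugin does
tooltip: the hint shown when the mouse hovers over the entry in the tree menu
category: the category (folder) the plugin belongs to
classname: the metadata class
libraries: the list of JARs the plugin depends on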
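To show how the attributes above can be read from such a descriptor, here is a small sketch that parses a plugin.xml-style document with the JDK's DOM parser. The sample descriptor, its id "DemoStep" and its class name "com.example.DemoStepMeta" are made up for illustration, and this is not how Kettle's own loader is implemented.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

class PluginXmlSketch {

    // Hypothetical descriptor with the same shape as the plugin.xml shown above.
    static final String SAMPLE =
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
            + "<plugin id=\"DemoStep\" iconfile=\"logo.png\" description=\"Demo step\""
            + " category=\"Input\" classname=\"com.example.DemoStepMeta\">"
            + "<libraries>"
            + "<library name=\"demo-step.jar\"/>"
            + "<library name=\"lib/helper.jar\"/>"
            + "</libraries>"
            + "</plugin>";

    /** Parses the descriptor and returns its root <plugin> element. */
    static Element parseRoot(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            return doc.getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid plugin descriptor", e);
        }
    }

    /** Collects the name attribute of every <library> entry, in document order. */
    static List<String> libraries(Element plugin) {
        List<String> names = new ArrayList<>();
        NodeList nodes = plugin.getElementsByTagName("library");
        for (int i = 0; i < nodes.getLength(); i++) {
            names.add(((Element) nodes.item(i)).getAttribute("name"));
        }
        return names;
    }

    public static void main(String[] args) {
        Element plugin = parseRoot(SAMPLE);
        System.out.println(plugin.getAttribute("id") + " -> " + plugin.getAttribute("classname"));
        System.out.println(libraries(plugin));
    }
}
```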
3. Implementation
3.1 Step class
public class SplunkInput extends BaseStep implements StepInterface {

    private SplunkInputMeta meta;
    private SplunkInputData data;

    public SplunkInput(StepMeta stepMeta, StepDataInterface stepDataInterface,
            int copyNr, TransMeta transMeta, Trans trans) {
        super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
    }

    @Override
    public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;

        // Is the step getting input?
        // List<StepMeta> steps = getTransMeta().findPreviousSteps(getStepMeta());

        // Check the Splunk connection settings before connecting
        if (StringUtils.isEmpty(meta.getHost())) {
            log.logError("You need to specify a Splunk connection Host to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getPort())) {
            log.logError("You need to specify a Splunk connection Port to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getUsername())) {
            log.logError("You need to specify a Splunk connection Username to use in this step");
            return false;
        }
        if (StringUtils.isEmpty(meta.getPassword())) {
            log.logError("You need to specify a Splunk connection Password to use in this step");
            return false;
        }

        // To correct lazy programmers who built certain PDI steps...
        // System.setProperty("https.protocols", "TLSv1.2,TLSv1.1,SSLv3");
        // Security.setProperty("jdk.tls.disabledAlgorithms", "SSLv3, RC4, MD5withRSA, DH keySize < 768");

        // Overriding the static method setSslSecurityProtocol to implement the security protocol of choice
        // HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);

        data.splunkConnection = new SplunkConnection(meta.getName(),
                meta.getHost(), meta.getPort(), meta.getUsername(),
                meta.getPassword());
        data.splunkConnection.initializeVariablesFrom(this);

        try {
            data.serviceArgs = data.splunkConnection.getServiceArgs();
            data.service = Service.connect(data.serviceArgs);
        } catch (Exception e) {
            log.logError("Unable to connect to Splunk using connection '"
                    + data.splunkConnection.getName() + "'", e);
            return false;
        }
        return super.init(smi, sdi);
    }

    @Override
    public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;
        super.dispose(smi, sdi);
    }

    @Override
    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        meta = (SplunkInputMeta) smi;
        data = (SplunkInputData) sdi;

        if (first) {
            first = false;

            // get the output fields...
            data.outputRowMeta = new RowMeta();
            meta.getFields(data.outputRowMeta, getStepname(), null,
                    getStepMeta(), this, repository, data.metaStore);

            // Run a one shot search in blocking mode
            JobResultsArgs args = new JobResultsArgs();
            // count = 0: do not limit the result size, return all data
            args.setCount(0);
            args.setOutputMode(JobResultsArgs.OutputMode.XML);
            data.eventsStream = data.service.oneshotSearch(
                    getTransMeta().environmentSubstitute(meta.getQuery()), args);
        }

        try {
            ResultsReaderXml resultsReader = new ResultsReaderXml(data.eventsStream);
            HashMap<String, String> event;
            while ((event = resultsReader.getNextEvent()) != null) {
                Object[] outputRow = RowDataUtil.allocateRowData(data.outputRowMeta.size());
                for (int i = 0; i < meta.getReturnValues().size(); i++) {
                    ReturnValue returnValue = meta.getReturnValues().get(i);
                    String value = event.get(returnValue.getSplunkName());
                    outputRow[i] = value;
                }
                incrementLinesInput();
                putRow(data.outputRowMeta, outputRow);
            }
        } catch (Exception e) {
            throw new KettleException("Error reading from Splunk events stream", e);
        } finally {
            try {
                data.eventsStream.close();
            } catch (IOException e) {
                throw new KettleException("Unable to close events stream", e);
            }
        }

        // All events were read in this single call, so signal the end of output
        setOutputDone();
        return false;
    }
}
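init(): called before the transformation starts executing. The transformation actually runs only if every step initializes successfully. In this example it checks that the Splunk connection properties (host, port, and so on) are configured and that the Splunk connection can be established.
dispose(): called after the step has finished executing, to release resources such as caches and file handles.
run(): called when the actual processing of the row stream begins.
processRow(): processes the rows of the data stream. It typically calls getRow() to fetch a single row to process; that call blocks when necessary, for example while the step is waiting for upstream steps to deliver rows. The rest of processRow() performs the transformation work and calls putRow() to pass the processed row to the downstream steps. It returns true to be called again for the next row and false once processing is finished.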
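The lifecycle described above can be sketched without any Kettle dependency. In the toy below, UpperCaseStep and run() are hypothetical names: input.poll() plays the role of getRow(), output.add() plays the role of putRow(), and a driver loop calls processRow() until it returns false. It illustrates the usual one-row-per-call pattern, whereas the SplunkInput step above reads the whole event stream in a single processRow() call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Queue;

class StepLifecycleSketch {

    /** Toy step that upper-cases every incoming row. */
    static class UpperCaseStep {
        private final Queue<String> input;
        final List<String> output = new ArrayList<>();
        final List<String> calls = new ArrayList<>();

        UpperCaseStep(List<String> rows) {
            this.input = new ArrayDeque<>(rows);
        }

        /** Acquire resources; returning false would prevent the run. */
        boolean init() {
            calls.add("init");
            return true;
        }

        /** Handles one row per call; returns false when there is no more input. */
        boolean processRow() {
            String row = input.poll();                  // stands in for getRow()
            if (row == null) {
                calls.add("done");                      // stands in for setOutputDone()
                return false;
            }
            output.add(row.toUpperCase(Locale.ROOT));   // stands in for putRow()
            return true;
        }

        /** Release resources. */
        void dispose() {
            calls.add("dispose");
        }
    }

    /** Drives the step: init once, processRow until false, dispose always. */
    static void run(UpperCaseStep step) {
        if (!step.init()) {
            return;
        }
        try {
            while (step.processRow()) {
                // keep pulling rows until the step reports it is finished
            }
        } finally {
            step.dispose();
        }
    }

    public static void main(String[] args) {
        UpperCaseStep step = new UpperCaseStep(Arrays.asList("a", "b"));
        run(step);
        System.out.println(step.output + " " + step.calls);
    }
}
```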
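3.2 Data class
Most steps need temporary buffers or temporary data during processing. The data class is the right place to keep them, and each execution thread gets its own instance of the data class.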
public class SplunkInputData extends BaseStepData implements StepDataInterface {
    public RowMetaInterface outputRowMeta;
    public SplunkConnection splunkConnection; // Splunk connection
    public int[] fieldIndexes;                // field index array
    public String query;                      // Splunk SPL statement (must start with "search")
    public IMetaStore metaStore;              // metastore object
    public ServiceArgs serviceArgs;           // Splunk query service arguments
    public Service service;                   // Splunk query service
    public InputStream eventsStream;          // input event stream
}
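3.3 Dialog class
This class mainly wires up the listeners for the input properties, validates the configuration when it is initialized, tests the Splunk connection, and previews the fields of the returned result set.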
private static Class<?> PKG = SplunkInputMeta.class; // for i18n purposes, needed by Translator2!!

private Text wStepname;
private Text wHost;        // Splunk host name or IP
private Text wPort;        // Splunk port
private Text wUsername;    // Splunk user name
private Text wPassword;    // Splunk password
private Text wQuery;       // Splunk SPL statement
private TableView wReturns; // output fields of the Splunk step and their properties
private SplunkInputMeta input;
……
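3.4 Metadata class
The key methods of the metadata class for the Splunk query component are listed below. The private member variable outputField holds the output stream field passed on to the next step.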
@Step(id = "KettleSplunkInput", name = "Splunk Input", description = "Read data from Splunk", image = "splunk.svg", categoryDescription = "Input")
@InjectionSupported(localizationPrefix = "Cypher.Injection.", groups = { "PARAMETERS", "RETURNS" })
public class SplunkInputMeta extends BaseStepMeta implements StepMetaInterface {

    public static final String HOST = "host";
    public static final String PORT = "port";
    public static final String USERNAME = "username";
    public static final String PASSWORD = "password";
    public static final String QUERY = "query";
    public static final String RETURNS = "returns";
    public static final String RETURN = "return";
    public static final String RETURN_NAME = "return_name";
    public static final String RETURN_SPLUNK_NAME = "return_splunk_name";
    public static final String RETURN_TYPE = "return_type";
    public static final String RETURN_LENGTH = "return_length";
    public static final String RETURN_FORMAT = "return_format";

    @Injection(name = HOST)
    private String host;

    @Injection(name = PORT)
    private String port;

    @Injection(name = USERNAME)
    private String username;

    @Injection(name = PASSWORD)
    private String password;

    @Injection(name = QUERY)
    private String query;

    @InjectionDeep
    private List<ReturnValue> returnValues;

    public SplunkInputMeta() {
        super();
        returnValues = new ArrayList<>();
    }
    ……
}
// Track the input and output settings of the step
public String getOutputField()
public void setOutputField(…)
public void setDefault() // initialize the configuration parameters with default values
@Override
public void setDefault() {
    host = "127.0.0.1";
    port = "8089";
    username = "query";
    password = "query";
    query = "search * | head 100";
}
// Serialize the step settings to XML and load them back from XML
public String getXML()
@Override
public String getXML() {
    StringBuilder xml = new StringBuilder();
    xml.append(XMLHandler.addTagValue(HOST, host));
    xml.append(XMLHandler.addTagValue(PORT, port));
    xml.append(XMLHandler.addTagValue(USERNAME, username));
    xml.append(XMLHandler.addTagValue(PASSWORD, password));
    xml.append(XMLHandler.addTagValue(QUERY, query));
    xml.append(XMLHandler.openTag(RETURNS));
    for (ReturnValue returnValue : returnValues) {
        xml.append(XMLHandler.openTag(RETURN));
        xml.append(XMLHandler.addTagValue(RETURN_NAME, returnValue.getName()));
        xml.append(XMLHandler.addTagValue(RETURN_SPLUNK_NAME, returnValue.getSplunkName()));
        xml.append(XMLHandler.addTagValue(RETURN_TYPE, returnValue.getType()));
        xml.append(XMLHandler.addTagValue(RETURN_LENGTH, returnValue.getLength()));
        xml.append(XMLHandler.addTagValue(RETURN_FORMAT, returnValue.getFormat()));
        xml.append(XMLHandler.closeTag(RETURN));
    }
    xml.append(XMLHandler.closeTag(RETURNS));
    return xml.toString();
}
public void loadXML(…)
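Since getXML() and loadXML() have to be exact inverses of each other, a round trip is a useful mental model. The sketch below is a JDK-only illustration, not the plugin's loadXML(): the helpers tag(), escape(), toXml() and fromXml() and the wrapping <step> element are all assumptions made for the example. It writes settings as tags, reads them back with the DOM parser, and escapes the characters that would otherwise break the XML (an SPL query can easily contain < or &).

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

class XmlRoundTripSketch {

    /** Escapes the characters that are not allowed in XML text content. */
    static String escape(String value) {
        return value.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Writes a single <name>value</name> element. */
    static String tag(String name, String value) {
        return "<" + name + ">" + escape(value) + "</" + name + ">";
    }

    /** Serializes every setting as one child element of a <step> root. */
    static String toXml(Map<String, String> settings) {
        StringBuilder xml = new StringBuilder("<step>");
        for (Map.Entry<String, String> entry : settings.entrySet()) {
            xml.append(tag(entry.getKey(), entry.getValue()));
        }
        return xml.append("</step>").toString();
    }

    /** Reads the child elements of the root back into a map, in document order. */
    static Map<String, String> fromXml(String xml) {
        try {
            Element root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
            Map<String, String> settings = new LinkedHashMap<>();
            NodeList children = root.getChildNodes();
            for (int i = 0; i < children.getLength(); i++) {
                Node child = children.item(i);
                if (child instanceof Element) {
                    settings.put(child.getNodeName(), child.getTextContent());
                }
            }
            return settings;
        } catch (Exception e) {
            throw new IllegalStateException("Unable to parse step XML", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("host", "127.0.0.1");
        settings.put("query", "search * | where a < 5");
        String xml = toXml(settings);
        System.out.println(xml);
        System.out.println(fromXml(xml).equals(settings));
    }
}
```

A round-trip check like the one in main() is a cheap way to catch a setting that is written by one method but forgotten in the other.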
// Read the step settings from the repository and save them to it
public void readRep(…)
@Override
public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId,
        List<DatabaseMeta> databases) throws KettleException {
    host = rep.getStepAttributeString(stepId, HOST);
    port = rep.getStepAttributeString(stepId, PORT);
    username = rep.getStepAttributeString(stepId, USERNAME);
    password = rep.getStepAttributeString(stepId, PASSWORD);
    query = rep.getStepAttributeString(stepId, QUERY);
    returnValues = new ArrayList<>();
    int nrReturns = rep.countNrStepAttributes(stepId, RETURN_NAME);
    for (int i = 0; i < nrReturns; i++) {
        String name = rep.getStepAttributeString(stepId, i, RETURN_NAME);
        String splunkName = rep.getStepAttributeString(stepId, i, RETURN_SPLUNK_NAME);
        String type = rep.getStepAttributeString(stepId, i, RETURN_TYPE);
        int length = (int) rep.getStepAttributeInteger(stepId, i, RETURN_LENGTH);
        String format = rep.getStepAttributeString(stepId, i, RETURN_FORMAT);
        returnValues.add(new ReturnValue(name, splunkName, type, length, format));
    }
}
public void saveRep(…)
// Describe how the step changes the field structure of the row stream
public void getFields(…)
@Override
public void getFields(RowMetaInterface rowMeta, String name,
        RowMetaInterface[] info, StepMeta nextStep, VariableSpace space,
        Repository repository, IMetaStore metaStore)
        throws KettleStepException {
    for (ReturnValue returnValue : returnValues) {
        try {
            // Resolve the configured type name to a value meta and add it to the output row
            int type = ValueMetaFactory.getIdForValueMeta(returnValue.getType());
            ValueMetaInterface valueMeta = ValueMetaFactory.createValueMeta(returnValue.getName(), type);
            valueMeta.setLength(returnValue.getLength());
            valueMeta.setOrigin(name);
            rowMeta.addValueMeta(valueMeta);
        } catch (KettlePluginException e) {
            // Keep the original exception as the cause
            throw new KettleStepException("Unknown data type '"
                    + returnValue.getType() + "' for value named '"
                    + returnValue.getName() + "'", e);
        }
    }
}
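getFields() declares a type for each output field, while processRow() in the step class copies the raw Splunk string into the row. If a field is declared with a type other than String, the value will probably need converting before putRow(), because Kettle carries Integer values as java.lang.Long and Number values as java.lang.Double. The sketch below shows one possible conversion using only the JDK; convert() and toRow() are hypothetical helpers and only a handful of type names are covered.

```java
import java.util.HashMap;
import java.util.Map;

class TypedValueSketch {

    /** Converts a raw string into the Java type that matches the declared type name. */
    static Object convert(String raw, String typeName) {
        if (raw == null) {
            return null; // a field missing from the event stays null
        }
        switch (typeName) {
            case "String":
                return raw;
            case "Integer":
                return Long.valueOf(raw.trim());
            case "Number":
                return Double.valueOf(raw.trim());
            case "Boolean":
                return Boolean.valueOf(raw.trim());
            default:
                throw new IllegalArgumentException("Unsupported type: " + typeName);
        }
    }

    /** Builds one output row from an event, field by field. */
    static Object[] toRow(Map<String, String> event, String[] splunkNames, String[] typeNames) {
        Object[] row = new Object[splunkNames.length];
        for (int i = 0; i < splunkNames.length; i++) {
            row[i] = convert(event.get(splunkNames[i]), typeNames[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> event = new HashMap<>();
        event.put("host", "web1");
        event.put("count", "42");
        Object[] row = toRow(event,
                new String[] { "host", "count" },
                new String[] { "String", "Integer" });
        System.out.println(row[0] + " " + row[1].getClass().getSimpleName());
    }
}
```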
// Perform extended validation checks on the step
public void check(…)
// Provide Kettle with instances of the step, data, and dialog classes
public StepInterface getStep(…)
@Override
public StepInterface getStep(StepMeta stepMeta,
        StepDataInterface stepDataInterface, int i, TransMeta transMeta,
        Trans trans) {
    return new SplunkInput(stepMeta, stepDataInterface, i, transMeta, trans);
}
public StepDataInterface getStepData()
@Override
public StepDataInterface getStepData() {
    return new SplunkInputData();
}
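3.5 Additional classes
The plugin also needs the SplunkConnection class to build the Splunk connection, and the ReturnValue class to describe the fields of the Splunk output stream in a uniform way.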
public ServiceArgs getServiceArgs() {
    ServiceArgs args = new ServiceArgs();
    args.setUsername(getRealUsername());
    args.setPassword(getRealPassword());
    args.setHost(getRealHostname());
    args.setPort(Const.toInt(getRealPort(), 8089));
    // Which protocols are actually supported depends on the server configuration
    args.setSSLSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
    return args;
}
public class ReturnValue {

    @Injection(name = "RETURN_NAME", group = "RETURNS")
    private String name;

    @Injection(name = "RETURN_SPLUNK_NAME", group = "RETURNS")
    private String splunkName;

    @Injection(name = "RETURN_TYPE", group = "RETURNS")
    private String type;

    @Injection(name = "RETURN_LENGTH", group = "RETURNS")
    private int length;

    @Injection(name = "RETURN_FORMAT", group = "RETURNS")
    private String format;

    public ReturnValue(String name, String splunkName, String type, int length,
            String format) {
        this.name = name;
        this.splunkName = splunkName;
        this.type = type;
        this.length = length;
        this.format = format;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ReturnValue that = (ReturnValue) o;
        return Objects.equals(name, that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name);
    }

    @Override
    public String toString() {
        return "ReturnValue{" + "name='" + name + '\'' + '}';
    }

    /**
     * Gets name
     *
     * @return value of name
     */
    public String getName() {
        return name;
    }

    /**
     * @param name
     *            The name to set
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Gets splunkName
     *
     * @return value of splunkName
     */
    public String getSplunkName() {
        return splunkName;
    }

    /**
     * @param splunkName
     *            The splunkName to set
     */
    public void setSplunkName(String splunkName) {
        this.splunkName = splunkName;
    }

    /**
     * Gets type
     *
     * @return value of type
     */
    public String getType() {
        return type;
    }

    /**
     * @param type
     *            The type to set
     */
    public void setType(String type) {
        this.type = type;
    }

    /**
     * Gets length
     *
     * @return value of length
     */
    public int getLength() {
        return length;
    }

    /**
     * @param length
     *            The length to set
     */
    public void setLength(int length) {
        this.length = length;
    }

    /**
     * Gets format
     *
     * @return value of format
     */
    public String getFormat() {
        return format;
    }

    /**
     * @param format
     *            The format to set
     */
    public void setFormat(String format) {
        this.format = format;
    }
}
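4. Summary
With the walkthrough above, you should have a basic grasp of the end-to-end process of developing a custom Kettle plugin. If you need the source code, want to learn more about custom plugins and how to integrate them, or have questions or suggestions about development or usage, follow the author's WeChat official account "游走在数据之间" or join the Kettle discussion group on QQ (976329836).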





