

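| Property | Description |
| --- | --- |
| Script Engine | Script language to use: groovy, clojure, ecmascript, lua, python, ruby |
| Script File | Path to the script file |
| Script Body | The script text itself |
| Module Directory | Libraries the script depends on; for example, if the script uses fastjson, put the fastjson JAR in this path |
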
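Note: exactly one of Script File and Script Body must be set; filling in both or neither is invalid. Script Body does not support dynamic values, while Script File can be read from a variable. Neither can be taken from FlowFile attributes.

Next, let's look at how to write a Groovy script that ExecuteScript can run.
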
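| Variable available to the script | Purpose |
| --- | --- |
| session | The ProcessSession: create(), putAttribute(), transfer(), read(), write(), etc. |
| context | The ProcessContext: retrieves processor properties, relationships, controller services and the StateManager (StateManager methods: getState, setState, replace, clear) |
| log | The processor's logger, e.g. log.info('Hello world!') or log.info('{} {} {}', ['Hello', 1, true] as Object[]) |
| REL_SUCCESS | The success relationship; transferring a FlowFile to it routes it to the success connection |
| REL_FAILURE | The failure relationship; transferring a FlowFile to it routes it to the failure connection |
| Dynamic Properties | Each user-added property is available as a variable of the same name; its value can be static, contain Expression Language, or be the identifier of a controller service |
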
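Getting FlowFiles from the upstream queue

Every FlowFile obtained with session.get() or created with session.create() must be transferred (or removed) before the session is committed. Most of the snippets below omit that step for brevity.

    // Get a single FlowFile
    flowFile = session.get()
    if (!flowFile) return

    // Get up to 100 FlowFiles at once (the returned list may be empty)
    flowFileList = session.get(100)
    if (!flowFileList.isEmpty()) {
        flowFileList.each { flowFile ->
            // Process each FlowFile here
        }
    }
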
Reading data from a FlowFile

    import org.apache.commons.io.IOUtils
    import java.nio.charset.StandardCharsets

    flowFile = session.get()
    if (!flowFile) return
    def text = ''
    // Cast a closure with an inputStream parameter to InputStreamCallback
    session.read(flowFile, { inputStream ->
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        // Do something with text here
    } as InputStreamCallback)

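Creating a new FlowFile

    // Create a brand-new, empty FlowFile
    flowFile = session.create()
    // Additional processing here

    // Create a child FlowFile that inherits the attributes of its parent
    flowFile = session.get()
    if (!flowFile) return
    newFlowFile = session.create(flowFile)
    // Additional processing here
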
Reading attributes

    flowFile = session.get()
    if (!flowFile) return
    myAttr = flowFile.getAttribute('filename')

    // Iterate over all attributes
    flowFile = session.get()
    if (!flowFile) return
    flowFile.getAttributes().each { key, value ->
        // Do something with the key/value pair
    }

Writing FlowFile attributes

    flowFile = session.get()
    if (!flowFile) return
    // FlowFiles are immutable: keep the FlowFile returned by putAttribute
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

    // Attribute values must be Strings
    attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
    flowFile = session.get()
    if (!flowFile) return
    flowFile = session.putAllAttributes(flowFile, attrMap)

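session.putAllAttributes() expects a Map<String, String>, which is why the map above converts 2 with Integer.toString(). Below is a minimal sketch of a hypothetical helper, toAttributeMap (not part of NiFi), that stringifies a map of mixed values before it is handed to putAllAttributes(). Skipping null values is a choice made for this sketch. It runs with plain Groovy, no NiFi classes needed.

```groovy
// Hypothetical helper (not a NiFi API): turns a map of arbitrary values into
// the Map<String, String> that session.putAllAttributes() expects.
Map<String, String> toAttributeMap(Map<String, ?> values) {
    values.findAll { k, v -> v != null }                       // skip null values
          .collectEntries { k, v -> [(k.toString()): v.toString()] }
}

def attrs = toAttributeMap(['myAttr1': 1, 'myAttr2': 2L, 'flag': true, 'skip': null])
println attrs   // [myAttr1:1, myAttr2:2, flag:true]
```

Inside ExecuteScript the result would be passed on as `session.putAllAttributes(flowFile, attrs)`.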
Writing FlowFile content

    import java.nio.charset.StandardCharsets

    flowFile = session.get()
    if (!flowFile) return
    def text = 'Hello world!'
    // Cast a closure with an outputStream parameter to OutputStreamCallback
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(text.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

Routing FlowFiles downstream

    flowFile = session.get()
    if (!flowFile) return
    def errorOccurred = false   // set by your processing logic
    // Processing occurs here
    if (errorOccurred) {
        session.transfer(flowFile, REL_FAILURE)
    } else {
        session.transfer(flowFile, REL_SUCCESS)
    }

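Logging

    log.info('Hello world!')
    // Pass placeholder arguments as an Object[]; a plain Groovy list would be passed as a single argument
    log.info('Found these things: {} {} {}', ['Hello', 1, true] as Object[])
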
Reading and writing FlowFile content in one pass

    import org.apache.commons.io.IOUtils
    import java.nio.charset.StandardCharsets

    flowFile = session.get()
    if (!flowFile) return
    // Cast a closure with inputStream and outputStream parameters to StreamCallback
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)

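The read-then-write logic of a StreamCallback can be checked outside NiFi by feeding it in-memory streams. This is a minimal sketch that assumes the callback body is pulled out into a plain closure (reverseContent is a hypothetical name). It uses Groovy's InputStream.getText(charset) instead of commons-io so that it runs with plain Groovy.

```groovy
import java.nio.charset.StandardCharsets

// Hypothetical closure holding the callback body: read all input as UTF-8,
// then write the reversed text back out as UTF-8.
def reverseContent = { InputStream inputStream, OutputStream outputStream ->
    String text = inputStream.getText(StandardCharsets.UTF_8.name())
    outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
}

// Simulate FlowFile content with in-memory streams
def input = new ByteArrayInputStream('Hello world!'.getBytes(StandardCharsets.UTF_8))
def output = new ByteArrayOutputStream()
reverseContent(input, output)
println output.toString(StandardCharsets.UTF_8.name())   // !dlrow olleH
```

Inside ExecuteScript, the same closure could then be passed as `session.write(flowFile, reverseContent as StreamCallback)`, which keeps the content logic testable on its own.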
Handling exceptions

    flowFile = session.get()
    if (!flowFile) return
    try {
        // Something that might throw an exception here
        // Last operation is transfer to success (failures handled in the catch block)
        session.transfer(flowFile, REL_SUCCESS)
    } catch (e) {
        log.error('Something went wrong', e)
        session.transfer(flowFile, REL_FAILURE)
    }

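The try/catch routing pattern can be exercised without a running NiFi. This is a minimal sketch built on a hypothetical StubSession class (not a NiFi API) that only records where each FlowFile was sent. Relationships are plain strings here, and processOne is a hypothetical name.

```groovy
// Hypothetical stand-in for ProcessSession: records the relationship each
// "FlowFile" (any object here) was transferred to.
class StubSession {
    Map<Object, String> routed = [:]
    void transfer(Object flowFile, String relationship) { routed[flowFile] = relationship }
}

// Same shape as the script: success is the last step in the try block,
// any exception routes the FlowFile to failure instead.
def processOne(StubSession session, Object flowFile, Closure work) {
    try {
        work(flowFile)
        session.transfer(flowFile, 'success')
    } catch (e) {
        println "Something went wrong: ${e.message}"
        session.transfer(flowFile, 'failure')
    }
}

def session = new StubSession()
processOne(session, 'ff-1') { ff -> /* no error */ }
processOne(session, 'ff-2') { ff -> throw new IllegalStateException('bad data') }
println session.routed   // [ff-1:success, ff-2:failure]
```

Keeping the success transfer as the last statement in the try block ensures that a FlowFile is never sent to both relationships.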
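Using dynamic properties

Each dynamic property added to the processor is bound to a script variable of the same name (myProperty1 and myProperty2 here).

    // Read a dynamic property that holds a plain (static) value
    def myValue1 = myProperty1.value

    // Read a dynamic property containing Expression Language, evaluated against the
    // attributes of a FlowFile obtained earlier with session.get()
    def myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).value
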
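Using state management

Reading state:

    import org.apache.nifi.components.state.Scope

    def oldMap = context.stateManager.getState(Scope.LOCAL).toMap()

Writing state:

    import org.apache.nifi.components.state.Scope

    def stateManager = context.stateManager
    def stateMap = stateManager.getState(Scope.CLUSTER)
    def newMap = ['myKey1': 'myValue1']
    if (stateMap.version == -1) {
        // No state stored yet: set it
        stateManager.setState(newMap, Scope.CLUSTER)
    } else {
        // replace() swaps in the new map only if the state is unchanged since getState()
        // (it returns false otherwise). It does not merge, so combine old and new entries explicitly
        stateManager.replace(stateMap, stateMap.toMap() + newMap, Scope.CLUSTER)
    }

Clearing state:

    import org.apache.nifi.components.state.Scope

    context.stateManager.clear(Scope.LOCAL)
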
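StateManager.replace() is a compare-and-set: it succeeds only if the state has not changed since it was read, and the new map takes the place of the old one entirely. The sketch below is a hypothetical in-memory model (InMemoryState is not a NiFi class) that illustrates those two properties. It shows why old and new entries have to be merged before replacing, and why a stale read is rejected.

```groovy
// Hypothetical model of compare-and-set state, loosely mirroring the
// documented behaviour of StateManager.replace(); not NiFi's implementation.
class InMemoryState {
    long version = -1                      // -1 means "no state stored yet"
    Map<String, String> values = [:]

    void setState(Map<String, String> newValues) {
        values = new LinkedHashMap<String, String>(newValues)   // full replacement
        version++
    }

    boolean replace(long expectedVersion, Map<String, String> newValues) {
        if (expectedVersion != version) return false             // changed since it was read
        setState(newValues)
        return true
    }
}

def state = new InMemoryState()
state.setState(['myKey0': 'myValue0'])

// Read, merge old and new entries, then replace
long seenVersion = state.version
def merged = state.values + ['myKey1': 'myValue1']
println state.replace(seenVersion, merged)          // true
println state.values                                // [myKey0:myValue0, myKey1:myValue1]

// Replacing with the now-stale version is rejected
println state.replace(seenVersion, ['other': 'x'])  // false
```

If the real replace() returns false, a script would typically re-read the state and retry the merge.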
Using Controller Services

    import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
    import org.apache.nifi.distributed.cache.client.Serializer
    import org.apache.nifi.distributed.cache.client.Deserializer
    import java.nio.charset.StandardCharsets

    def stringSerializer = { value, out -> out.write(value.getBytes(StandardCharsets.UTF_8)) } as Serializer<String>
    def stringDeserializer = { bytes -> new String(bytes, StandardCharsets.UTF_8) } as Deserializer<String>
    // clientServiceId is a dynamic property whose value is the unique identifier of the controller service
    def myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
    def result = myDistClient.get('a', stringSerializer, stringDeserializer)
    log.info("Result = $result")

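The serializer and deserializer handed to the cache client can be checked on their own as plain closures. This is a minimal sketch that drops the NiFi Serializer/Deserializer coercion and uses UTF-8 on both sides, so non-ASCII text round-trips regardless of the platform's default charset.

```groovy
import java.nio.charset.StandardCharsets

// Plain-closure versions of the serializer/deserializer pair (no NiFi types)
def stringSerializer = { String value, OutputStream out ->
    out.write(value.getBytes(StandardCharsets.UTF_8))
}
def stringDeserializer = { byte[] bytes ->
    new String(bytes, StandardCharsets.UTF_8)
}

def buffer = new ByteArrayOutputStream()
stringSerializer('caf\u00e9', buffer)     // the accented e takes 2 bytes in UTF-8
def roundTripped = stringDeserializer(buffer.toByteArray())
println roundTripped == 'caf\u00e9'       // true
```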
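Using third-party libraries

The Groovy script engine (at least as used in ExecuteScript) cannot import other Groovy scripts; package that code as a JAR first.

Set Module Directory to a comma-separated list of JAR files and/or directories. For a directory, all JARs in it are loaded. The classes can then be brought in with regular import statements.
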
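Hands-on example

This flow transforms the data: in the JSON {"a": 1,"b":{"bytes":"aaaaa"}}, it Base64-encodes the string under b.bytes and writes the result back to field b, replacing the original object. Note that Base64 is an encoding, not encryption.

Input data:

    {"a": 1,"b":{"bytes":"aaaaa"}}

Target data:

    {"a":1,"b":"YWFhYWE="}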

Step 1: GenerateFlowFile


Step 2: the ExecuteScript script

    import groovy.json.JsonSlurper
    import groovy.json.JsonBuilder

    def ff = session.get()
    if (!ff) return
    ff = session.write(ff, { rawIn, rawOut ->
        rawIn.withReader('UTF-8') { reader ->
            rawOut.withWriter('UTF-8') { writer ->
                // Parse the content into a Map
                def json = new JsonSlurper().parse(reader)
                // Replace b with the Base64 encoding of b.bytes
                def bytes = json.b.bytes
                json.b = bytes.getBytes('UTF-8').encodeBase64().toString()
                // Write the modified object back out
                new JsonBuilder(json).writeTo(writer)
            }
        }
    } as StreamCallback)
    session.transfer(ff, REL_SUCCESS)
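The transformation inside the script can be tried outside NiFi by working on a String instead of the FlowFile streams. This is a minimal sketch under that assumption, using a hypothetical function name, transformJson. The parsing and encoding steps mirror the script (JsonSlurper, encodeBase64, JsonBuilder), and the sketch assumes the input always contains b.bytes.

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper

// Hypothetical standalone version of the ExecuteScript transformation:
// replaces the object under "b" with the Base64 encoding of b.bytes.
String transformJson(String input) {
    def json = new JsonSlurper().parseText(input)
    def bytes = json.b.bytes
    json.b = bytes.getBytes('UTF-8').encodeBase64().toString()
    new JsonBuilder(json).toString()
}

println transformJson('{"a": 1,"b":{"bytes":"aaaaa"}}')   // {"a":1,"b":"YWFhYWE="}
```

The output matches the target data above, which makes this a quick way to check changes to the script before pasting it into Script Body.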

Then check the result of the run.




Summary

Combining a NiFi flow with Groovy scripts in ExecuteScript makes data processing considerably more flexible.




