
ExecuteScript-Groovy

NIFI实战 2020-07-13

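Property           Description
Script Engine      Scripting language to use: groovy, clojure, ecmascript, lua, python, ruby
Script File        Path to the script file
Script Body        The script text itself
Module Directory   Modules the script depends on; for example, to use fastjson, put the fastjson jar in this path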

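Note: exactly one of Script File and Script Body must be set; filling in both, or neither, is invalid. Script Body cannot be resolved dynamically. Script File can be resolved from variables. Neither can take values from FlowFile attributes.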


Next, let's learn how to write a Groovy script that ExecuteScript can run.

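Variable            Purpose
session             Operates on FlowFiles: create(), putAttribute(), transfer(), read(), write(), ...
context             Retrieves processor properties, relationships, controller services and the StateManager
                    StateManager methods: getState, setState, replace, clear
log                 Logging, e.g. log.info('Hello world!') or log.info('{} {} {}', ['Hello',1,true] as Object[])
REL_SUCCESS         Relationship that routes to the success connection
REL_FAILURE         Relationship that routes to the failure connection
Dynamic Properties  Each user-added property is exposed as a variable of the same name; its value can be static text, an Expression Language expression, or the identifier of a controller service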


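Getting FlowFiles from the upstream queue

    // Get a single FlowFile; stop if the queue is empty
    flowFile = session.get()
    if (!flowFile) return

Or get a batch of up to 100 FlowFiles:

    flowFileList = session.get(100)
    if (!flowFileList.isEmpty()) {
        flowFileList.each { flowFile ->
            // Process each FlowFile here; every one must be transferred or removed
        }
    }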

Reading data from a FlowFile

    import org.apache.commons.io.IOUtils
    import org.apache.nifi.processor.io.InputStreamCallback
    import java.nio.charset.StandardCharsets

    flowFile = session.get()
    if (!flowFile) return
    def text = ''
    // Cast a closure with an inputStream parameter to InputStreamCallback
    session.read(flowFile, { inputStream ->
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        // Do something with text here
    } as InputStreamCallback)

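Creating a new FlowFile

    // Create a brand-new, empty FlowFile
    flowFile = session.create()
    // Additional processing here

To create a child FlowFile that inherits the attributes of an incoming one:

    flowFile = session.get()
    if (!flowFile) return
    newFlowFile = session.create(flowFile)
    // Additional processing here; the parent flowFile must still be transferred or removed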

Getting attributes

    flowFile = session.get()
    if (!flowFile) return
    myAttr = flowFile.getAttribute('filename')

To iterate over all attributes:

    flowFile = session.get()
    if (!flowFile) return
    flowFile.getAttributes().each { key, value ->
        // Do something with the key/value pair
    }

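Writing FlowFile attributes

    flowFile = session.get()
    if (!flowFile) return
    // putAttribute returns an updated FlowFile reference; always keep the returned one
    flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

To set several attributes at once (values must be Strings):

    attrMap = ['myAttr1': '1', 'myAttr2': Integer.toString(2)]
    flowFile = session.get()
    if (!flowFile) return
    flowFile = session.putAllAttributes(flowFile, attrMap)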


Writing FlowFile content

    import org.apache.nifi.processor.io.OutputStreamCallback
    import java.nio.charset.StandardCharsets

    flowFile = session.get()
    if (!flowFile) return
    def text = 'Hello world!'
    // Cast a closure with an outputStream parameter to OutputStreamCallback
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(text.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)

Routing FlowFiles downstream

    flowFile = session.get()
    if (!flowFile) return
    // Processing occurs here; set errorOccurred according to its outcome
    def errorOccurred = false
    if (errorOccurred) {
        session.transfer(flowFile, REL_FAILURE)
    } else {
        session.transfer(flowFile, REL_SUCCESS)
    }

Logging

    log.info('Hello world!')
    log.info('Found these things: {} {} {}', ['Hello', 1, true] as Object[])

Reading from and writing to a FlowFile

    import org.apache.commons.io.IOUtils
    import org.apache.nifi.processor.io.StreamCallback
    import java.nio.charset.StandardCharsets

    flowFile = session.get()
    if (!flowFile) return
    // Cast a closure with inputStream and outputStream parameters to StreamCallback;
    // this one reads the content as text and writes it back reversed
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
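The closure passed to session.write is ordinary Groovy. Its logic can therefore be tried outside NiFi by feeding it in-memory streams instead of a FlowFile. The sketch below is a minimal example that assumes the closure is first stored in a variable. reverseContent and runOnString are hypothetical names and are not part of the NiFi API. The GDK method InputStream.getText(charset) stands in for IOUtils, so only Groovy and the JDK are needed.

```groovy
import java.nio.charset.StandardCharsets

// Same logic as the StreamCallback closure above: read the whole
// content as UTF-8 text and write it back reversed.
def reverseContent = { InputStream inputStream, OutputStream outputStream ->
    String text = inputStream.getText(StandardCharsets.UTF_8.name())
    outputStream.write(text.reverse().getBytes(StandardCharsets.UTF_8))
}

// Hypothetical test helper: run an (in, out) closure on in-memory streams
// and return what it wrote, decoded as UTF-8.
String runOnString(Closure callback, String content) {
    def input = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8))
    def output = new ByteArrayOutputStream()
    callback.call(input, output)
    return new String(output.toByteArray(), StandardCharsets.UTF_8)
}

println runOnString(reverseContent, 'Hello world!')   // prints !dlrow olleH
```

Inside ExecuteScript, the same variable could then be passed as session.write(flowFile, reverseContent as StreamCallback). This keeps the tested logic and the deployed script identical.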

Catching exceptions

    flowFile = session.get()
    if (!flowFile) return
    try {
        // Something that might throw an exception here

        // Last operation is transfer to success (failures are handled in the catch block)
        session.transfer(flowFile, REL_SUCCESS)
    } catch (e) {
        log.error('Something went wrong', e)
        session.transfer(flowFile, REL_FAILURE)
    }

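Using dynamically added properties

Add properties to the processor (here named myProperty1 and myProperty2). Each one is available in the script as a variable of the same name.

    flowFile = session.get()
    if (!flowFile) return
    // Value of a dynamic property that holds static text
    def myValue1 = myProperty1.value
    // Value of a dynamic property that holds an Expression Language expression,
    // evaluated against the current FlowFile's attributes
    def myValue2 = myProperty2.evaluateAttributeExpressions(flowFile).value
    // ... use the values, then transfer flowFile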

Using state management

Read the current state:

    import org.apache.nifi.components.state.Scope

    def oldMap = context.stateManager.getState(Scope.LOCAL).toMap()

Write state:

    import org.apache.nifi.components.state.Scope

    def stateManager = context.stateManager
    def stateMap = stateManager.getState(Scope.CLUSTER)
    def newMap = ['myKey1': 'myValue1']
    if (stateMap.version == -1) {
        // No state has been stored yet: set it
        stateManager.setState(newMap, Scope.CLUSTER)
    } else {
        // Replace the whole state atomically, but only if it has not changed since getState;
        // returns false otherwise (the two maps are not merged)
        stateManager.replace(stateMap, newMap, Scope.CLUSTER)
    }

Clear state:

    import org.apache.nifi.components.state.Scope

    context.stateManager.clear(Scope.LOCAL)
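Both setState and replace store the given map as the complete new state. Neither one merges it with the keys already stored. If existing keys should survive an update, one option is to build the merged map first. Below is a minimal sketch in plain Groovy. mergeState is a hypothetical helper, and the sample keys are made up for illustration.

```groovy
// Hypothetical helper: combine the previously stored state with new entries.
// NiFi state maps hold String keys and values, so everything is converted to String.
Map<String, String> mergeState(Map<String, String> oldState, Map updates) {
    // Copy first so the map read from the StateManager is left untouched
    Map<String, String> merged = new HashMap<String, String>(oldState)
    updates.each { key, value -> merged[key.toString()] = String.valueOf(value) }
    return merged
}

def oldState = ['myKey1': 'a', 'counter': '1']
def merged = mergeState(oldState, ['counter': 2, 'myKey2': 'b'])
println merged
```

In the write-state script above, the else branch could then call stateManager.replace(stateMap, mergeState(stateMap.toMap(), newMap), Scope.CLUSTER). If that returns false, another thread or node changed the state in the meantime. The script can then read the state again and retry.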

Using Controller Services

    import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
    import org.apache.nifi.distributed.cache.client.Serializer
    import org.apache.nifi.distributed.cache.client.Deserializer
    import java.nio.charset.StandardCharsets

    def stringSerializer = { value, out -> out.write(value.getBytes(StandardCharsets.UTF_8)) } as Serializer<String>
    def stringDeserializer = { bytes -> bytes == null ? null : new String(bytes, StandardCharsets.UTF_8) } as Deserializer<String>
    // clientServiceId is a dynamic property whose value is the identifier of the controller service
    def myDistClient = clientServiceId.asControllerService(DistributedMapCacheClient)
    def result = myDistClient.get('a', stringSerializer, stringDeserializer)
    log.info("Result = $result")
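The serializer and deserializer must mirror each other and use the same charset on both sides. Otherwise a value written to the cache does not come back unchanged. That pairing can be checked without a cache server. The sketch below keeps only the closure bodies and drops the Serializer/Deserializer coercion, because those interfaces come from the NiFi distributed cache jar. As a result it runs in a plain Groovy shell. serialize and deserialize are hypothetical names.

```groovy
import java.nio.charset.StandardCharsets

// Same bodies as stringSerializer / stringDeserializer above, without the NiFi interfaces
def serialize = { String value, OutputStream out ->
    out.write(value.getBytes(StandardCharsets.UTF_8))
}
def deserialize = { byte[] bytes ->
    bytes == null ? null : new String(bytes, StandardCharsets.UTF_8)
}

// Round trip: String -> UTF-8 bytes -> String ("\u00e9" is a two-byte character in UTF-8)
def buffer = new ByteArrayOutputStream()
serialize('caf\u00e9 value', buffer)
def restored = deserialize(buffer.toByteArray())
println restored
```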


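Importing third-party packages

• The Groovy script engine (at least as used in ExecuteScript) cannot import other Groovy scripts. Package them as a jar and use the jar instead.

• Set Module Directory to a comma-separated list of jar files and/or directories. For a directory, all jars in it are loaded, after which their classes can be used with ordinary import statements.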


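Hands-on example

The goal is a data transformation. Take the bytes field under b in {"a": 1,"b":{"bytes":"aaaaa"}}, Base64-encode it, and store the result back in field b. (Base64 is an encoding, not encryption.)

Source data:

    {"a": 1,"b":{"bytes":"aaaaa"}}

Target data:

    {"a":1,"b":"YWFhYWE="}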

Step 1: GenerateFlowFile, producing the source data above


Step 2: ExecuteScript, with the following script

    import groovy.json.JsonSlurper
    import groovy.json.JsonBuilder
    import org.apache.nifi.processor.io.StreamCallback

    def ff = session.get()
    if (!ff) return
    ff = session.write(ff, { rawIn, rawOut ->
        rawIn.withReader('UTF-8') { reader ->
            rawOut.withWriter('UTF-8') { writer ->
                // Parse the reader into a Map
                def json = new JsonSlurper().parse(reader)
                // Replace b with the Base64 encoding of b.bytes
                def text = json.b.bytes
                json.b = text.getBytes('UTF-8').encodeBase64().toString()
                // Write the changed object to the writer
                new JsonBuilder(json).writeTo(writer)
            }
        }
    } as StreamCallback)
    session.transfer(ff, REL_SUCCESS)

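Then check the execution result: the content of the FlowFile routed to success should match the target data.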
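The transformation itself does not depend on NiFi, so it can be checked before it is wired into ExecuteScript. Below is a minimal sketch that applies the same JsonSlurper/JsonBuilder steps to a String. transform is a hypothetical name. parseText replaces parse(reader) because the input here is a String rather than a stream.

```groovy
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

// Same steps as the ExecuteScript above, as a plain function:
// replace field "b" with the Base64 encoding of b.bytes.
String transform(String input) {
    def json = new JsonSlurper().parseText(input)
    String raw = json.b.bytes
    json.b = raw.getBytes(StandardCharsets.UTF_8).encodeBase64().toString()
    return new JsonBuilder(json).toString()
}

println transform('{"a": 1,"b":{"bytes":"aaaaa"}}')   // prints {"a":1,"b":"YWFhYWE="}
```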


Summary

Combining NiFi flows with Groovy scripts makes data processing in our flows more flexible.

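This article is reprinted from NIFI实战. If you believe it infringes your rights, email contact@modb.pro with supporting evidence; once verified, 墨天轮 (modb.pro) will remove the content immediately.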
