Nifi入门
能够安装NiFi 了解NiFi的处理器 了解NiFi的其他组件 能够使用NiFi进行简单场景的练习 了解NiFi处理器的大致类别 了解NiFi处理器的核心属性 了解NiFi模版 了解NiFi监控 了解数据来源
常用术语
运行环境准备
下载

修改默认端口和IP
sed -i "s/nifi.web.http.port=8080/nifi.web.http.port=8088/g" opt/nifi/conf/nifi.propertiessed -i "s/nifi.web.http.host=/nifi.web.http.host=192.168.42.133/g" /opt/nifi/conf/nifi.properties
启动
Windows用户
对于Windows用户,进入安装NiFi的文件夹,在此文件夹中有一个名为bin的子文件夹。进入此子文件夹,然后双击run-nifi.bat文件。
这将启动NiFi并让它在前台运行。要关闭NiFi,请选择已启动的窗口,按住Ctrl+C。
Linux、Mac用户
对于Linux和OS X用户,使用终端窗口进入安装NiFi的目录。要在前台运行NiFi,就运行bin/nifi.sh run,直到用户按下Ctrl-C,NIFI就关闭了。
要在后台运行NiFi,请运行bin/nifi.sh start。这将启动应用程序并开始运行。要检查状态并查看NiFi当前是否正在运行,请执行bin/nifi.sh status命令。可以通过执行命令bin/nifi.sh stop关闭NiFi 。
作为一个服务进行安装
目前,仅支持Linux和Mac OS X用户作为服务进行NiFi的安装。要将应用程序作为服务去安装,首先进入安装目录,然后执行命令bin/nifi.sh install,这样就以默认名称nifi安装服务了。要为服务指定自定义名称,请使用可选的第二个参数(该服务的名称)执行该命令。例如,要将NiFi作为具有dataflow名称的服务安装,请使用该命令:bin/nifi.sh install dataflow。
安装后,可以使用适当的命令启动和停止服务,例如sudo service nifi start 和sudo service nifi stop。此外,可以通过执行sudo service nifi status命令检查运行状态。
以linux版本为例, 解压并启动
## 下载目录为/opt
# 进入目录
cd /opt
# 解压文件
tar zxvf nifi-1.12.1-bin.tar.gzmv nifi-1.12.1-bin nifi # 得到目录: nifi-1.12.1, 进入bin目录并查看目录内容
cd nifi/bin && ls
# 结果: dump-nifi.bat nifi-env.bat nifi-env.sh nifi.sh run-nifi.bat status-nifi.bat
# 使用 nifi.sh 进行单机运行操作, 常用参数如下:
# ./nifi.sh --help
# Usage nifi {start|stop|run|restart|status|dump|install}
## 以下是常用命令
# 启动:
./nifi.sh start
# 关闭:
./nifi.sh stop
# 重启:
./nifi.sh restart
# 状态:
./nifi.sh status
``
执行启动命令后, 需要等待 1 - 5分钟 ( 根据电脑配置 ), 可以查看日志, 看到端口( 这里配置的是58080 )说明启动成功, 查看日志操作如下:
# 日志目录:nifi-1.12.1/logs
cd logs && tail -f nifi-app.log
# 看到如下日志说明启动成功( ip根据你电脑的ip而定, 可能不一样 )

启动成功后访问 : ip:8080/nifi 进行查看, 访问界面如下:

Nifi处理器
查看处理器


常用处理器
ExecuteScript : 执行脚本处理器, 支持: clojure, ecmascript, groovy, lua, python, ruby
QueryDatabaseTable : 数据库查询处理器, 支持: mysql
ConvertAvroToJSON : avro 数据格式转换为 json
SplitJson : 将JSON文件拆分为多个单独的FlowFiles, 用于由JsonPath表达式指定的数组元素。
EvaluateJsonPath : 根据FlowFile的内容评估一个或多个JsonPath表达式。这些表达式的结果将分配给FlowFile属性,或者写入FlowFile本身的内容,具体取决于处理器的配置。
ReplaceText : 文本组装与替换, 支持正则表达式
PutHDFS : 将FlowFile数据写入Hadoop分布式文件系统(HDFS)
PutHiveQL : 执行hive ddl/dml命令, 如: insert, update
PublishKafka_2_0 : 根据配置将消息发送到kafka topic
SelectHiveQL : 执行hive select 语句并获取结果
PutSQL : 执行SQL的insert或update命令
GetFile : 从目录中的文件创建FlowFiles。
PutFile : 将FlowFile数据写入文件
GetHDFS : 从Hadoop分布式文件系统获取文件
CaptureChangeMySQL : 从MySQL数据库中检索更改数据捕获(CDC)事件。CDC事件包括INSERT,UPDATE,DELETE操作。事件作为单个流文件输出,这些文件按操作发生的时间排序。
ExecuteStreamCommand : 一般用于执行sh脚本
配置处理器
添加一个处理器

配置处理器配置项说明

Configure(配置):此选项允许用户建立或更改处理器的配置。 Start(启动或停止):此选项允许用户启动或停止处理器; 该选项可以是Start或Stop,具体取决于处理器的当前状态。 Disable(启用或禁用):此选项允许用户启用或启用处理器; 该选项将为“启用”或“禁用”,具体取决于处理器的当前状态。 View data provenance(查看数据来源):此选项显示NiFi数据来源表,其中包含有关通过该处理器路由FlowFiles的数据来源事件的信息。 View status history(查看状态历史记录):此选项打开处理器统计信息随时间的图形表示。 View usage(查看用法):此选项将用户带到处理器的使用文档。 View connection → Upstream(查看连接→上游):此选项允许用户查看和“跳转”入处理器的上游连接。当处理器连接进出其他进程组时,这尤其有用。 View connection → Downstream(查看连接→下游):此选项允许用户查看和“跳转”到处理器外的下游连接。当处理器连接进出其他进程组时,这尤其有用。 Centere in view(视图中心):此选项将画布的视图置于给定的处理器上。 Change color(更改颜色):此选项允许用户更改处理器的颜色,这可以使大流量的可视化管理更容易。 Group (添加到组):把当前处理器添加到组 Create template(创建模板):此选项允许用户从所选处理器创建模板。 Copy(复制):此选项将所选处理器的副本放在剪贴板上,以便可以通过右键单击工作区并选择“粘贴”将其粘贴到工作区上的其他位置。复制粘贴操作也可以使用按键Ctrl-C和Ctrl-V完成。 Delete(删除):此选项允许从画布中删除处理器。
配置处理器


SETTING ( 设置 )

SCHEDULING ( 任务调度 )

PROPERTIES ( 属性 )

COMMENTS ( 注释 )


其他组件
数据流传入点(input-port)

数据流输出点(output-port)

组(process-group)

远程组(remote process-group)

聚合(funnel)

模版(template)

便签(label)

导航(Navigate)

操作区(Operate)

配置(Configuration)
根据在当前工作区选中的组件, 进行属性配置, 可配置所有组件或组启用(enable)
启用组件, 不能操作组禁用(disable)
禁用组件, 不能操作组开始(start)
启动选择的组件或组, 不选择启动所有停止(stop)
停止选择的组件或组, 不选择停止所有创建模版(create template)
根据选择的组件或组创建模版
上传已保存的模版应用场景:
1. 添加和配置第一个处理器:GetFile
1.1 添加处理器

1.2 设置处理器名称

1.3 设置Properties

| Name | Default Value | Allowable Values | Description |

mkdir -p export/tmp/source
2. 第二处理器PutFile和启动流程
2.1 添加处理器

2.2 设置处理器属性

| Name | Default Value | Allowable Values | Description |
mkdir -p export/tmp/target
2.3 连接两个处理器


2.4 新增输入文件
cd export/tmp/source
echo "hello world" > hello-world.txt


echo "hello world" > hello-world.txt
echo "hello world" > hello-world2.txt


2.5 启动putfile


2.6 覆盖写入

echo "hello world again" > hello-world.txt

2.7 关闭处理器

处理器的类别
为了创建有效的数据流处理流程,用户必须了解可用的处理器类型。NiFi包含许多不同的处理器。这些处理器提供了可从众多不同系统中提取数据,路由,转换,处理,拆分和聚合数据以及将数据分发到多个系统的功能。
几乎每个NiFi版本中可用的处理器数量都在增加。因此,我们不会尝试在这里介绍每一个可用的处理器,但我们将重点介绍一些最常用的处理器,按功能对它们进行分类。
数据转换
CompressContent:压缩或解压 ConvertCharacterSet:将用于编码内容的字符集从一个字符集转换为另一个字符集 EncryptContent:加密或解密 ReplaceText:使用正则表达式修改文本内容 TransformXml:应用XSLT转换XML内容 JoltTransformJSON:应用JOLT规范来转换JSON内容
路由和调解
ControlRate:限制流程中数据流经某部分的速率 DetectDuplicate:根据一些用户定义的标准去监视发现重复的FlowFiles。通常与HashContent一起使用 DistributeLoad:通过只将一部分数据分发给每个用户定义的关系来实现负载平衡或数据抽样 MonitorActivity:当用户定义的时间段过去而没有任何数据流经此节点时发送通知。(可选)在数据流恢复时发送通知。 RouteOnAttribute:根据FlowFile包含的属性路由FlowFile。 ScanAttribute:扫描FlowFile上用户定义的属性集,检查是否有任何属性与用户定义的字典匹配。 RouteOnContent:根据FlowFile的内容是否与用户自定义的正则表达式匹配。如果匹配,则FlowFile将路由到已配置的关系。 ScanContent:在流文件的内容中搜索用户定义字典中存在的术语,并根据这些术语的存在或不存在来路由。字典可以由文本条目或二进制条目组成。 ValidateXml:以XML模式验证XML内容; 根据用户定义的XML Schema,判断FlowFile的内容是否有效,进而来路由FlowFile。1
数据库访问
ConvertJSONToSQL:将JSON文档转换为SQL INSERT或UPDATE命令,然后可以将其传递给PutSQL Processor ExecuteSQL:执行用户定义的SQL SELECT命令,结果为Avro格式的FlowFile PutSQL:通过执行FlowFile内容定义的SQL DDM语句来更新数据库 SelectHiveQL:对Apache Hive数据库执行用户定义的HiveQL SELECT命令,结果为Avro或CSV格式的FlowFile PutHiveQL:通过执行FlowFile内容定义的HiveQL DDM语句来更新Hive数据库
属性提取
EvaluateJsonPath:用户提供JSONPath表达式(类似于XPath,用于XML解析/提取),然后根据JSON内容评估这些表达式,用结果值替换FlowFile内容或将结果值提取到用户自己命名的Attribute中。 EvaluateXPath:用户提供XPath表达式,然后根据XML内容评估这些表达式,用结果值替换FlowFile内容或将结果值提取到用户自己命名的Attribute中。 EvaluateXQuery:用户提供XQuery查询,然后根据XML内容评估此查询,用结果值替换FlowFile内容或将结果值提取到用户自己命名的Attribute中。 ExtractText:用户提供一个或多个正则表达式,然后根据FlowFile的文本内容对其进行评估,然后将结果值提取到用户自己命名的Attribute中。 HashAttribute:对用户定义的现有属性列表的串联进行hash。 HashContent:对FlowFile的内容进行hash,并将得到的hash值添加到Attribute中。 IdentifyMimeType:评估FlowFile的内容,以确定FlowFile封装的文件类型。此处理器能够检测许多不同的MIME类型,例如图像,文字处理器文档,文本和压缩格式,仅举几例。 UpdateAttribute:向FlowFile添加或更新任意数量的用户定义的属性。这对于添加静态的属性值以及使用表达式语言动态计算出来的属性值非常有用。该处理器还提供"高级用户界面(Advanced User Interface)",允许用户根据用户提供的规则有条件地去更新属性。
系统交互
ExecuteProcess:运行用户自定义的操作系统命令。进程的StdOut被重定向,以便StdOut的内容输出为FlowFile的内容。此处理器是源处理器(不接受数据流输入,没有上游组件) - 其输出预计会生成新的FlowFile,并且系统调用不会接收任何输入。如果要为进程提供输入,请使用ExecuteStreamCommand Processor。 ExecuteStreamCommand:运行用户定义的操作系统命令。FlowFile的内容可选地流式传输到进程的StdIn。StdOut的内容输出为FlowFile的内容。此处理器不能用作源处理器 - 必须传入FlowFiles才能执行。
数据提取
GetFile:将文件内容从本地磁盘(或网络连接的磁盘)流式传输到NiFi,然后删除原始文件。此处理器应将文件从一个位置移动到另一个位置,而不是用于复制数据。 GetFTP:通过FTP将远程文件的内容下载到NiFi中,然后删除原始文件。此处理器应将文件从一个位置移动到另一个位置,而不是用于复制数据。 GetSFTP:通过SFTP将远程文件的内容下载到NiFi中,然后删除原始文件。此处理器应将文件从一个位置移动到另一个位置,而不是用于复制数据。 GetJMSQueue:从JMS队列下载消息,并根据JMS消息的内容创建FlowFile。可选地,JMS属性也可以作为属性复制。 GetJMSTopic:从JMS主题下载消息,并根据JMS消息的内容创建FlowFile。可选地,JMS属性也可以作为属性复制。此处理器支持持久订阅和非持久订阅。 GetHTTP:将基于HTTP或HTTPS的远程URL的请求内容下载到NiFi中。处理器将记住ETag和Last-Modified Date,以确保不会持续摄取数据。 ListenHTTP:启动HTTP(或HTTPS)服务器并侦听传入连接。对于任何传入的POST请求,请求的内容将作为FlowFile写出,并返回200响应。 ListenUDP:侦听传入的UDP数据包,并为每个数据包或每个数据包创建一个FlowFile(取决于配置),并将FlowFile发送到"success"。 GetHDFS:监视HDFS中用户指定的目录。每当新文件进入HDFS时,它将被复制到NiFi并从HDFS中删除。此处理器应将文件从一个位置移动到另一个位置,而不是用于复制数据。如果在集群中运行,此处理器需仅在主节点上运行。要从HDFS复制数据并使其保持原状,或者从群集中的多个节点流式传输数据,请参阅ListHDFS处理器。 ListHDFS / FetchHDFS:ListHDFS监视HDFS中用户指定的目录,并发出一个FlowFile,其中包含它遇到的每个文件的文件名。然后,它通过分布式缓存在整个NiFi集群中保持此状态。然后可以在集群中,将其发送到FetchHDFS处理器,后者获取这些文件的实际内容并发出包含从HDFS获取的内容的FlowFiles。 GetKafka:从Apache Kafka获取消息,特别是0.8.x版本。消息可以作为每个消息的FlowFile发出,也可以使用用户指定的分隔符一起进行批处理。 GetMongo:对MongoDB执行用户指定的查询,并将内容写入新的FlowFile。
数据出口/发送数据
PutEmail:向配置的收件人发送电子邮件。FlowFile的内容可选择作为附件发送。 PutFile:将FlowFile的内容写入本地(或网络连接)文件系统上的目录。 PutFTP:将FlowFile的内容复制到远程FTP服务器。 PutSFTP:将FlowFile的内容复制到远程SFTP服务器。 PutJMS:将FlowFile的内容作为JMS消息发送到JMS代理,可选择将Attributes添加JMS属性。 PutSQL:将FlowFile的内容作为SQL DDL语句(INSERT,UPDATE或DELETE)执行。FlowFile的内容必须是有效的SQL语句。属性可以用作参数,FlowFile的内容可以是参数化的SQL语句,以避免SQL注入攻击。 PutKafka:将FlowFile的内容作为消息发送到Apache Kafka,特别是0.8.x版本。FlowFile可以作为单个消息或分隔符发送,例如可以指定换行符,以便为单个FlowFile发送许多消息。 PutMongo:将FlowFile的内容作为INSERT或UPDATE发送到Mongo。
分裂和聚合
SplitText:SplitText接收单个FlowFile,其内容为文本,并根据配置的行数将其拆分为1个或多个FlowFiles。例如,可以将处理器配置为将FlowFile拆分为多个FlowFile,每个FlowFile只有一行。 SplitJson:允许用户将包含数组或许多子对象的JSON对象拆分为每个JSON元素的FlowFile。 SplitXml:允许用户将XML消息拆分为多个FlowFiles,每个FlowFiles包含原始段。这通常在多个XML元素与"wrapper"元素连接在一起时使用。然后,此处理器允许将这些元素拆分为单独的XML元素。 UnpackContent:解压缩不同类型的存档格式,例如ZIP和TAR。然后,归档中的每个文件都作为单个FlowFile传输。 SegmentContent:根据某些已配置的数据大小将FlowFile划分为可能的许多较小的FlowFile。不对任何类型的分界符执行拆分,而是仅基于字节偏移执行拆分。这是在传输FlowFiles之前使用的,以便通过并行发送许多不同的部分来提供更低的延迟。而另一方面,MergeContent处理器可以使用碎片整理模式重新组装这些FlowFiles。 MergeContent:此处理器负责将许多FlowFiles合并到一个FlowFile中。可以通过将其内容与可选的页眉,页脚和分界符连接在一起,或者通过指定存档格式(如ZIP或TAR)来合并FlowFiles。FlowFiles可以根据公共属性进行分箱(binned),或者如果这些流是被其他组件拆分的,则可以进行"碎片整理(defragmented)"。根据元素的数量或FlowFiles内容的总大小(每个bin的最小和最大大小是用户指定的)并且还可以配置可选的Timeout属性,即FlowFiles等待其bin变为配置的上限值最大时间。 SplitContent:将单个FlowFile拆分为可能的许多FlowFile,类似于SegmentContent。但是,对于SplitContent,不会在任意字节边界上执行拆分,而是指定要拆分内容的字节序列。
HTTP
GetHTTP:将基于HTTP或HTTPS的远程URL的内容下载到NiFi中。处理器将记住ETag和Last-Modified Date,以确保不会持续摄取数据。 ListenHTTP:启动HTTP(或HTTPS)服务器并侦听传入连接。对于任何传入的POST请求,请求的内容将作为FlowFile写出,并返回200响应。 InvokeHTTP:执行用户配置的HTTP请求。此处理器比GetHTTP和PostHTTP更通用,但需要更多配置。此处理器不能用作源处理器,并且需要具有传入的FlowFiles才能被触发以执行其任务。 PostHTTP:执行HTTP POST请求,将FlowFile的内容作为消息正文发送。这通常与ListenHTTP结合使用,以便在无法使用s2s的情况下在两个不同的NiFi实例之间传输数据(例如,节点无法直接访问并且能够通过HTTP进行通信时代理)。注意:除了现有的RAW套接字传输之外,HTTP还可用作s2s传输协议。它还支持HTTP代理。建议使用HTTP s2s,因为它更具可扩展性,并且可以使用具有更好用户身份验证和授权的输入/输出端口的方式来提供双向数据传输。 HandleHttpRequest / HandleHttpResponse:HandleHttpRequest Processor是一个源处理器,与ListenHTTP类似,启动嵌入式HTTP(S)服务器。但是,它不会向客户端发送响应(比如200响应)。相反,流文件是以HTTP请求的主体作为其内容发送的,所有典型servlet参数、头文件等的属性作为属性。然后,HandleHttpResponse能够在FlowFile完成处理后将响应发送回客户端。这些处理器总是彼此结合使用,并允许用户在NiFi中可视化地创建Web服务。这对于将前端添加到非基于Web的协议或围绕已经由NiFi执行的某些功能(例如数据格式转换)添加简单的Web服务特别有用。
使用属性
首先,它允许用户在流中做出路由决策,以便满足某些条件的FlowFiles可以与其他FlowFiles进行不同地处理。这可以由RouteOnAttribute和其他类似的处理器完成的。 其次,利用属性配置处理器:处理器的配置依赖于数据本身。例如,PutFile能够使用Attributes来知道每个FlowFile的存储位置,而每个FlowFile的目录和文件名属性可能不同(结合表达式语言,比如每个流都有filename属性,组件中就可以这样配置文件名:${filename},就可以获取到当前FlowFIle中filename的属性值)。 最后,属性提供了有关数据的极有价值的上下文。在查看FlowFile的Provenance数据时非常有用,它允许用户搜索符合特定条件的Provenance数据,并且还允许用户在检查Provenance事件的详细信息时查看此上下文。通过简单地浏览该上下文,用户能够知道为什么以这样或那样的方式处理数据。
共同属性
filename:可用于将数据存储到本地或远程文件系统的文件名。 path:可用于将数据存储到本地或远程文件系统的目录的名称。 uuid:一个通用唯一标识符,用于区分FlowFile与系统中的其他FlowFiles。 entryDate:FlowFile进入系统的日期和时间(即已创建)。此属性的值是一个数字,表示自1970年1月1日午夜(UTC)以来的毫秒数。 lineageStartDate:任何时候克隆,合并或拆分FlowFile,都会导致创建子FlowFile。该值表示当前FlowFile最早的祖先进入系统的日期和时间。该值是一个数字,表示自1970年1月1日午夜(UTC)以来的毫秒数。 fileSize:此属性表示FlowFile内容占用的字节数。
提取属性
添加用户自定义的属性
属性路由
表达式语言/在Property值中使用attribute
表达式语言中的自定义属性
使用模板
选择要包含在模板中的组件。我们可以通过单击第一个组件,然后按住Shift键同时选择其他组件(以包括这些组件之间的连接),或者在画布上拖动所需组件周围的框时按住Shift键选择多个组件。 从操作面板中选择
图标。提供模板的名称和描述。 单击Create按钮。
从组件工具栏拖动到我们的画布上。然后,我们可以选择要添加到画布的模板,然后单击Add按钮。任何标记为敏感属性的属性(例如在处理器中配置的密码)都不会添加到模板中。每次将模板添加到画布时,都必须填充这些敏感属性。 如果模板中包含的组件引用Controller Service,则Controller Service也将添加到模板中。这意味着每次将模板添加到图表时,它都会创建Controller Service的副本。
监控NiFi
状态栏
组件统计
公告
数据来源

事件详情



谱系图

常见问题
Q1 是不是组件每种连接关系(suceess和failure等)都要有所对应?

Q2 组件已经正常运行了,右上角怎么还在报错?
Q3 为啥用Select组件查询出来的数据都是重复的?
每隔一段时间执行一次 特定时间执行一次 上游有数据就立刻执行




