暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Notify和Wait

NIFI实战 2020-12-09
520

个人解读

这两个处理器使用,首先需要一个分布式缓存服务可以是java的,也可以是redis也可以是hbase的。通过该处理器的Flowfile会根据配置的缓存信号唯一标识,新建缓存信号对象,这个缓存信号对象可以存属性也可以指定名称。当Flowfile流过与notify对应的wait处理器,wait会是否计次,如果全部释放了,Flowfile才能流向wait的success,否则流向wati,超时流向expired。

Notify用到的属性:

Distributed Cache Service关联分布式服务;

Release Signal Identifier用于创建信号通知对象的唯一标识;

Signal Counter Name信号计数器名

Signal Counter Delta计次步长;

Signal Buffer Count处理器的拉取一次拉取的Flowfile条数;

Attribute Cache Regex上面说到的信号通知对象可以缓存属性,这里是用于缓存的属性正则;

wait用到的属性:

Release Signal Identifier用于从缓存获取信号通知对象的唯一标识;

Target Signal Count确定目标信号计数。这个处理器检查信号计数是否已经达到这个数字。如果指定了信号计数器名称,该处理器将检查一个特定的计数器,否则将检查信号中的总计数。

Signal Counter Name确定信号计数器名。如果未指定,该处理器检查信号中的总计数。

Wait Buffer Count指定可以缓冲的传入流文件的最大数量,以检查它是否可以向前移动。越多的缓冲区可以提供更好的性能,因为它通过根据信号标识符对流文件进行分组来减少与缓存服务的交互次数。在处理器执行时,只有信号标识符可以被处理。

Releasable FlowFile Count确定可发布的FlowFile条数。它指定当目标计数达到目标信号计数时可以释放多少个流文件。0(0)有特殊的含义,只要信号计数与目标匹配,可以释放任意数量的流文件。

Expiration Duration等待的流文件将被路由到“expired”关系的持续时间。

Distributed Cache Service分布式缓存服务。

Attribute Copy Mode指定复制属性策略Replace if present/Keep original,覆盖/不覆盖

Wait Mode等待模式Transfer to wait relationship/Keep in the upstream connection,路由到等待/留在上游的队列

Wait Penalty Duration如果配置后,信号标识符经过处理但不符合发布标准,则对该信号标识符进行处罚,具有该信号标识符的流文件在指定的一段时间内不进行再次处理,以使该信号标识符不会阻碍其他文件的处理。这对于等待处理器需要处理多个信号标识符,而每个信号标识符有多个流文件的情况非常有用,而且在信号标识符中释放流文件的顺序非常重要。可以用优先级来配置流文件顺序。重要提示:可以处理的排队信号数量有限制,等待处理器可能不能检查所有排队信号的id。


notify逻辑流程图

总结

notify和wait一般是搭配出现,notify生成信号统计记录,wait释放信号,两者通过分布式缓存服务传递数据。notify会根据唯一标示,给flowfile生成信号类,信号类记录flowfile的唯一标识,唯一标识名称,以及一些属性,以及同样标识的期望flowfile个数。wait会根据flowfile的唯一标识取出信号对象,并且统计释放信号,如果没有达到释放条件,会根据配置把flowfile路由到wait或者保存在上游的队列中,并且做一些调度惩罚。当达到释放条件,相关标识flowfile会路由到success。

感觉应用场景不是很广泛,也不敢用,设计虽然比较精巧,但是谁能保证数据100%不出异常,到达wait呢。是否会形成死的flowfile,虽然有过期时间,但是如果大量阻塞,过期时间内数据可能对wait出现频繁调度,cpu妥妥的上升,个人不会去使用这两个处理器,使用这两个处理器的小伙伴一定要分析好业务流程,取舍得当。


文章转载自NIFI实战,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论