官网介绍
Creates FlowFiles from files in a directory. NiFi will ignore files it doesn't have at least read permissions for.
个人解读
1.扫描指定路径,通过各种配置过滤扫描的文件,可以把文件内容写入flowfile的contennt
2.原生代码不适合扫描大批量文件的文件夹,容易oom
3.可以递归扫描目录,也可以选择保留文件或者读完就删除文件
4.通过Batch Size和Polling Interval,以及锁机制,支持高并发以及多线程操作
配置详情
| Input Directory | 扫描路径 |
| File Filter | 文件过滤器,只有被该正则命中的文件才会被读取,默认正则表达式 [^\.].*,表示扫描路径下的所有文件 |
| Path Filter | 当Recurse SubdirectoriesFile为true的时候,该配置指明子路径正则,子路径只有匹配该正则,才会被读取 |
| Batch Size | 缓冲队列的大小 |
| Keep Source File | ture/false true:扫描后删除文件 false:扫描后不删除文件 |
| Recurse Subdirectories | ture/false true:扫描子目录 false:不扫描子目录 |
| Polling Interval | 扫描间隔 |
| Ignore Hidden Files | ture/false true:忽略隐藏文件 false:不忽略 |
| Minimum File Age | 读取的文件,文件的最小产生时间 |
| Maximum File Age | 读取的文件,文件的最大产生时间 |
| Minimum File Size | 读取的文件,文件的最小大小 |
| Maximum File Size | 读取的文件,文件的最大大小 |
Relationships
| success | 所有扫描的文件都会路由到success |
Writes Attributes
| filename | 读取文件的名称 |
| path | 相对扫描路径的相对路径 例如:扫描出来的文件是/tmp/abc/1/2/3 扫描路径是/tmp 则该值将会是abc/1/2/3 |
| file.creationTime | 文件创建时间(不一定生效) |
| file.lastModifiedTime | 文件上一次被修改的时间(不一定生效) |
| file.lastAccessTime | 文件上一次被访问的时间(不一定生效) |
| file.owner | 文件owner |
| file.group | 文件group |
| file.permissions | 文件read/write/execute权限(不一定生效) |
| absolute.path | 被路由文件的绝对路径 |
Restricted
| read filesystem | 读权限 |
| write filesystem | 写权限 |
配置过滤相关源码
return new FileFilter() {@Overridepublic boolean accept(final File file) {对应配置Minimum File Sizeif (minSize > file.length()) {return false;}/ 对应配置Maximum File Sizeif (maxSize != null && maxSize < file.length()) {return false;}final long fileAge = System.currentTimeMillis() - file.lastModified();// 对应Minimum File Ageif (minAge > fileAge) {return false;}// 对应Maximum File Ageif (maxAge != null && maxAge < fileAge) {return false;}// 对应Ignore Hidden Filesif (ignoreHidden && file.isHidden()) {return false;}// 对应Path Filterif (pathPattern != null) {Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();if (reldir != null && !reldir.toString().isEmpty()) {if (!pathPattern.matcher(reldir.toString()).matches()) {return false;}}}// 对应需要读权限if (!Files.isReadable(file.toPath())) {return false;}// 如果读完删除需要写权限if (keepOriginal == false && !Files.isWritable(file.toPath().getParent())) {return false;}// 对应File Filterreturn filePattern.matcher(file.getName()).matches();}};
不适合大批量的原因

File[] children = directory.listFiles();
如何修改源码适合大批量
DirectoryStream<Path> stream = Files.newDirectoryStream(p)
备注:通过流式扫描文件,batchsize满了就退出
总结
一个高效扫描读取文件的处理器。两点要记住
1.不适合处理有大量文件的目录
2.会把文件内容读出,然后根据配置可能删除,如果选择不删除会无穷无尽重复读取数据。
文章转载自NIFI实战,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




