官网介绍
Converts records from one data format to another using configured Record Reader and Record Write Controller Services.
个人解读
一个用于转换Flwofile的content的处理器。准确来讲这个处理器就是一个壳子,通过动态配置reader单元和writer单元来实现其强大的数据格式转换功能。支持的reader和writer详细见附录
配置详情
| Record Reader | 配置继承于RecordReaderFactory的controller service用于读取Flwofile的content数据 GrokReader AvroReader见附录一SyslogReader ScriptedReader Syslog5424Reader XMLReader ParquetReader JsonTreeReader JsonPathReader CSVReader |
| Record Writer | 配置继承于RecordSetWriterFactory的controller service用于写出Flwofile的content数据 AvroRecordSetWriter见附录二 ScriptedRecordSetWriterCSVRecordSetWriter FreeFormTextRecordSetWriter JsonRecordSetWriter ParquetRecordSetWriter XMLRecordSetWriter |
| Include Zero Record FlowFiles |
指出没有通过reader单元如果拿到数据条数为空的处理策略。false空结果不路由,直接丢弃; true不管结果如果,任然路由到success |
路由关系
| success | FlowFiles that are successfully transformed will be routed to this relationship |
| failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship。 解析异常,转换异常路由到failure |
写出属性
| mime.type | 由writer单元配置 |
| record.count | 读取数据条数 |

实战:一个json数据写成avro数据再写成json的例子
自定义schema
{"type":"record","name":"achievement_source","namespace":"any.data1","fields":[{"name":"id","type":["null","string"]},{"name":"idcard","type":["null","string"]}]}
json数据
{"id":"1","idcard":"253618648491312","name":"Crystal Lee","chinese":"85","math":"81","english":"107","multiple":"256","total":"529","creat_time":"2020-11-04 23:01:01.0","update_time":null}

AvroRecordSetWriter配置

AvroSchemaRegistry

AvroReader
模板自取
链接:https://pan.baidu.com/s/1VgS9zKqn99fgqCslK4IGaQ
提取码:pp11
附录一:AvroReader
数据格式支持:1 schema内置 2 正常的纯avro数据
schema来源:1.数据内置 2.Schema Name模式下从Schema Registry中通过Schema Name提取schema
3.Schema Tex模式下通过Schema Text配置提取schema
| Schema Access Strategy |
指明获取schema的策略,默认从数据内置获取 |
| Schema Registry | schema的注册集 |
| Schema Name | Schema Name' Property模式下从Schema Registry提取schema的配置名称 |
| Schema Version | Schema Name' Property模式下从Schema Registry提取schema参数之一,和Schema Branch不能都填写(reader模块无效) |
| Schema Branch | Schema Name' Property模式下从Schema Registry提取schema参数之一,和Schema Version不能都填写(reader模块无效) |
| Schema Text | Use 'Schema Text' Property 读取schema |
| Cache Size | 为了提高效率,schema不是每次都去编译的。这里指明缓存大小,缓存使用过的是LoadingCache,key是schema内容,value是编译后的schema对象 |
源码关于获取schema的一个对象设计

对应着Schema Name,Schema Version,Schema Branch
附录二:AvroRecordSetWriter
写avro数据的controller service
| Schema Write Strategy |
指定数据如何添加schema |
| Schema Cache | 指定要向其中添加记录模式的模式缓存,以便记录读取器可以快速查找该模式。 |
| Schema Access Strategy |
指明获取schema的策略,默认从数据内置获取 |
| Schema Registry | schema的注册集 |
| Schema Name | Schema Name' Property模式下从Schema Registry提取schema的配置名称 |
| Schema Version | Schema Name' Property模式下从Schema Registry提取schema参数之一,和Schema Branch不能都填写 |
| Schema Branch | Schema Name' Property模式下从Schema Registry提取schema参数之一,和Schema Version不能都填写 |
| Schema Text | Use 'Schema Text' Property 读取schema |
| Compression Format | 数据压缩方式 |
| Cache Size | 为了提高效率,schema不是每次都去编译的。这里指明缓存大小,缓存使用过的是LoadingCache,key是schema内容,value是编译后的schema对象 |
| Encoder Pool Size | avro-writer需要使用一个编码器。编码器的创建是昂贵的,但一旦创建,它们就可以被重用。此属性控制可合用和重用的编码器的最大数量。将该值设置得太小可能会导致性能下降,但将其设置得更高可能会使用更多堆。如果Avro写入器配置了“Embed Avro Schema”的模式写入策略,则该属性将被忽略。 |




