
all-publish-processor

all-consume-processor
kafak是什么?
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
基本术语
| Broker | 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic; |
| Producer | 消息生产者,向kafka broker发消息的客户端; |
| Consumer | 消息消费者,向kafka broker取消息的客户端; |
| Topic | 可以理解为一个队列; |
| Consumer Group | 1是kafka把消息发给所有的consumer或者发给某一个consumer的手段 2一个topic可以有多个CG 3 topic的消息会复制一样的,无差别的把数据发送给所有的CG,但每个partion只会把消息发给该CG中的一个consumer 4如果需要实现广播,只要每个consumer有一个独立的CG 5 要实现单播只要所有的consumer在同一个CG 6用CG还可以将consumer进行自由的分组,而不需要多次发送消息到不同的topic |
| Partition | 为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序; |
| Offset | kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。 |
NIFI和kafka相关的处理器有哪些?

Producer:PublishKafka、PublishKafka_0_10、PublishKafka_0_11、PublishKafka_1_0、PublishKafka_2_0、PublishKafkaRecord_0_10、PublishKafkaRecord_0_11、PublishKafkaRecord_1_0、PublishKafkaRecord_2_0
Consumer:ConsumeKafka、ConsumeKafka_0_10、ConsumeKafka_0_11、ConsumeKafka_1_0、ConsumeKafka_2_0、ConsumeKafkaRecord_0_10、ConsumeKafkaRecord_0_11、ConsumeKafkaRecord_1_0、ConsumeKafkaRecord_2_0
为什么会有这么多版本的处理器?
每一套处理器都对应着kafka一个大版本的迭代。kafka目前总共演进了7个大版本,分别是0.7、0.8、0.9、0.10、0.11、1.0和2.0。
其中PublishKafka和ConsumeKafka就对应着0.9版本,其他处理器和版本的对应关系根据处理器的后缀就可以看出;
Producer相关处理器是因为Producer API的版本变更;
Consumer相关处理器是因为Consumer API的版本变更;
0.9版本:
2015年11月,社区正式发布了0.9.0.0版本,这是一个重量级的大版本更迭,0.9大版本增加了基础的安全认证/权限功能,同时使用java重写了新版本消费者的api,另外还引入了kafka connect组件用于实现高性能的数据抽取。这个版本另一个特点,那就是新版本的producer api在这个版本中算比较稳定。总体来说就是此时的kafka版本迭代较快,Producer和Consumer不管是0.9.0.0版本还是老版本都是bug多多。所以我们用nifi相关处理器也尽量用2.0的吧;
0.10版本:
0.10.0.0是里程碑式的大版本,因为该版本引入了kafka streams。从这个版本起,kafka正式升级成为分布式流处理平台,虽然此时的kafka streams还不能上线部署使用。0.10大版本包含两个包含两个小版本:0.10.1和0.10.2,它们的主要功能变更都是在kafka streams组件上。如果把kafka作为消息引擎,实际上该版本并没有太多的功能提升。0.10.2.2版本起,新版本consumer api算是比较稳定了。如果你依然在使用0.10大版本,强烈建议你至少升级到0.10.2.2然后再使用新版本的consumer api。还有个事情不得不提,0.10.2.2修复了一个可能导致producer性能降低的bug。基于性能的缘故你也应该升级到0.10.2.2。
0.11版本:
在2017年6月,社区发布了0.11.0.0版本,引入了两个重量级的功能变更:一个是提供幂等性producer api;另一个是对kafka消息格式做了重构。
1.0和2.0版本:
因为在我看来这两个大版本主要还是kafka streams的各种改进,在消息引擎方面并未引入太多的重大功能特性。kafka streams的确在这两个版本有着非常大的变化,也必须承认kafka streams目前依然还在积极地发展着。如果你是kafka streams的用户,只要选择2.0.0版本吧。
总结:NIFI对于老版本的kafka保留兼容态度,新版本的kafka只要发了并且Producer和Consumer的api,不管是有性能还是设计上的提升,还是为了修复bug还是兼容新的版本,nifi都会出一个新版本kafka兼容的处理器,保证了nifi对新版本kafka版本的支持;
两类处理器配置解读
Producer
| Kafka Brokers | kafka服务器,可以填写多个服务器,用都好隔开; 格式: <host>:<port>,<host>:<port> |
| Security Protocol | 安全协议,客户端和kafka服务器交互使用的通信协议; 由kafka的security.protocol配置决定; |
| SASL Mechanism | 用于身份验证的SASL机制 由kafka的sasl.mechanism配置决定; kfaka支持的认证方式 GSSAPI (Kerberos) PLAIN SCRAM-SHA-256 SCRAM-SHA-512 具体那天kafka开始做授权认证了,再详细研究研究 |
| Kerberos Service Name | 认证服务名称 除非选择<Security Protocol>的SASL选项之一,否则将忽略该属性。 |
| Kerberos Credentials Service | Kerberos登陆认证服务 |
| Kerberos Principal | The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property. |
| Kerberos Keytab | Kerberos密钥 |
| Username | The username when the SASL Mechanism is PLAIN or SCRAM-SHA-256 |
| Password | The password for the given username when the SASL Mechanism is PLAIN or SCRAM-SHA-256 |
| Token Auth | true/false 当SASL机制为SCRAM-SHA-256时,此属性指示是否应使用令牌认证。 |
| SSL Context Service | 指定用于与Kafka通信的SSL上下文服务 |
| Topic Name | 发布到的Kafka主题的名称 |
| Delivery Guarantee | 指定保证将消息发送到Kafka的要求。对应于Kafka的“ acks”属性。 acks:0 -Best Effort 把内容写入kafka节点,无需等待,立刻返回发送下一批数据。 性能最好,但是会丢数据 acks:1-Guarantee Single Node Delivery 如果单个kafka单个节点收到消息,无论kafka是否复制该消息都认为成功。因为还没有确保备份,如果该节点崩溃又没备份,有丢失数据可能。 性能较好,但是如果kafka节点崩溃,可能丢失数据 acks:-1-Guarantee Replicated Delivery 除非主题将消息复制到适当数量的kafka节点,才算完整成功 牺牲了性能,确保了不丢失 |
| Use Transactions | 如果将数据发送到Kafka时出现问题,并且此属性设置为false,则已经发送到Kafka的消息将继续发送并传递给使用者。 如果将其设置为true,则Kafka事务将回滚,以使这些消息对消费者不可用。 将此设置为true要求将<Delivery Guarantee>属性设置为“ Guarantee Replicated Delivery” |
| Transactional Id Prefix | Use Transactions为true时,KafkaProducer配置'transactional.id'将是生成的UUID,并以该字符串为前缀。 |
| Attributes to Send as Headers (Regex) | flowfile正则匹配,当匹配到了属性,就会把属性写入kafka的message的header |
| Message Header Encoding | kafka的message header的编码 |
| Kafka Key | key属性被声明的话,一个partition会通过key的hash而被选中。如果既没有key也没有partition属性数值被声明,那么一个partition将会被分配以轮询的方式。 |
| Key Attribute Encoding | FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded. 会给flowfile加一个属性kafka.key,这个属性声明属性值如何编码 |
| Message Demarcator | 如果未指定,则FlowFile的全部内容将用作一条消息。如果指定,则FlowFile的内容将会分割,并且每个部分作为单独的Kafka消息发送。(不能乱用) |
| Max Request Size | 请求的最大大小(以字节为单位)。对应于Kafka的'max.request.size'属性,默认值为1 MB(1048576)。 |
| Acknowledgment Wait Time | 在向Kafka发送消息后,这表明我们愿意等待Kafka做出回应的时间。如果Kafka在此时间段内未确认该消息,则FlowFile将被路由为“失败”。 |
| Max Metadata Wait Time | 在整个“发送”调用失败之前,发布者将在“发送”调用期间等待获取元数据或等待缓冲区刷新的时间。对应于Kafka的'max.block.ms'属性 |
| Partitioner class | 指定用于计算消息的分区标识的类。对应于Kafka的'partitioner.class'属性。 |
| Partition | 指定要去的分区记录 |
| Compression Type | This parameter allows you to specify the compression codec for all data generated by this producer. 数据压缩类型 |
动态属性
| The name of a Kafka configuration property. | 加载任何提供的配置属性后,这些属性将添加到Kafka配置上。 |
Consumer
| Kafka Brokers | kafka服务器,可以填写多个服务器,用都好隔开; 格式: <host>:<port>,<host>:<port> |
| Security Protocol | 安全协议,客户端和kafka服务器交互使用的通信协议; 由kafka的security.protocol配置决定; |
| Kerberos Service Name | 认证服务名称 除非选择<Security Protocol>的SASL选项之一,否则将忽略该属性。 |
| Kerberos Credentials Service | Kerberos登陆认证服务 |
| Kerberos Principal | The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property. |
| Kerberos Keytab | Kerberos密钥 |
| SSL Context Service | 指定用于与Kafka通信的SSL上下文服务 |
| Topic Name(s) | Kafka主题的名称。是多个用逗号分隔 |
| Topic Name Format | names/pattern 制定topic Names是名称还是正则 |
| Group ID | 消费组 |
| Offset Reset | none/latest/earliest latest:获取最新,默认 earliest:从头开始 |
| Key Attribute Encoding | UTF-8/Hex FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded. |
| Message Demarcator | 如果未指定,则FlowFile的全部内容将用作一条消息。如果指定,则FlowFile的内容将会分割,并且每个部分作为单独的Kafka消息发送。(不能乱用) |
| Max Poll Records | 指定Kafka在一次轮询中应返回的最大记录数。 |
| Max Uncommitted Time | 指定在必须提交偏移量之前允许通过的最长时间。 |
动态属性
| The name of a Kafka configuration property. | 加载任何提供的配置属性后,这些属性将添加到Kafka配置上。 |
备注:这里不再单独介绍使用Record相关处理器了。举一个例子:PublishKafka_0_10和PublishKafkaRecord_0_10区别是什么?
PublishKafkaRecord_0_10比PublishKafka_0_10多了一些数据的reader和writer使得PublishKafkaRecord_0_10可以处理丰富的数据结构;
模版分享
链接: https://pan.baidu.com/s/1y9IXJGabM06EG985AnLgOw 密码: bdnb
附录一:kafka producer连接数不够用

# The number of threads that the server uses for processing requests, which may include disk I/O numnum.io.threads=8




