注:尽力阐述了文中的原意,但因能力有限,难免有纰漏的地方。 --刘彬
在之前的博客中,我们为Kafka引⼊入了Exactly Once 语义,那篇文章涵盖了所有的消息传递语义,介绍了Kafka Stream 幂等的Producer,事务和Exactly once语义,现在我们接着上篇内容深入讲解
Kafka的事务处理,本⽂的目标是使读者熟悉Kafka 的事务API。
接下来,我们将讨论项目中主要使用的事务API、Kafka的事务语义、Java客户端事务API的详细信息、实现方式,最后,说一下在使用API时的注意事项。
本⽂文并不会涉及到事务的具体实现细节,我们将链接到JavaDocs设计文档,⽅便想进一步深入学习的读者。阅读本文希望您已经了解了Kafka的基本概念,比如topics,partitions,log offsets以及brokers和clients在Kafka应⽤用程序中的角色,并且熟悉Java的kafka客户端也会有所帮助。
为什么要有事务?
我们设计Kafka中的事务主要⽤用于“read-process-write”模式,其中读和写都来自于异步数据流操作,比如Kafka topics,这样的程序通常称之为流处理程序。
第一代的流处理程序可以允许处理的不准确 比如,应用程序不断的consumed web界面的impressions并且生成每个页面的错误数量并进行聚合操作。然而,随着这些应⽤程序的流⾏,对流处理程序的要求越来越严格,比如,一些金融机构⽤用流处理程序来处理⽤户账号的借贷情况,在这种情况下,处理是不允许有错误的,我们需要保证每一个消息的处理都不会出现意外。
更正规的讲,如果一个流处理程序输⼊了消息A 并且输出了B,那么B = F(A),只有当A被成功消费了,才认为B生产成功了,反之也是如此。使⽤用kafka 生产者和消费者传递at-least-once语义,流处理程序可能丢失 exactly once 处理的语义⽅方法:
1.producers.send()由于内部重试,可能导致消息B重复写入,这个是生产者幂等性决定的,并⾮非本⽂的重点。
2.我们可以重新处理消息A,使的消息B进⾏输出,违反了exactly once 语义,如果流处理程序在写完B之后崩溃了,但是A被消费之前可能会重新处理,因此,当它恢复时,将会再次消费A并且再次写入B,造成消息重复。
3.最后,在分布式环境中,应⽤用程序崩溃或者更糟糕的情况,导致暂时失去与其他系统之间的连接,通常,新的实例会自动启动替换哪些被认为丢失的实例,通过这个过程,可能导致我们有多个实例处理相同的topic,并输出到相同的topic中,造成重复输出,并完全违反exacly once语义,造成“僵尸实例”。
我们设计Kafka的事务APIs⽤用来解决第二和第三个问题,事务启⽤用exactly-once处理read-process-write使这些原子化
事务性语义
原子多分区写 :
事务使原子写入到多个kafka的主题和分区,事务中包含的所有消息要么全部写入要么就全部没有写入,比如,处理过程中发⽣生ERROR错误可能会使的事务中止,这种情况下事务中的消息都不会被 消费到,现在我们来看一下如果使原子实现read-process-write过程。
首先,我们思考一下read-process-write的意义,简而言之,如果一个应用程序消费了某个主题分区tp0 offset X的消息,在对消息A处理之后,将消息B写⼊入到topic-partition tp1中,这样B=F(A), 然后只有当消息A和消息B被认为成功的消费和发布,或者都没有成功,read-process-write才是原子性的 现在,只有当它的偏移量X被标记为消费时,才会被认为是在topic-partition tp0中使用的消息,当偏移量被标记之后我们称之为提交偏移量,在Kafka中,我们通过写入一个offset 值 ,kafka topic来 记录offset的提交,只有当offset被提交之后,该消息才能被使用.
因此,由于偏移值只是写入另一个Kafka Topic中,因此只有当消息被提交之后,才被认为是可以消费的,原子在多个topic和partition间的写操作也支持原子性的读写,当偏移量X提交到offset topic 中,并将消息B写入tp1,这个是单个事务的一部分,因此是原子性的
Zombie fencing
我们通过为每个事务⽣生成器分配一个称为transaction.id的唯一标识来解决“僵尸实例”,这⽤用于标识同一个producer实例跨进程重启,API要求第一次操作producer事务的时候应该在kafka集群中显式 的注册transactional.id ,kafka broker检查打开的事务与给定的transactional.id ,它增加了一个与transactional.id 相关的epoch,epoch是存储在每个transactional.id中的元数据的内部块 一旦epoch bumped,所有的具有相同transactional.id的producer和旧的epoch 就会被认为是“僵尸”,然后被隔离掉,将来事务写⼊入这些producer的时候会被拒绝。
读取事务消息
现在,我们将注意⼒力转到 当读取消息作为事务的一部分写⼊入消息时所提供的保证,当事务读取消息作为 如果事务确实提交了,kafka 消费者只会向应⽤用程序发送事务性消息,换句话说,消费者不会传递事务消息这是打开事务的一部分,或者它也不会传递消息作为中止事务的一部分 值得注意的是上述保证不符合事务阅读的要求,当使用一个kafka 消费者从一个topic中消费消息时,应⽤用程序不知道这些消息是否作为事务写入,所有他们不知道事务的开始和结束,进一步说,给 定一个消费者不保证能消费到所有分区,它没有办法发现这一点,很难保证单个事务中包含的所有消息最终都将由单个使用者使用。 简而言之,kafka保证消费者最终只会传递非事务性消息或者提交的事务性消息,它将保留打开的事务并过滤掉中止事务的消息。
Java中的事务API:
事务特性主要是一个服务器端和协议级别的协议,可以由任何⽀支持它客户端的库使用,在Java中使用kafka事务编写读写流程如下:
KafkaProducer producer = createKafkaProducer(
“bootstrap.servers”, “localhost:9092”,
“transactional.id”, “my-transactional-id”);
producer.initTransactions();
KafkaConsumer consumer = createKafkaConsumer(
“bootstrap.servers”, “localhost:9092”,
“group.id”, “my-group-id”,
"isolation.level", "read_committed");
consumer.subscribe(singleton(“inputTopic”));
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
for (ConsumerRecord record : records)
producer.send(producerRecord(“outputTopic”, record));
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
producer.commitTransaction();
}
KafkaTransactionsExample.java hosted with by GitHub
1-5行通过指定事务配置并将其注册到initTransactions API并且设置生产者的transactional.id,之后producer.initTransactions()返回,任何一个具有事务的生产者的实例启动的transactional.id 都将被 关闭并被隔离 .
7-10行指定kafka consumer只应读取非事务性消息或者从输⼊入的topics中提交事务性消息,流处理程序通常在多个read-process-write阶段,每个阶段都使⽤用前一阶段的输出作为输⼊入,通过指定 read_committed模式,我们能在所有阶段中精确的处理
14-21行演示了 read-process-write循环的核心:我们消费一些records,启动一个事务,来处理消费记录,将处理过的记录写⼊入输出topic中,将消费的offset发送到offsets topic,最后提交事务,通 过上述保障,我们知道offsets和输出记录都将作为一个原子单元来提交。
事务是如何工作的
在这部分,我们简要阐述了上⾯介绍的事务API引入的新组件和新数据流,为了更详细的论述这个问题,你可以阅读原始的设计文档或者观看kafka峰会的讨论 接下来的目标是在调试使⽤用事务的应用程序时提供一个mental model,或者尝试优化事务获得更好的性能

事务协调器和事务日志
在Kafka 0.11.0中引⼊入的事务API组件时事务协调器和上图右侧的事务日志
事务协调器是在每个kafka broker中运行的模块,事务日志是kafka内部的一个topic,每个协调器在事务日志中都拥有一些⼦子集,它的broker为partition leader,每一个transactional.id通过一个简单 的散列函数映射到事务⽇日志的特定分区,这意味着只有一个协调器拥有给定的transactional.id,我们利⽤用kafka的rock solid副本协议和leader选举过程,以确保事务协调器始终可用,并且所有事务 状态都是持久存储的
值得注意的是,事务日志只是存储事务的最新状态,而不是事务中的最新消息,消息指存储在实际的拓扑分区中,事务可以有多种状态比如:“正在进行”,“准备提交”和“完成”,这个状态和相关元数 据存储在事务日志中
数据流
在高级别上,数据流可以分为四种不同的类型
A:者和事务协调器交互
在执行事务的时候,生产者向事务协调器请求以下几点:
1.initTransactions API 注册一个协调者到 transactional.id,从这一点看,协调者将关闭所有与该事务相关的挂起事务,以隔离僵尸操作,这种情况一般在⽣生产者建立会话时发⽣生一次。
2.当生产者第一次发送数据到一个分区时,该分区首先向协调器注册
3.当应用程序启动 commitTransaction 和 abortTransaction 时,将向协调器发送一个请求,以启动两阶段提交协议。
B:协调器和事务日志交互
随着事务的进展,生产者发送上述请求以更新协调器上的状态,事务协调器保存它在内存中的每个事务的状态,并将该状态写入到事务日志中(它复制了三种⽅方式,所以是持久的) 事务协调器是从事务⽇日志中进⾏行读写的唯一组件,如果某个给定的broker失败了,一个新的协调器被选为事务日志分区的新的leader,该日志分区时已死的broker所有的,它从传入的分区读取消 息,重新构建其内存状态,以便在这些分区中进行事务处理
C:生产者将数据写入目标分区
在与协调器的事务中注册新的分区之后,生产者将数据以正常方式发送到实际分区中,这个是相同的 producer.send 流,但是需要通过一些验证来确保生产者不会受到保护
D:协调器与topic-partitionj交互
在生产者发起提交(或终止)之后,协调者开始分两阶段提交protocol 第一阶段,协调器将其内部状态更新为“prepare_commit“,并在事务日志中更新这个状态,一旦完成,事务将保证无论如何都会提交 然后,协调器开始第二阶段,它将事务提交标记写入到事务的局部分区中 这些事务标记不会暴露在应⽤用程序中,但是在read_committed模式中,消费者会使⽤用这些标记来过滤掉中止的事务的消息,而不返回属于open事务的部分消息(哪些在⽇日志中并没有与他们进⾏行关 联的事务标记) 标记完成后,事务协调器将事务标记为“complete” 并且⽣生产者可以启动另一个事务.
事务在实践中的应用
我们已经了解了事务的语义以及它们是如何工作的,我们将注意⼒力转移到编写应⽤用程序的实战方面。 如何选择transactional.id
在僵尸隔离过程中, transactional.id 扮演着重要的角色,但是保持一个标识在生产者会话中一致,并且正确的将僵尸隔离是有些棘手的 正确隔离僵尸的关键是确保在读写过程中输⼊入的主题和分区对于对定的事务总是有效的,如果不是有效的,⼀一些消息可能会通过事务提供的隔离泄漏
例如:在一个分布式流处理程序中,假设topic-partition tp0最初是由transactional.id 处理的T0,这样某种程度后,它可以被映射到另一个具有生产者的transactional.id T1 ,T0和T1之间不存在 fencing,因此,来自tp0的消息可能会被重新处理,这违背了exactly once的消息保证,要么在外部存储中存储输入分区和 transactional.ids的映射,或者有一些静态编码,kafka streaming选择了后 一种方法来解决这个问题。
事务是如何执行的,如何进行调优
事务生成器的性能:
我们把注意点转到事务的生成上
首先,事务只适合一般的写操作,大量的写由于:
1.对于每个事务,我们已经有了RPCs来将分区注册到协调器,这些批处理的,所以我们的RPC要⽐比事务中的要少
2.当完成一个事务的时候,必须将一个事务标记写入到每个参与事务的分区中,同样,事务协调器将所有标记绑定到同一个RPC中的同一个代理,所以我们在那里保存了RPC的开销,但是我们不能 避免在事务中对每个分区进行额外的写入
3.最后,我们将状态修改并写入到事务日志,这包括每批添加到事务的分区“prepare_commit”状态和“complete_commit”状态的写入 正如我们看到的,写入消息的开销作为事务独立的一部分
因此,提高吞吐量的关键在于每个事务包含更多的消息 实际上,在最大吞吐量producer⽣生产1KB记录,每100ms提交消息会导致吞吐量下降30%,较小的消息或者更短的事务提交间隔将导致更小的降级 增加事务持续时间的主要初衷是为了增加端到端的延迟,回想一下,读取事务性消息的消费者不会传递属于公开事务的消息,因此,提交时间间隔越长,耗时较长的应⽤用程序将不得不等,从而 导致端到端延迟
事务消费者的性能
事务消费者⽣产者更简单,只需要做:
1.过滤掉属于事务中止的消息
2.它是开放事务的一部分,不返回事务消息 因此,当在read_committed模式读取事务消息时,事务消费者不会显示吞吐量降低,这其中的主要原因是我们在读取事务性消息的时候是零拷贝读取 此外,消费者不需要任何缓冲来等待事务完成,相反,broker不允许它提前去包含打开事务的offset中去。 因此,消费者是及其轻量级和⾼高效的,感兴趣的读者可以到本⽂文档中了解消费者设计的细节 .
总结
在本文中,我们了解了Apache Kafka中事务API的主要设计目标,我们了解了事务API的语义,并对API的实际工作方式有了更深的了解。 如果我们考虑整个过程的read-process-write,这篇⽂文章主要邯郸了读和写的路径,处理过程本⾝身就是一个黑盒子,实际上在处理阶段要做很多的事情,所以在处理的中不可能只使⽤用事务APIs,例 如,如果处理对其他存储系统有副作用,那么这⾥所使用的APIs并不是exactly once的 Kafka Streams框架使用这里描述的事务APIs向上移动价值链,并为各种应⽤用程序提供exactly once处理,即便是那些在处理过程中更新某些额外的存储状态 将来博客会介绍kafka streaming是如何进行exactly once语义处理的,以及如何编写应⽤用程序 最后,如果想了解api上层实现细节,我们后续会有另一篇博客⽂文章,它涵盖了在这里描述的事务api中一些更有趣的解决方案。
扩展阅读
我们刚刚接触了Apache Kafka中事务表⾯面,不过,关于它的所有设计细节都可以在网上查到,相关文档如下:
1、Kafka KIP的最初设计:它提供了关于数据流的详细信息,以及很好的描述了公共接口,特别是与事务相关的配置选项
2、最初的设计文档: 这是除了源代码之后,最准确的信息,可以学习⼀一下关于如何处理每个事务RPC,如何维护事务日志,如何清洗事务性数据等。
3、KafkaProducer Javadocs:这是如何学习新的APIs的好地方,其中的实例以及send方法的⽂文档都是很好的开端
本⽂文翻译⾃自:https://www.confluent.io/blog/transactions-apache-kafka/
猜你喜欢
加入技术讨论群
为了方便大家相互交流学习,社区创建微信/QQ群,社区群人数已经2500+,欢迎大家加下面助手微信,拉大家进群,自由交流。

喜欢QQ群的,可以扫描下面二维码:

欢迎大家通过二维码打赏支持技术社区(二维码累计打赏36+,打赏英雄请留名,社区感谢您):





