暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【赵渝强老师】Kafka生产者的消息发送方式

原创 赵渝强老师 2025-10-29
34

1.png
​Kafka生产者有三种方式进行消息的发送,这三种方式区别在于对于消息是否正常到达的处理。

aaa11.png
点击这里查看视频讲解:【赵渝强老师】Kafka生产者的消息发送方式


下面分别介绍生产者的这三种消息发送方式。

第一种:fire-and-forget

该方式把消息发送给Kafka的Broker之后不关心其是否正常到达。在大多数情况下消息会正常到达,即使出错了生产者也会自动重试。但这种方式可能造成Kafka Broker没有接收到生产者的消息。因此这种方式适用于允许消息的丢失、并对吞吐量要求大的场景,比如用户点击日志上传。代码如下:

import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducerDemo { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 发送方式:fire-and-forget Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("mytopic1", "key" + i, "value" + i)); Thread.sleep(1000); } producer.close(); } }

第二种:同步发送

生产者使用send方法发送一条消息,该方法会返回一个Future对象。调用该对象的get方法可以阻塞当前线程并等待返回。这种方式适用对消息可靠性要求高的场景,比如支付的场景。在这种场景下要求消息不可丢失,如果丢失了将回滚相关的业务操作。代码如下:

import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class KafkaProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 发送方式:同步发送 Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 10; i++) { RecordMetadata metadata = producer.send(new ProducerRecord<String, String> ("mytopic1", "key" + i, "value" + i)).get(); System.out.println("同步消息发送成功:" + i); } producer.close(); } }

第三种:异步发送

生产者使用send方法发送一条消息时指定回调函数,在Kafka Broker返回结果时调用。这个回调函数可以进行错误日志的记录或者重试。这种方式牺牲了一部分可靠性,但是吞吐量会比同步发送高很多。代码如下:

import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class KafkaProducerDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 发送方式:异步发送 Producer<String, String> producer = new KafkaProducer(props); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("mytopic1", "key" + i, "value" + i),new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { // 回调函数 if(exception != null) { System.out.println("消息异步发送出现错误!!!"); exception.printStackTrace(); } else { System.out.println("消息异步发送成功。 " + "Topic: " + metadata.topic() + ", Partition: " + metadata.partition() + ", Offset: " + metadata.offset()); } } }); } producer.close(); } }
最后修改时间:2025-12-14 21:22:29
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论