
一、Kafka安装
1.1、Linux下安装
- 下载安装包
https://kafka.apache.org/downloads
- 解压
tar -zxvf kafka_2.12-3.0.1.tgz
- 启动Zookeeper
cd kafka_2.12-3.0.1
bin/zookeeper-server-start.sh config/zookeeper.properties
- 修改kafka配置
vi config/server.properties
# 去掉这一行的注释
listeners=PLAINTEXT://:9092
# 去掉这一行的注释,并把换掉ip(需要外网访问的ip使用外网ip)
advertised.listeners=PLAINTEXT://localhost:9092
- 启动kafka
bin/kafka-server-start.sh config/server.properties
-
创建topic
-
旧版本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test -
新版本
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
-
-
启动生产者
bin/kafka-console-producer.sh --topic test --broker-list localhost:9092
- 启动消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
- 测试
生产者发送消息

订阅的消费者接收消息

1.2、Docker安装
-
安装zookeeper
kafka需要zookeeper管理,所以需要先安装zookeeper。
# 拉取镜像 docker pull zookeeper:3.6 # 启动容器 docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper:3.6 -
安装kafka
# 拉取镜像 docker pull wurstmeister/kafka:2.12-2.5.0 # 启动kafka docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9002:9002 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9002 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9002 -v /etc/localtime:/etc/localtime wurstmeister/kafka:2.12-2.5.0【参数说明】 -e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己 -e KAFKA_ZOOKEEPER_CONNECT=192.168.124.28:2181/kafka 配置zookeeper管理kafka的路径192.168.124.28:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.124.28:9002 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9002 配置kafka的监听端口 -v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
二、SpringBoot集成简单测试
- 依赖
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 测试使用
@RestController
public class KafkaTest {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
/**
* 发送消息到mq
* @param topic topic
* @param message 消息内容
*/
@GetMapping("/test")
public void testSendMessage(String topic, Object message){
kafkaTemplate.send(topic, message);
}
}
以上,调用这个接口,即可发送消息到kafka中
最后修改时间:2023-05-25 10:37:10
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




