暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka使用教程

Java有货 2019-07-27
935

文章目录

    • 一、Kafka是什么?

    • 二、ApacheKafka®是一个分布式流媒体平台。这到底是什么意思呢?

      • 2.1流媒体平台有三个关键功能:

      • 2.2Kafka通常用于两大类应用:

    • 三、Kafka安装

      • 3.1下载kafka

      • 3.2安装配置

        • wind 配置修改

      • 3.3启动

      • 3.4注:

    • 四、安装zookeeper

      • 4.1.下载

      • 4.2.安装配置

        • wind 配置修改

      • 4.3.启动

    • 五、集成

      • 5.1依赖管理

      • 5.2 项目配置

      • 5.3 生产与消费

      • 5.4项目结构


一、Kafka是什么?

Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。
它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

二、ApacheKafka®是一个分布式流媒体平台。这到底是什么意思呢?

2.1流媒体平台有三个关键功能:

发布和订阅记录流,类似于消息队列或企业消息传递系统。
以容错的持久方式存储记录流。
记录发生时处理流。

2.2Kafka通常用于两大类应用:

构建可在系统或应用程序之间可靠获取数据的实时流数据管道
构建转换或响应数据流的实时流应用程序。

更多信息大家可以对照官网介绍:http://kafka.apache.org/intro.html

三、Kafka安装

3.1下载kafka

http://kafka.apache.org/downloads

3.2安装配置

wind 配置修改

2.1 kafka安装路径/config/server.properties,讲以下内容进行替换
2.2 listeners=PLAINTEXT://:9092
2.3 advertised.listeners=PLAINTEXT://localhost:9092

3.3启动

.\bin\windows\kafka-server-start.bat .\config\server.properties

3.4注:

可以使用kafka客户端进行监控,下载地址如下
http://www.kafkatool.com/download.html
由于kafka需要ZK作为注册中心接下再来安装ZK

四、安装zookeeper

4.1.下载

http://zookeeper.apache.org/releases.html

4.2.安装配置

wind 配置修改

2.1 将conf下“zoo_sample.cfg”重命名为“zoo.cfg”,这里可以将log地址改成自定义位置

4.3.启动

打开新的cmd,输入zkServer,运行Zookeeper

五、集成

5.1依赖管理

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.javayh</groupId>
<artifactId>javayh-mq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../</relativePath> <!-- lookup parent from repository -->
</parent>
<groupId>com.javayh</groupId>
<artifactId>javayh-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>javayh-kafka</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Spring Cloud Kafka-->
<!-- <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>-->
<dependency>
<groupId>com.javayh</groupId>
<artifactId>javayh-commons</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>


5.2 项目配置

server:
port: 8023
spring:
application:
name: javayh-kafka
kafka:
bootstrap-servers: localhost:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.javayh.serializable.SysMenuDeserializer
auto-offset-reset: latest #最早未被消费的offset earliest
max-poll-records: 3100 #批量消费一次最大拉取的数据量
enable-auto-commit: false #是否开启自动提交
auto-commit-interval: 1000 #自动提交的间隔时间
session-timeout: 20000 #连接超时时间
max-poll-interval: 15000 #手动提交设置与poll的心跳数,如果消息队列中没有消息,等待毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。
max-partition-fetch-bytes: 15728640 #设置拉取数据的大小,15M
producer:
bootstrap-servers: localhost:9092
batch-size: 16785 # 一次最多发送的数据量
retries: 1 #发送失败后的重复发送次数
buffer-memory: 33554432 #32M批处理缓冲区
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.javayh.serializable.SysMenuSerializable
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/guns?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true&useAffectedRows=true
username: root
password: 1219320
druid:
initialSize: 5 #初始化连接大小
minIdle: 5 #最小连接池数量
maxActive: 20 #最大连接池数量
maxWait: 60000 #获取连接时最大等待时间,单位毫秒
timeBetweenEvictionRunsMillis: 60000 #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
minEvictableIdleTimeMillis: 300000 #配置一个连接在池中最小生存的时间,单位是毫秒
validationQuery: SELECT 1 from DUAL #测试连接
testWhileIdle: true #申请连接的时候检测,建议配置为true,不影响性能,并且保证安全性
testOnBorrow: false #获取连接时执行检测,建议关闭,影响性能
testOnReturn: false #归还连接时执行检测,建议关闭,影响性能
poolPreparedStatements: false #是否开启PSCache,PSCache对支持游标的数据库性能提升巨大,oracle建议开启,mysql下建议关闭
maxPoolPreparedStatementPerConnectionSize: 20 #开启poolPreparedStatements后生效
filters: stat,wall,log4j #配置扩展插件,常用的插件有=>stat:监控统计 log4j:日志 wall:防御sql注入
connectionProperties: 'druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000' #通过connectProperties属性来打开mergeSql功能;慢SQL记录

kafka:
topic:
group-id: javayh-kafka
topic-name:
- JavaYoHo
- YangHaiJi
- Dylan

这里大家可以看到Kafka的序列化问题,如何你想生产消费对象,需要自定义序列化机制,这里和RabbitMQ还是有区别的,关于RabbitMQ的使用与介绍大家可以看一下:https://blog.csdn.net/weixin_38937840/article/details/96591029这篇文章

5.3 生产与消费

@Slf4j
@Service
public class KafkaService {

@Autowired
private KafkaTemplate kafkaTemplate;

/**
* 生产者
* @param data
*/
public void send(SysMenu data){
ListenableFuture send = kafkaTemplate.send(StaticNumber.JAVAYOHO, data);
send.addCallback(new ListenableFutureCallback(){
@Override
public void onSuccess(Object result) {
log.info("send success");
}
@Override
public void onFailure(Throwable ex) {
log.info("send failure");
}
});
log.info("send success");
}

// public void sendString(String data){
// ListenableFuture send = kafkaTemplate.send(StaticNumber.YANGHJ, data);
// send.addCallback(new ListenableFutureCallback(){
// @Override
// public void onSuccess(Object result) {
// log.info("send success");
// }
// @Override
// public void onFailure(Throwable ex) {
// log.info("send failure");
// }
// });
// log.info("send success");
// }
}

----------

/**
* 消费者
* @param record 消息
*/
@Transactional
@KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}" )
public void processMessage(ConsumerRecord<String, SysMenu> record) {
SysMenu sysMenu = record.value();
log.info("SysMenu --> {}",sysMenu.toString());
if (sysMenu == null) {
throw new RuntimeException("模拟业务出错");
}
log.info("kafka processMessage start, topic = {}, msg = {}", record.topic(), sysMenu);
//入库
Integer i = sysMeunMapper.modifyById(sysMenu);
log.info("更新 {}",i);
if (i == null || i == 0){
log.info("kafka processMessage end failure");
}else {
log.info("kafka processMessage end success");
}
}

5.4项目结构


由于内容较多,就不一一复制了,大家可以去我的github上看源码:
https://github.com/Dylan-haiji/javayh-cloud/tree/master/javayh-mq/javayh-kafka
到这里Springboot整合Kafka实现简单的生产消费就完成了!

小编寄语


小编创建了一个关于Java学习讨论的微信群!想进去的可以联系小编!同时也欢迎大家点赞与转发!

小编微信:372787553

备注为进群,通过后小编会邀请您进群!


Ja
va


文章转载自Java有货,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论