暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

kafka的安装和关键知识点的简单介绍

Java技术学习笔记 2020-11-06
607

图解

1.broker、partition的关系

2.partition中leader和follower

3.HW&LEO(LEO(last end offset)日志末端偏移量HW(highwatermark),高水印值

Kafka的HW,LEO更新原理及运行流程总结

https://juejin.im/post/6844904008189116424

4.消息的格式

5.LogSegment文件命名的规则和日志

6.零拷贝

7.保证消息不丢失

关键词

Broker

一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic,各Broker之间地位平等。

Topic:

每个message属于一个Topic,收到的message进行load balance,均匀的分布在这个topic下的不同的partition上

Partition:

分片,分为leader和follower,一个topic中包含多个Partition,物理上是一个文件夹;1个partition只能被同组的一个consumer消费;分区内的消息具有顺序性,分区间的数据无序性;

Replica:

副本,Partition含有N个Replica,其中一个replica为leader,其他为follower;follower定期去同步leader数据;

Message:

消息,是通信的基本单位;消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的;

Offset:

消息在Topic的Partiton中的位置,同一个Partition中的消息随着消息的写入,其对应的Offset也自增

LogSegment:

日志段,一个日志又被划分为多个日志段,一个日志段对应一个具体的日志文件和两个索引文件(日志文件以“.log”为后缀,两个索引文件分别以“.index”和“.timeindex”),表示消息偏移量索引文件和消息时间戳索引文件

ISR:

保存同步的副本列表。Kafka在zookeeper中动态维护了一个ISR(In-sync-Replica)即保存同步的副本列表,保存的是与Leader副本保持消息同步的所有副本对应的代理节点id。如果一个Follower副本宕机或者落后太多,则该Follower副本节点将从ISR列表中移除。

Controller

Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配Partition之类的管理任务。

如果当前的Controller失败,会从其他正常的Broker中重新选举Controller。

Producer

消息生产者

Consumer

消息消费者,每个Consumer属于一个Consumer Group;每个Consumer Group中可以包含多个Consumer;一个Partiotn只会被Consumer Group中一个Consumer消费

安装步骤

1.zookeeper安装

# 下载连接 
# https://zookeeper.apache.org/releases.html#download


wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
mv apache-zookeeper-3.6.2-bin /usr/local
cd /usr/local
mv apache-zookeeper-3.6.2-bin zookeeper3.6.2
cd zookeeper3.6.2/conf
mv zoo_sample.cfg zoo.cfg
# 启动
./bin/zkServer.sh start

2.kafka安装

# 下载连接
# http://kafka.apache.org/downloads


wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar -zxvf kafka_2.12-2.6.0.tg
cd kafka_2.12-2.6.0
mv kafka_2.12-2.6.0 /usr/local
cd /usr/local/kafka_2.12-2.6.0/conf
vim server.properties


##
zoopeeker.connect=127.0.0.1:3000,127.0.0.1:3001

3.Scala安装

# 下载链接
# https://www.scala-lang.org/download/2.12.12.html


# https://downloads.lightbend.com/scala/2.12.12/scala-2.12.12.tgz


wget https://downloads.lightbend.com/scala/2.12.12/scala-2.12.12.tgz
tar -zxvf scala-2.12.12.tgz
mv scala-2.12.12 /usr/local
cd /usr/local


# 环境变量设置
vi /etc/profile
# 修改
export SCALA_HOME=/usr/local/scala
export PATH=${SCALA_HOME}/bin:$PATH


# 环境变量生效操作
source /etc/profile

知识点介绍

1.有Leader和follower,为什么Kafka不支持读写分离?

生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,是主写主读的生产消费模型
(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间,这段时间会导致主从节点之间的数据不一致。
(2)延时问题。主从同步数据需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间

2.怎么保证kafka数据不丢失

1.生产者
request.required.acks=-1 (leaderfollower都确认成功之后才返回成功)
retries=Integer.MAX_VALUE (发送失败重试的次数,捕获异常)


2.消费者
enable.auto.commit=false,手动提交


3.brocker
replication.factor >= 2  (副本数)
min.insync.replicas = 2  (ISR中副本数)
unclean.leader.election.enable = false (下面有介绍)

3.kafka为什么这么快?

(1)Cache Filesystem Cache PageCache缓存
2)顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。
3)Zero-copy 零拷⻉,少了一次内存交换。
4)Batching of Messages 批量量处理。合并小的请求,然后以流的方式进行交互,直顶网络上限。
5)Pull 拉模式 使用拉模式进行消息的获取消费,与消费端处理能力相符。

4.如果leader crash时,ISR为空怎么办

kafka在Broker端提供了一个配置参数:unclean.leader.election.enable,这个参数有两个值:
true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。
false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。

5.什么情况下一个 broker 会从 isr中踢出去

leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica)
每个Partition都会有一个ISR,而且是由leader动态维护
如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求
则leader将其重ISR中移除

核心参数

1.producer

1.常见异常处理
(1)retries:重新发送数据的次数(默认为0,表示不重试)
(2)retry.backoff.ms:两次重试之间的时间间隔(默认为100ms)


2.提升消息吞吐量
buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB
compression.type: 用于压缩数据的压缩类型。默认是none表示无压缩。可以指定gzipsnappy
batch.size: 批处理消息记录,以减少请求次数(默认是16384Bytes,即16kB,满了16kB就发送出去)
linger.ms: 消息延迟的最大时间(默认是0,即立即发出)
一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kB,自然就会发送出去。
但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。


3.请求超时
max.request.size:控制发送出去的消息的大小,默认是1048576字节,也就1mb,一般设成10mb
request.timeout.ms:发送消息超时的时间限制,默认是30秒,超过会报TimeoutException


4.ACK参数
request.required.acks:消息的持久化机制
(1)=0:不管消息是否写入成功到broker中,直接发送下一条数据到batch中,数据丢失的风险最高,但吞吐量最大
(2)=1: 只要leader写入成功,就认为消息成功了.如果刚写入leaderleader就挂了,此时数据必然丢了
(3)=-1/allleader写入成功 + follower写入成功。数据安全,性能最差

2.consumer

heartbeat.interval.ms  consumer心跳时间 (默认值:3000)
session.timeout.ms kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒
max.poll.interval.ms 如果在两次poll操作之间,超过了这个时间,会被踢出消费组
fetch.max.bytes 获取一条消息最大的字节数 (默认值:1048576)
max.poll.records  一次poll返回消息的最大条数(默认值:500条)  
connections.max.idle.ms  consumerbrokersocket建立连接的超时时间,建议设置成-1 不超时(默认值:540000)
auto.offset.reset
  earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置开始消费
none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
注:我们生产里面一般设置的是latest
enable.auto.commit  设置为自动提交offset (默认值:true)
auto.commit.interval.ms 每隔多久更新一下偏移量 (默认值:60 * 1000)

Kafka常用topic操作命令汇总

Kafka常用topic操作命令汇总

行无际 https://www.cnblogs.com/itwild/p/12287850.html
offset相关
# 最大offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic --time -1
# 最小offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic --time -2
# offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic


topic相关
# 列出当前kafka所有的topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --replication-factor 1 --partitions 1 
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test_topic --replication-factor 3 --partitions 10 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper localhost:2181  --topic test_topic --partitions 1   --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
# 查看某topic具体情况
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_topic
# 修改topic(分区数、特殊配置如compact属性、数据保留时间等)
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 3  --config cleanup.policy=compact --topic test_topic
# 修改topic(也可以用这种)
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name test_topic --entity-type topics --add-config cleanup.policy=compact
bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name test_topic --entity-type topics --delete-config cleanup.policy


consumer-group相关
# 查看某消费组(consumer_group)具体消费情况(活跃的消费者以及lag情况等等)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group  --describe
# 列出当前所有的消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 旧版
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group test_group --describe


consumer相关
# 消费数据(从latest消费)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic
# 消费数据(从头开始消费)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning
# 消费数据(最多消费多少条就自动退出消费)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --max-messages 1
# 消费数据(同时把key打印出来)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --property print.key=true
# 旧版
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic


producer相关
# 生产数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
# 生产数据(写入带有key的message)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

错误处理

1. Uninitialized object exists on backward branch 50

# jdk版本不匹配,需要更新版本,现在一般用openjdk


#命令如下:
yum install java-1.8.0-openjdk


# 执行完后,验证:
java -version


# openjdk version "1.8.0_262"
# OpenJDK Runtime Environment (build 1.8.0_262-b10)
# OpenJDK 64-Bit Server VM (build 25.262-b10, mixed mode)


# 上述操作还不能解决问题的话,最好重启哈linux
文章转载自Java技术学习笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论