|
Hostname |
IP |
Cpu© |
Memory(G) |
Disk(G) |
角色 |
|
SrKafka001 |
10.132.35.3 |
16 |
64 |
1024 |
Broker、Controll |
|
SrKafka002 |
10.132.34.255 |
16 |
64 |
1024 |
Broker、Controll |
|
SrKafka003 |
10.132.35.2 |
16 |
64 |
1024 |
Broker、Controll |
|
SrKafka004 |
10.132.35.1 |
16 |
64 |
1024 |
Broker |
|
SrKafka005 |
10.132.35.0 |
16 |
64 |
1024 |
Broker |
软件配置
kafka_2.13-3.3.2.tgz
部署实施
创建目录(所有kafka节点机器):
#mkdir /usr/local/kakfa kafka安装目录
#mkddir /usr/local/java java 安装目录(java
version "1.8.0_351")
#mkdir /data/kafkalogs kafka日志目录
解压软件(所有kafka 节点机器都需要安装)
我准备得kafka_2.13-3.3.2.tgz 软件包统一放在/opt/soft 目录下
# cd /opt/soft/
#tar -xvf kafka_2.13-3.3.2.tgz
# cd kafka_2.13-3.3.2
# mv * /usr/local/kafka/
修改配置文件
因为我们是kraft 得方式部署已经不依赖zookeeper 所以修改得servier.properties 在 kraft 目录下
#cd /usr/local/kafka/config/
#ls
这里不用理会kraft 文件上一层得各类文件
#cd /usr/local/kafka/config/kraft/
修改
#vim server.properties
需要修改得参数:
process.roles=broker,controller
controller.quorum.voters=1@10.132.35.3:9093,2@10.132.34.255:9093,3@10.132.35.2:9093
listeners=PLAINTEXT://10.132.34.255:9092,CONTROLLER://10.132.34.255:9093
advertised.listeners=PLAINTEXT://10.132.34.255:9092
log.dirs=/data/kafkalogs
其中Process.Roles
每个Kafka服务器现在都有一个新的配置项,叫做Process.Roles, 这个参数可以有以下值:
如果Process.Roles = Broker, 服务器在KRaft模式中充当 Broker。
如果Process.Roles = Controller, 服务器在KRaft模式下充当 Controller。
如果Process.Roles = Broker,Controller,服务器在KRaft模式中同时充当 Broker 和Controller。
如果process.roles 没有设置。那么集群就假定是运行在ZooKeeper模式下。
在Kafka的Kraft模式中,Broker和Controller的角色和职责与传统的Kafka集群模式有一些相似之处,但也存在一些特定的差异。
Broker:
角色:Broker仍然是Kafka集群中的一个独立服务器,负责处理来自生产者的消息并将其存储在集群中,同时也负责将消息发送给消费者。
功能:在Kraft模式中,Broker不仅维护消息队列,还负责参与集群的共识过程,确保消息的可靠性和一致性。它与其他Broker协同工作,通过共识算法(如Raft)来选举领导者并达成共识,从而确保集群的稳定性和容错性。
Controller:
角色:在Kraft模式中,Controller的角色仍然扮演着集群管理和协调的重要角色。然而,与传统的Kafka集群模式不同,Kraft模式中的Controller不再完全依赖于Zookeeper来进行集群元数据的管理。
功能:Controller负责管理和维护集群的元数据,包括分区信息、Broker状态等。它与其他Broker通信,收集状态信息,并根据需要执行相应的管理操作,如分区重新分配、Broker状态更新等。由于Kraft模式采用了基于Raft的共识算法,Controller的选举和元数据的一致性也得到了保证。
区别:
依赖关系:在传统的Kafka集群模式中,Controller通常依赖于Zookeeper来进行集群的管理和协调。而在Kraft模式中,由于采用了基于Raft的共识算法,Controller不再完全依赖于Zookeeper,而是通过与其他Broker的协同工作来达成共识和进行集群管理。
共识机制:在Kraft模式中,Broker和Controller都参与了基于Raft的共识过程。这意味着它们不仅交换状态信息,还通过投票和日志复制等机制来确保集群的一致性和容错性。这种共识机制使得Kraft模式在处理集群故障和恢复时更加健壮和可靠。
总结来说,在Kafka的Kraft模式中,Broker和Controller仍然扮演着各自的角色,但它们的职责和依赖关系与传统的Kafka集群模式有所不同。Broker参与共识过程,而Controller则通过与其他Broker的协同工作来管理和维护集群的元数据,确保集群的稳定性和一致性。
其中node.id
#节点ID,每个节点需要保持唯一
其中controller.quorum.voters
系统中的所有节点都必须设置 `controller.quorum.voters` 配置。这个配置标识有哪些节点是 Quorum 的投票者节点。所有想成为控制器的节点都需要包含在这个配置里面。
我们有五台机器
其中SrKafka001、SrKafka002、SrKafka003 即承担broker角色也承担controller 这个角色设置体现在参数process.roles 里面指定 SrKafka004、SrKafka005 做broker 角色 那么process.roles=broker即可。
controller.quorum.voters配置格式为 node.id@ip:port多个用,隔开,每台机器都一样。
其中listeners
listeners参数用于指定Kafka Broker监听的网络接口和端口,它定义了Kafka Broker绑定的IP地址或主机名以及所使用的协议和端口号。这个参数是Kafka Broker自己使用的,用于监听来自客户端(生产者和消费者)的连接请求。
advertised.listeners参数是一个可选参数,用于指定客户端连接到Kafka Broker的网络接口和端口。当Kafka Broker处于一个网络中,但客户端在另一个网络中时,客户端可能无法直接连接到Kafka Broker所在的网络接口和端口。在这种情况下,Kafka Broker可以使用advertised.listeners参数来提供一个公共的网络接口和端口,以便客户端能够连接。
advertised.listeners参数的值可以是与listeners参数相同的值,也可以是一个经过转换的值。转换的方式通常是将内部的IP地址或主机名替换为外部可访问的IP地址或主机名。这样,客户端可以使用advertised.listeners参数指定的网络接口和端口来连接到Kafka Broker,即使它们无法直接访问Kafka Broker所在的网络接口和端口。
总结:
- listeners参数是Kafka Broker自己使用的,用于监听客户端的连接请求。
- advertised.listeners参数是可选的,用于指定客户端连接到Kafka Broker的网络接口和端口。它提供了一个公共的网络接口和端口,以便客户端能够连接到Kafka Broker,即使它们无法直接访问Kafka Broker所在的网络接口和端口。
如果节点角色即是broker
又是 controller 那么 listeners 设置为listeners=PLAINTEXT://10.132.34.255:9092,CONTROLLER://10.132.34.255:9093
如果仅为broker
或者 controller 那么只要设置对应得其中一个即可。
配置分发:
将server.properties 文件分发到其余节点 并记得检查node.id 、log.dirs、listeners、Process.Roles、advertised.listeners 参数。
启动Kafka Kraft 集群
运行KRaft集群,主要分为三步:
用kafka-storage.sh 生成一个唯一的集群ID(只需要在一个节点生产即可)
#cd /usr/local/kakfa/bin
用kafka-storage.sh 格式化存储数据的目录(每个节点都需要执行)
#cd /usr/local/kakfa/bin
#./kafka-storage.sh
format -t pXiK7pZTQaa5l8juVFB9kw -c /usr/local/kafka/config/kraft/server.properties
红色代码为 生成得UUID
用bin/kafka-server-start.sh 启动Kafka
Server
运维集群
查看集群:
./kafka-broker-api-versions.sh
--bootstrap-server 10.132.35.3:9092
创建集群
#./bin/kafka-topics.sh
--create --topic kafkaraftTest --partitions 1 --replication-factor 1
--bootstrap-server 10.132.35.3:9092
查看集群topic
查看单个topic
得信息
./kafka-topics.sh
--bootstrap-server 10.132.35.3:9092 --describe --topic geely-json-topic
到此 我们已经部署好了 kafka kraft 集群
但是是最基础得集群并没有做安全和优化配置。
使用Mirror Maker2 复制生成集群
Kafka 集群部署完成后,接下来我们就可以想办法复制生产集群数据了。对于kakfa 集群得数据复制可以通过
1、自己写程序 消费kafka 再吐入kafka 的方式
2、通过kafka 自带的工具Mirror Maker2 复制集群的方式
这里我们通过第二张方式,因为大部分运维人员没有代码能力。
源kafka 集群:
Broker
10.132.34.206:6667,10.132.34.207:6667,10.132.34.208:6667,10.132.34.209:6667,10.132.34.210:6667
目标集群:就是我们搭建的集群
Broker
10.132.34.255:9092,10.132.35.1:9092,10.132.35.2:9092,10.132.35.3:9092,10.132.35.0:9092
源集群已集成kerberos
第一步: 新建的集群安装(所有节点安装)
#yum install
krb5-workstation krb5-libs krb5-auth-dialog -y
问你的管理员拿到 krb5.conf
文件 、keytab、kafka_jass.conf 文件、
第二步:同步配置文件
把krb5.conf 文件 放到所有节点/etc/ 目录下
要先做第一步 如果不做第一步直接放 再yum 安装的时候会自动产生krb5.conf 文件 覆盖掉。
把keytab文件 放到所有节点 kafka_jass.conf
文件指定的目录下
我这里是/etc/security/keytabs/
把kafka_jass.conf 文件放到所有节点/usr/local/kakfa/config 文件目录下 (也可以放到其他地方 这里放这里是利于管理)
第三步:修改hosts文件
把安装kdc的主机加入到 /etc/hosts
里面
第四步:验证kerberos
kinit
-kt /etc/security/keytabs/kafka.service.keytab
kafka/vel-kafka34206.gdmp.com@GDMP.COM
第五步:修改同步--consumer.config 和--producer.config
文件(同步到其他节点)
修改--consumer.config 和--producer.config 文件
Consumer.properties
这个文件添加两个参数
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
bootstrap.servers=10.132.34.206:6667,10.132.34.207:6667,10.132.34.208:6667,10.132.34.209:6667,10.132.34.210:6667
Producer.properites
这个文件 指定bootstrap.servers=10.132.35.3:9092,10.132.34.255:9092,10.132.35.0:9092,10.132.35.1:9092,10.132.35.2:9092
新建的集群没有安全限制 不用配置其他安全
第六步:启动复制
#/usr/local/kafka/bin
#nohup ./kafka-mirror-maker.sh --consumer.config
/usr/local/kafka/config/consumer.properties --num.streams 20 --producer.config
/usr/local/kafka/config/producer.properties
--whitelist="geely-json-topic"
&
至此使用mirror make2 复制已经配置完成
可通过 运维集群章节命令查看集群。
通过mirror makeer 复制的kafka 数据
分区不均衡
使用MirrorMaker复制Kafka数据时,可能会出现分区不均衡的情况。这可能是由于以下几个原因导致的:
1.
消费者组配置不正确:在MirrorMaker的配置文件中,如果消费者组的数量大于目标集群的分区数量,则会导致分区不均衡。确保消费者组的数量与目标集群的分区数量相匹配。
2.
目标集群的分区数量不足:如果目标集群的分区数量少于源集群的分区数量,则无法实现完全的均衡复制。在创建目标集群时,确保分区数量与源集群相匹配。
3.
消费者组的偏移量不同步:如果源集群和目标集群的消费者组偏移量不同步,则可能导致分区不均衡。你可以使用工具类kafka.tools.GetOffsetShell来检查消费者组的偏移量,并确保源集群和目标集群的偏移量保持一致。
4.
网络问题:如果网络延迟或带宽限制较大,则可能导致MirrorMaker无法及时复制数据,从而导致分区不均衡。确保网络连接稳定,并检查带宽是否足够支持复制操作。
为了解决分区不均衡的问题,你可以尝试以下方法:
1.
调整消费者组的数量:根据目标集群的分区数量,适当调整消费者组的数量,以实现分区的均衡复制。
2.
手动重新分配分区:如果分区不均衡的程度较大,你可以手动重新分配分区,将不均衡的分区重新分配给其他消费者组,以实现分区的均衡复制。
3.
检查消费者组的偏移量:使用kafka.tools.GetOffsetShell工具类检查源集群和目标集群的消费者组偏移量是否一致,如果不一致,则可以通过手动调整偏移量来解决。
4.
检查网络连接和带宽:确保网络连接稳定,并检查带宽是否足够支持复制操作。
请注意,MirrorMaker是一个异步复制工具,不能保证实时和完全一致的数据复制。因此,在使用MirrorMaker进行数据复制时,分区不均衡是常见的情况,特别是在高负载或高延迟的情况下。如果需要实时和完全一致的数据复制,可以考虑使用其他工具或方法,如Kafka Streams或Kafka Connect。
如何手动平衡kafka 分区数据
要手动平衡Kafka分区数据,可以按照以下步骤进行操作:
- 查看当前分区分布:使用Kafka自带的命令行工具kafka-topics.sh来查看当前主题的分区分布情况。例如,运行以下命令来查看名为"my_topic"的主题的分区分布:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my_topic
复制
copied!
- 计算分区的平均负载:根据当前的分区分布情况,计算每个消费者组或消费者实例应处理的平均分区数量。例如,如果有3个消费者组,而主题有9个分区,则每个消费者组应处理3个分区。
- 重新分配分区:根据计算出的平均负载,手动重新分配分区。可以使用kafka-reassign-partitions.sh工具来执行分区重新分配。首先,创建一个JSON文件,指定要重新分配的主题和分区分配方案。例如,创建一个名为"reassignment.json"的文件,其内容如下:
{
"version": 1,
"partitions": [
{
"topic": "my_topic",
"partition": 0,
"replicas": [1, 2, 3] // 新的分区副本分配方案
},
{
"topic": "my_topic",
"partition": 1,
"replicas": [2, 3, 1]
},
// 其他分区...
]
}
复制
copied!
在上面的示例中,为"my_topic"的第一个分区指定了新的分区副本分配方案,副本1、2和3。可以根据需要为其他分区指定相应的分区副本分配方案。
- 执行分区重新分配:运行以下命令来执行分区重新分配:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file
reassignment.json --execute
复制
这将触发分区重新分配操作。Kafka将自动将分区副本重新分配到指定的broker上。
- 监控分区重新分配进度:运行以下命令来检查分区重新分配的进度:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file
reassignment.json --verify
复制
copied!
这将显示分区重新分配的进度以及每个分区的状态。
请注意,分区重新分配可能需要一些时间来完成,具体取决于分区数量和集群的负载。在分区重新分配期间,建议避免对集群进行重启或其他配置更改,以确保分区重新分配的顺利进行。
此外,Kafka还提供了一些其他工具和方法来帮助管理和平衡分区数据,如Kafka Reassign Partitions工具、Kafka Partition Reassignment工具和Kafka Cruise Control等。可以根据实际需求选择合适的工具和方法来平衡分区数据。
kafka-reassign-partitions.sh 如何使用详细解释
kafka-reassign-partitions.sh是Kafka自带的一个命令行工具,用于执行Kafka分区重新分配操作。以下是关于如何使用kafka-reassign-partitions.sh的详细解释:
- 准备分区重新分配方案:首先,需要准备一个JSON文件,指定要重新分配的主题和分区分配方案。该JSON文件应包含以下信息:
- version:指定分区重新分配方案的版本号,通常为1。
- partitions:一个数组,包含每个要重新分配的分区的信息。
- topic:要重新分配分区的主题名称。
- partition:要重新分配的分区编号。
- replicas:一个数组,指定新的分区副本分配方案。副本的顺序很重要,首先是首选副本,然后是备份副本。
以下是一个示例的JSON文件内容:
{
"version": 1,
"partitions": [
{
"topic": "my_topic",
"partition": 0,
"replicas": [1, 2, 3]
},
{
"topic": "my_topic",
"partition": 1,
"replicas": [2, 3, 1]
},
// 其他分区...
]
}
复制
- 执行分区重新分配:使用以下命令来执行分区重新分配操作:
3. bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092
--reassignment-json-file reassignment.json --execute
复制
- --bootstrap-server:指定Kafka集群的地址和端口。
- --reassignment-json-file:指定分区重新分配方案的JSON文件路径。
- --execute:指定要执行分区重新分配操作。
执行该命令将触发分区重新分配操作,Kafka将自动将分区副本重新分配到指定的broker上。
- 监控分区重新分配进度:可以使用以下命令来检查分区重新分配的进度:
5. bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file
reassignment.json --verify
复制
- --bootstrap-server:指定Kafka集群的地址和端口。
- --reassignment-json-file:指定分区重新分配方案的JSON文件路径。
- --verify:指定要验证分区重新分配进度。
运行该命令将显示分区重新分配的进度以及每个分区的状态。
请注意,分区重新分配可能需要一些时间来完成,具体取决于分区数量和集群的负载。在分区重新分配期间,建议避免对集群进行重启或其他配置更改,以确保分区重新分配的顺利进行。
此外,还可以使用其他Kafka工具和方法来帮助管理和平衡分区数据,例如Kafka Reassign Partitions工具、Kafka Partition Reassignment工具和Kafka Cruise Control等。可以
如可查看kakfa topic 里面分区的数据量
要查看Kafka主题中每个分区的数据量,可以使用kafka-run-class.sh工具结合kafka.tools.GetOffsetShell类来实现。下面是详细的步骤:
- 执行以下命令来查看主题中每个分区的当前偏移量(即数据量):
2. bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
localhost:9092 --topic <topic_name> --time -1
复制
- --broker-list:指定Kafka集群的地址和端口。
- --topic:指定要查看数据量的主题名称。
- --time -1:指定时间参数为-1,表示获取当前最新的偏移量。
运行该命令后,将显示每个分区的分区编号和当前偏移量。
- 如果要查看指定时间点之前的偏移量(即历史数据量),可以使用以下命令:
4. bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
localhost:9092 --topic <topic_name> --time <timestamp>
复制
- --time <timestamp>:指定要查看的时间点的时间戳,例如--time 1612345678901。时间戳可以是UNIX时间戳(以毫秒为单位)或日期时间字符串。
运行该命令后,将显示每个分区的分区编号和指定时间点之前的偏移量。
请注意,kafka-run-class.sh工具在Kafka bin目录下,GetOffsetShell类是Kafka自带的一个工具类。确保替换localhost:9092为您实际Kafka集群的地址和端口,以及<topic_name>为您要查看的实际主题名称。
此外,还可以使用Kafka的JMX监控功能、Kafka Manager等第三方工具来监控和查看Kafka主题中每个分区的数据量。
将auto.offset.reset设置为earliest会导致MirrorMaker从最早的可用偏移量开始消费数据。这可能会导致延迟较高,因为MirrorMaker需要重新消费之前已经处理过的消息。
如果您的目标是减少延迟,可以将auto.offset.reset设置为latest,这样MirrorMaker将从最新的可用偏移量开始消费数据。这样可以避免重新处理之前已经处理过的消息,从而减少延迟。
请注意,将auto.offset.reset设置为latest可能会导致MirrorMaker在启动后一段时间内丢失一些旧的消息。因此,在使用此配置时,请确保您不需要处理旧的消息,并在确认不会丢失关键数据的情况下使用该配置。
另外,还要注意检查MirrorMaker的其他配置和环境,以确保没有其他原因导致延迟过高。
想要kafka 从上一次中断出开始消费 offset 参数该怎么设置
想要从kafka 的最新位置消费 该怎么设置参数
要从 Kafka 的最新位置开始消费消息,可以通过设置消费者的 auto.offset.reset 参数为 latest 来实现。这个参数告诉 Kafka 消费者,在没有可用的消费者组的偏移量时,从最新的消息开始消费。
以下是设置 Kafka 消费者从最新位置开始消费的示例代码:
Properties props = new Properties();
props.put("bootstrap.servers", "your-bootstrap-servers");
props.put("group.id", "your-consumer-group");
props.put("auto.offset.reset", "latest"); // 设置为 latest
KafkaConsumer<String,
String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-topic"));
while (true) {
ConsumerRecords<String, String>
records = consumer.poll(Duration.ofMillis(100));
// 处理消费的消息
for (ConsumerRecord<String, String> record : records) {
// 处理消息的逻辑
}
}
复制
在上述示例中,将 auto.offset.reset 参数设置为 latest。这将确保消费者从最新的消息开始消费,即使没有可用的消费者组的偏移量。
请注意,如果消费者组已经有了偏移量并且要从最新位置开始消费,需要先将消费者组的偏移量删除,或者将消费者组的 group.id 更改为一个新的值,以避免使用旧的偏移量。
想要从最早的位置消费kafka 数据 该怎么设置
要从 Kafka 的最早位置开始消费消息,可以通过设置消费者的 auto.offset.reset 参数为 earliest 来实现。这个参数告诉 Kafka 消费者,在没有可用的消费者组的偏移量时,从最早的消息开始消费。
以下是设置 Kafka 消费者从最早位置开始消费的示例代码:
Propertiesprops=newProperties();
props.put("bootstrap.servers","your-bootstrap-servers");
props.put("group.id","your-consumer-group");
props.put("auto.offset.reset","earliest");// 设置为 earliest
KafkaConsumer<String, String> consumer =newKafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-topic"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理消费的消息
for(ConsumerRecord<String, String> record : records) {
// 处理消息的逻辑
}}复制
在上述示例中,将 auto.offset.reset 参数设置为 earliest。这将确保消费者从最早的消息开始消费,即使没有可用的消费者组的偏移量。
请注意,如果消费者组已经有了偏移量并且要从最早位置开始消费,需要先将消费者组的偏移量删除,或者将消费者组的 group.id 更改为一个新的值,以避免使用旧的偏移量




