暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

Kafka 3.3 Kraft集群部署Mirrormake2复制集群

Hostname

IP

Cpu©

Memory(G)

Disk(G)

角色

SrKafka001

10.132.35.3

16

64

1024

Broker、Controll

SrKafka002

10.132.34.255

16

64

1024

Broker、Controll

SrKafka003

10.132.35.2

16

64

1024

Broker、Controll

SrKafka004

10.132.35.1

16

64

1024

Broker

SrKafka005

10.132.35.0

16

64

1024

Broker

软件配置

 

kafka_2.13-3.3.2.tgz

部署实施

创建目录(所有kafka节点机器):

#mkdir /usr/local/kakfa  kafka安装目录

#mkddir /usr/local/java  java 安装目录(java version "1.8.0_351")

#mkdir /data/kafkalogs   kafka日志目录

 

 

解压软件(所有kafka 节点机器都需要安装)

我准备得kafka_2.13-3.3.2.tgz 软件包统一放在/opt/soft 目录下

# cd /opt/soft/

#tar -xvf kafka_2.13-3.3.2.tgz

# cd kafka_2.13-3.3.2

# mv * /usr/local/kafka/

 

修改配置文件

因为我们是kraft 得方式部署已经不依赖zookeeper 所以修改得servier.properties  在 kraft 目录下

#cd /usr/local/kafka/config/

#ls

这里不用理会kraft 文件上一层得各类文件

#cd /usr/local/kafka/config/kraft/

修改

#vim server.properties

 

需要修改得参数:

process.roles=broker,controller

controller.quorum.voters=1@10.132.35.3:9093,2@10.132.34.255:9093,3@10.132.35.2:9093

listeners=PLAINTEXT://10.132.34.255:9092,CONTROLLER://10.132.34.255:9093

advertised.listeners=PLAINTEXT://10.132.34.255:9092

log.dirs=/data/kafkalogs

 

其中Process.Roles

每个Kafka服务器现在都有一个新的配置项,叫做Process.Roles, 这个参数可以有以下值:

如果Process.Roles = Broker, 服务器在KRaft模式中充当 Broker。

如果Process.Roles = Controller, 服务器在KRaft模式下充当 Controller。

如果Process.Roles = Broker,Controller,服务器在KRaft模式中同时充当 Broker 和Controller。

如果process.roles 没有设置。那么集群就假定是运行在ZooKeeper模式下。

在Kafka的Kraft模式中,Broker和Controller的角色和职责与传统的Kafka集群模式有一些相似之处,但也存在一些特定的差异。

Broker:

角色:Broker仍然是Kafka集群中的一个独立服务器,负责处理来自生产者的消息并将其存储在集群中,同时也负责将消息发送给消费者。

功能:在Kraft模式中,Broker不仅维护消息队列,还负责参与集群的共识过程,确保消息的可靠性和一致性。它与其他Broker协同工作,通过共识算法(如Raft)来选举领导者并达成共识,从而确保集群的稳定性和容错性。

Controller:

角色:在Kraft模式中,Controller的角色仍然扮演着集群管理和协调的重要角色。然而,与传统的Kafka集群模式不同,Kraft模式中的Controller不再完全依赖于Zookeeper来进行集群元数据的管理。

功能:Controller负责管理和维护集群的元数据,包括分区信息、Broker状态等。它与其他Broker通信,收集状态信息,并根据需要执行相应的管理操作,如分区重新分配、Broker状态更新等。由于Kraft模式采用了基于Raft的共识算法,Controller的选举和元数据的一致性也得到了保证。

区别:

依赖关系:在传统的Kafka集群模式中,Controller通常依赖于Zookeeper来进行集群的管理和协调。而在Kraft模式中,由于采用了基于Raft的共识算法,Controller不再完全依赖于Zookeeper,而是通过与其他Broker的协同工作来达成共识和进行集群管理。

共识机制:在Kraft模式中,Broker和Controller都参与了基于Raft的共识过程。这意味着它们不仅交换状态信息,还通过投票和日志复制等机制来确保集群的一致性和容错性。这种共识机制使得Kraft模式在处理集群故障和恢复时更加健壮和可靠。

总结来说,在Kafka的Kraft模式中,Broker和Controller仍然扮演着各自的角色,但它们的职责和依赖关系与传统的Kafka集群模式有所不同。Broker参与共识过程,而Controller则通过与其他Broker的协同工作来管理和维护集群的元数据,确保集群的稳定性和一致性。

 

 

其中node.id 

#节点ID每个节点需要保持唯一

其中controller.quorum.voters

系统中的所有节点都必须设置 `controller.quorum.voters` 配置。这个配置标识有哪些节点是 Quorum 的投票者节点。所有想成为控制器的节点都需要包含在这个配置里面。

 我们有五台机器 其中SrKafka001、SrKafka002、SrKafka003 即承担broker角色也承担controller 这个角色设置体现在参数process.roles 里面指定 SrKafka004、SrKafka005 做broker 角色 那么process.roles=broker即可。

controller.quorum.voters配置格式为  node.id@ip:port多个用,隔开,每台机器都一样。

其中listeners

listeners参数用于指定Kafka Broker监听的网络接口和端口,它定义了Kafka Broker绑定的IP地址或主机名以及所使用的协议和端口号。这个参数是Kafka Broker自己使用的,用于监听来自客户端(生产者和消费者)的连接请求。

advertised.listeners参数是一个可选参数,用于指定客户端连接到Kafka Broker的网络接口和端口。当Kafka Broker处于一个网络中,但客户端在另一个网络中时,客户端可能无法直接连接到Kafka Broker所在的网络接口和端口。在这种情况下,Kafka Broker可以使用advertised.listeners参数来提供一个公共的网络接口和端口,以便客户端能够连接。

advertised.listeners参数的值可以是与listeners参数相同的值,也可以是一个经过转换的值。转换的方式通常是将内部的IP地址或主机名替换为外部可访问的IP地址或主机名。这样,客户端可以使用advertised.listeners参数指定的网络接口和端口来连接到Kafka Broker,即使它们无法直接访问Kafka Broker所在的网络接口和端口。

总结:

  • listeners参数是Kafka Broker自己使用的,用于监听客户端的连接请求。
  • advertised.listeners参数是可选的,用于指定客户端连接到Kafka Broker的网络接口和端口。它提供了一个公共的网络接口和端口,以便客户端能够连接到Kafka Broker,即使它们无法直接访问Kafka Broker所在的网络接口和端口。

如果节点角色即是broker 又是 controller  那么 listeners 设置为listeners=PLAINTEXT://10.132.34.255:9092,CONTROLLER://10.132.34.255:9093

如果仅为broker 或者 controller   那么只要设置对应得其中一个即可。

 

配置分发:

将server.properties 文件分发到其余节点 并记得检查node.id 、log.dirs、listeners、Process.Roles、advertised.listeners 参数。

 

 

 

启动Kafka Kraft 集群

运行KRaft集群,主要分为三步

用kafka-storage.sh 生成一个唯一的集群ID(只需要在一个节点生产即可)

#cd /usr/local/kakfa/bin

 

用kafka-storage.sh 格式化存储数据的目录(每个节点都需要执行)

#cd /usr/local/kakfa/bin

#./kafka-storage.sh format -t pXiK7pZTQaa5l8juVFB9kw -c /usr/local/kafka/config/kraft/server.properties

红色代码为 生成得UUID

用bin/kafka-server-start.sh 启动Kafka Server



 

运维集群

查看集群:

./kafka-broker-api-versions.sh --bootstrap-server 10.132.35.3:9092

创建集群

#./bin/kafka-topics.sh --create --topic kafkaraftTest --partitions 1 --replication-factor 1 --bootstrap-server 10.132.35.3:9092

 

查看集群topic

查看单个topic 得信息

./kafka-topics.sh --bootstrap-server 10.132.35.3:9092 --describe --topic geely-json-topic

 

 

到此 我们已经部署好了 kafka kraft 集群 但是是最基础得集群并没有做安全和优化配置。

使用Mirror Maker2 复制生成集群

Kafka 集群部署完成后,接下来我们就可以想办法复制生产集群数据了。对于kakfa 集群得数据复制可以通过

1、自己写程序 消费kafka 再吐入kafka 的方式

2、通过kafka 自带的工具Mirror Maker2 复制集群的方式

这里我们通过第二张方式,因为大部分运维人员没有代码能力。

 

源kafka 集群:

Broker

10.132.34.206:6667,10.132.34.207:6667,10.132.34.208:6667,10.132.34.209:6667,10.132.34.210:6667

 

目标集群:就是我们搭建的集群

Broker

10.132.34.255:9092,10.132.35.1:9092,10.132.35.2:9092,10.132.35.3:9092,10.132.35.0:9092

源集群已集成kerberos

 

第一步: 新建的集群安装(所有节点安装)

#yum install krb5-workstation krb5-libs krb5-auth-dialog -y

 

问你的管理员拿到 krb5.conf 文件 、keytab、kafka_jass.conf 文件、

 

第二步:同步配置文件

把krb5.conf 文件 放到所有节点/etc/ 目录下  要先做第一步 如果不做第一步直接放 再yum 安装的时候会自动产生krb5.conf 文件 覆盖掉。

把keytab文件 放到所有节点 kafka_jass.conf 文件指定的目录下

我这里是/etc/security/keytabs/

把kafka_jass.conf 文件放到所有节点/usr/local/kakfa/config 文件目录下 (也可以放到其他地方 这里放这里是利于管理)

 

第三步:修改hosts文件

把安装kdc的主机加入到 /etc/hosts 里面

第四步:验证kerberos

kinit -kt /etc/security/keytabs/kafka.service.keytab  kafka/vel-kafka34206.gdmp.com@GDMP.COM

 

第五步:修改同步--consumer.config 和--producer.config 文件(同步到其他节点)

修改--consumer.config 和--producer.config 文件

Consumer.properties

这个文件添加两个参数

security.protocol=SASL_PLAINTEXT

sasl.mechanism=GSSAPI

bootstrap.servers=10.132.34.206:6667,10.132.34.207:6667,10.132.34.208:6667,10.132.34.209:6667,10.132.34.210:6667

Producer.properites

这个文件 指定bootstrap.servers=10.132.35.3:9092,10.132.34.255:9092,10.132.35.0:9092,10.132.35.1:9092,10.132.35.2:9092

新建的集群没有安全限制 不用配置其他安全       

 

 

第六步:启动复制

#/usr/local/kafka/bin

#nohup ./kafka-mirror-maker.sh --consumer.config /usr/local/kafka/config/consumer.properties   --num.streams 20 --producer.config /usr/local/kafka/config/producer.properties --whitelist="geely-json-topic"  &

 

至此使用mirror make2 复制已经配置完成

 

可通过 运维集群章节命令查看集群。

 

通过mirror makeer 复制的kafka 数据 分区不均衡

 

使用MirrorMaker复制Kafka数据时,可能会出现分区不均衡的情况。这可能是由于以下几个原因导致的:

1.      消费者组配置不正确:在MirrorMaker的配置文件中,如果消费者组的数量大于目标集群的分区数量,则会导致分区不均衡。确保消费者组的数量与目标集群的分区数量相匹配。

2.      目标集群的分区数量不足:如果目标集群的分区数量少于源集群的分区数量,则无法实现完全的均衡复制。在创建目标集群时,确保分区数量与源集群相匹配。

3.      消费者组的偏移量不同步:如果源集群和目标集群的消费者组偏移量不同步,则可能导致分区不均衡。你可以使用工具类kafka.tools.GetOffsetShell来检查消费者组的偏移量,并确保源集群和目标集群的偏移量保持一致。

4.      网络问题:如果网络延迟或带宽限制较大,则可能导致MirrorMaker无法及时复制数据,从而导致分区不均衡。确保网络连接稳定,并检查带宽是否足够支持复制操作。

为了解决分区不均衡的问题,你可以尝试以下方法:

1.      调整消费者组的数量:根据目标集群的分区数量,适当调整消费者组的数量,以实现分区的均衡复制。

2.      手动重新分配分区:如果分区不均衡的程度较大,你可以手动重新分配分区,将不均衡的分区重新分配给其他消费者组,以实现分区的均衡复制。

3.      检查消费者组的偏移量:使用kafka.tools.GetOffsetShell工具类检查源集群和目标集群的消费者组偏移量是否一致,如果不一致,则可以通过手动调整偏移量来解决。

4.      检查网络连接和带宽:确保网络连接稳定,并检查带宽是否足够支持复制操作。

请注意,MirrorMaker是一个异步复制工具,不能保证实时和完全一致的数据复制。因此,在使用MirrorMaker进行数据复制时,分区不均衡是常见的情况,特别是在高负载或高延迟的情况下。如果需要实时和完全一致的数据复制,可以考虑使用其他工具或方法,如Kafka Streams或Kafka Connect。

 

如何手动平衡kafka 分区数据

要手动平衡Kafka分区数据,可以按照以下步骤进行操作:

  1. 查看当前分区分布:使用Kafka自带的命令行工具kafka-topics.sh来查看当前主题的分区分布情况。例如,运行以下命令来查看名为"my_topic"的主题的分区分布:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my_topic

复制

copied!

  1. 计算分区的平均负载:根据当前的分区分布情况,计算每个消费者组或消费者实例应处理的平均分区数量。例如,如果有3个消费者组,而主题有9个分区,则每个消费者组应处理3个分区。
  2. 重新分配分区:根据计算出的平均负载,手动重新分配分区。可以使用kafka-reassign-partitions.sh工具来执行分区重新分配。首先,创建一个JSON文件,指定要重新分配的主题和分区分配方案。例如,创建一个名为"reassignment.json"的文件,其内容如下:

{

  "version": 1,

  "partitions": [

    {

      "topic": "my_topic",

      "partition": 0,

      "replicas": [1, 2, 3]   // 新的分区副本分配方案

    },

    {

      "topic": "my_topic",

      "partition": 1,

      "replicas": [2, 3, 1]

    },

    // 其他分区...

  ]

}

复制

copied!

在上面的示例中,为"my_topic"的第一个分区指定了新的分区副本分配方案,副本1、2和3。可以根据需要为其他分区指定相应的分区副本分配方案。

  1. 执行分区重新分配:运行以下命令来执行分区重新分配:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --execute

复制

这将触发分区重新分配操作。Kafka将自动将分区副本重新分配到指定的broker上。

  1. 监控分区重新分配进度:运行以下命令来检查分区重新分配的进度:

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify

复制

copied!

这将显示分区重新分配的进度以及每个分区的状态。

请注意,分区重新分配可能需要一些时间来完成,具体取决于分区数量和集群的负载。在分区重新分配期间,建议避免对集群进行重启或其他配置更改,以确保分区重新分配的顺利进行。

此外,Kafka还提供了一些其他工具和方法来帮助管理和平衡分区数据,如Kafka Reassign Partitions工具、Kafka Partition Reassignment工具和Kafka Cruise Control等。可以根据实际需求选择合适的工具和方法来平衡分区数据。

kafka-reassign-partitions.sh 如何使用详细解释

 

kafka-reassign-partitions.sh是Kafka自带的一个命令行工具,用于执行Kafka分区重新分配操作。以下是关于如何使用kafka-reassign-partitions.sh的详细解释:

  1. 准备分区重新分配方案:首先,需要准备一个JSON文件,指定要重新分配的主题和分区分配方案。该JSON文件应包含以下信息:
    • version:指定分区重新分配方案的版本号,通常为1。
    • partitions:一个数组,包含每个要重新分配的分区的信息。
      • topic:要重新分配分区的主题名称。
      • partition:要重新分配的分区编号。
      • replicas:一个数组,指定新的分区副本分配方案。副本的顺序很重要,首先是首选副本,然后是备份副本。

以下是一个示例的JSON文件内容:

{

  "version": 1,

  "partitions": [

    {

      "topic": "my_topic",

      "partition": 0,

      "replicas": [1, 2, 3]

    },

    {

      "topic": "my_topic",

      "partition": 1,

      "replicas": [2, 3, 1]

    },

    // 其他分区...

  ]

}

复制

  1. 执行分区重新分配:使用以下命令来执行分区重新分配操作:

3.  bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --execute

复制

    • --bootstrap-server:指定Kafka集群的地址和端口。
    • --reassignment-json-file:指定分区重新分配方案的JSON文件路径。
    • --execute:指定要执行分区重新分配操作。

执行该命令将触发分区重新分配操作,Kafka将自动将分区副本重新分配到指定的broker上。

  1. 监控分区重新分配进度:可以使用以下命令来检查分区重新分配的进度:

5.  bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify

复制

    • --bootstrap-server:指定Kafka集群的地址和端口。
    • --reassignment-json-file:指定分区重新分配方案的JSON文件路径。
    • --verify:指定要验证分区重新分配进度。

运行该命令将显示分区重新分配的进度以及每个分区的状态。

请注意,分区重新分配可能需要一些时间来完成,具体取决于分区数量和集群的负载。在分区重新分配期间,建议避免对集群进行重启或其他配置更改,以确保分区重新分配的顺利进行。

此外,还可以使用其他Kafka工具和方法来帮助管理和平衡分区数据,例如Kafka Reassign Partitions工具、Kafka Partition Reassignment工具和Kafka Cruise Control等。可以

如可查看kakfa topic 里面分区的数据量

https://oai.geely.com/img/chatGPT.3b4d3281.png

要查看Kafka主题中每个分区的数据量,可以使用kafka-run-class.sh工具结合kafka.tools.GetOffsetShell类来实现。下面是详细的步骤:

  1. 执行以下命令来查看主题中每个分区的当前偏移量(即数据量):

2.  bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic_name> --time -1

复制

    • --broker-list:指定Kafka集群的地址和端口。
    • --topic:指定要查看数据量的主题名称。
    • --time -1:指定时间参数为-1,表示获取当前最新的偏移量。

运行该命令后,将显示每个分区的分区编号和当前偏移量。

  1. 如果要查看指定时间点之前的偏移量(即历史数据量),可以使用以下命令:

4.  bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic_name> --time <timestamp>

复制

    • --time <timestamp>:指定要查看的时间点的时间戳,例如--time 1612345678901。时间戳可以是UNIX时间戳(以毫秒为单位)或日期时间字符串。

运行该命令后,将显示每个分区的分区编号和指定时间点之前的偏移量。

请注意,kafka-run-class.sh工具在Kafka bin目录下,GetOffsetShell类是Kafka自带的一个工具类。确保替换localhost:9092为您实际Kafka集群的地址和端口,以及<topic_name>为您要查看的实际主题名称。

此外,还可以使用Kafka的JMX监控功能、Kafka Manager等第三方工具来监控和查看Kafka主题中每个分区的数据量。

 

 

将auto.offset.reset设置为earliest会导致MirrorMaker从最早的可用偏移量开始消费数据。这可能会导致延迟较高,因为MirrorMaker需要重新消费之前已经处理过的消息。

如果您的目标是减少延迟,可以将auto.offset.reset设置为latest,这样MirrorMaker将从最新的可用偏移量开始消费数据。这样可以避免重新处理之前已经处理过的消息,从而减少延迟。

请注意,将auto.offset.reset设置为latest可能会导致MirrorMaker在启动后一段时间内丢失一些旧的消息。因此,在使用此配置时,请确保您不需要处理旧的消息,并在确认不会丢失关键数据的情况下使用该配置。

另外,还要注意检查MirrorMaker的其他配置和环境,以确保没有其他原因导致延迟过高。

想要kafka 从上一次中断出开始消费 offset 参数该怎么设置

 

想要从kafka 的最新位置消费 该怎么设置参数

要从 Kafka 的最新位置开始消费消息,可以通过设置消费者的 auto.offset.reset 参数为 latest 来实现。这个参数告诉 Kafka 消费者,在没有可用的消费者组的偏移量时,从最新的消息开始消费。

以下是设置 Kafka 消费者从最新位置开始消费的示例代码:

Properties props = new Properties();

props.put("bootstrap.servers", "your-bootstrap-servers");

props.put("group.id", "your-consumer-group");

props.put("auto.offset.reset", "latest"); // 设置为 latest

 

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("your-topic"));

 

while (true) {

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    // 处理消费的消息

    for (ConsumerRecord<String, String> record : records) {

        // 处理消息的逻辑

    }

}

复制

在上述示例中,将 auto.offset.reset 参数设置为 latest。这将确保消费者从最新的消息开始消费,即使没有可用的消费者组的偏移量。

请注意,如果消费者组已经有了偏移量并且要从最新位置开始消费,需要先将消费者组的偏移量删除,或者将消费者组的 group.id 更改为一个新的值,以避免使用旧的偏移量。

 

想要从最早的位置消费kafka 数据 该怎么设置

要从 Kafka 的最早位置开始消费消息,可以通过设置消费者的 auto.offset.reset 参数为 earliest 来实现。这个参数告诉 Kafka 消费者,在没有可用的消费者组的偏移量时,从最早的消息开始消费。

以下是设置 Kafka 消费者从最早位置开始消费的示例代码:

Properties props = new Properties();
props.put("bootstrap.servers", "your-bootstrap-servers");
props.put("group.id", "your-consumer-group");
props.put("auto.offset.reset", "earliest"); // 设置为 earliest
 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("your-topic"));
 
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // 处理消费的消息
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息的逻辑
    }
}
复制

在上述示例中,将 auto.offset.reset 参数设置为 earliest。这将确保消费者从最早的消息开始消费,即使没有可用的消费者组的偏移量。

请注意,如果消费者组已经有了偏移量并且要从最早位置开始消费,需要先将消费者组的偏移量删除,或者将消费者组的 group.id 更改为一个新的值,以避免使用旧的偏移量

 

 

 

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论