场景说明
当kafka集群的容量快要达到瓶颈,这个容量包括:磁盘空间、带宽、CPU等,我们需要给集群做节点扩容,本文描述纯手工整个扩容的步骤。
部署broker
申请新服务器,假如IP是:10.10.10.10
从集群中任意一台broker服务器上拷贝kafka的程序包到新服务器上,里面包含了配置文件
修改配置文件,只列出需要修改的配置
broker.id=4 #需要保证不能和集群中已经有broker冲突
listeners=PLAINTEXT://10.10.10.10:9092 #本机的内网IP
启动服务
/chj/app/kafka/bin/kafka-server-start.sh -daemon /chj/app/kafka/config/server.properties
检查broker是否正常加入集群,通过
ls kafka/brokers/ids/
命令从zk里面查看是否有新的broker的id,如果有说明新节点正常加入集群
[root@10-10-10-1 bin]# ./zookeeper-shell.sh 10.10.10.1:2181
ls /kafka/brokers
[ids, topics, seqid]
ls /kafka/brokers/ids
[1, 2, 3,4]
迁移分区
上面只是新部署好了broker,要想把流量和压力分摊到新broker上,需要把topic的分区和副本迁移一部分到新broker上,下面是具体迁移步骤
为了迁移速度快,我们可以把数据量特别大的topic的消息保留时间临时调短,这样需要迁移的数据就很少了,从而加快迁移速度,但是调短的前提是需要确保消息都已经消费完了,这个可以从监控看消费延时,通过下面方法可以设置topic的数据保留时间,我们可以先临时调整成1个小时:
./kafka-topics.sh --zookeeper 10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181 --alter --topic mytopic --config retention.ms=3600000
观察kafka数据是否已经完成了数据的purge
a.通过server.log查看purge过程
b.直观的观察data目录是够存在1个小时之前的日志data
3.准备需要迁移的topci的parttion的topic的json文件
vim my.jason
{"topics": [{
"topic": "mytopic1"},
{"topic": "mytopic2"}
],
"version":1
}
通过下面命令reassign partitions
./kafka-reassign-partitions.sh --topics-to-move-json-file my.jason --zookeeper 10.10.10.1:2181/kafka --broker-list "1,2,3,4" --generate
使用上一步生成的建议partition json内容进行迁移
“Proposed partition reassignment configuration” 后面的内容保存到reassign.json文件中
./kafka-reassign-partitions.sh --zookeeper 10.10.10.1:2181/kafka --reassignment-json-file reassign.json --execute
检验partition的迁移状态
./kafka-reassign-partitions.sh --zookeeper 10.10.10.1:2181/kafka --reassignment-json-file reassign.json --verify
迁移完成最后把topic的数据保留数据还原成2天
./kafka-topics.sh --zookeeper 10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181 --alter --topic mytopic --config retention.ms=172800000
总结
本文介绍了手动扩容kafka的全过程,整个过程还是比较繁琐的,如果集群比较多或者经常需要扩容手动肯定是不行的,需要平台化,推荐滴滴开源的一款Kafka管理平台,个人认为目前是市面上比较不错的,之前也写过一篇文章,大家可以参考下:kafka管理平台




