
Why Run Your Own Kafka


Introducing Kafka
Kafka, by its official definition, is a high-throughput distributed publish/subscribe messaging system. That definition alone may not say much, so consider a simple example. In today's big-data era, commerce, social media, search, and browsing all generate huge volumes of data, and collecting that data quickly and analysing it in real time is a problem that has to be solved. In such a system, producers (Produce) generate all kinds of data and consumers (Consume) consume (analyse and process) it. How, then, can production and consumption be done efficiently and reliably? We need a bridge between producers and consumers, and that bridge is a messaging system. Viewed at a lower level, the same requirement can also be read as: how do different systems pass messages to one another?
Kafka Use Cases
Scenario 1: publish and subscribe

Scenario 2: log aggregation

Scenario 3: event-driven CQRS

Kafka Architecture
· Broker nodes: handle I/O and persist data.
· ZooKeeper nodes: manage the state of the Kafka controller and store the cluster's metadata.
· Producers: client programs that write messages to Topics.
· Consumers: client programs that read messages from Topics.

Message Anatomy
· Key: a record may be associated with an optional, non-unique key that acts as a kind of classifier, grouping related records by key. Keys are completely free-form; anything that can be represented as a byte array can be used as a record key.
· Value: the value is the record's actual payload. In business terms it is the most interesting part of the record: it is ultimately the value that describes the event. The value is optional, although records with a null value are rare; without a value a record is largely meaningless, and all the other attributes merely play a supporting role.
· Headers: a set of free-form key/value pairs that can optionally annotate a record. Kafka headers are similar to their HTTP namesakes and can be thought of as supplementary information for the value.
· Partition number: the zero-based index of the partition the record lives in. A record is always bound to exactly one partition, but the partition does not have to be specified explicitly when the record is published.
· Offset: a 64-bit signed integer used to locate a record within its partition. Records are stored sequentially, and the offset is a record's logical sequence number.
· Timestamp: the record's millisecond-precision timestamp. It can be set explicitly by the producer to an arbitrary value, or assigned automatically by the broker when the record is appended to the log.
In practice, developers mostly care about the Value. Out of the same curiosity, let's look at a real example and see what a record actually carries. The code is as follows:
# Producer code:
(venv392) root@node01:~/kafka# cat kafka-producer-python.py
import json
from kafka import KafkaProducer

conf = {
    'bootstrap_servers': ["node01:9093", "node02:9093", "node03:9093"],
    'topic_name': 'python_ssl_topic_name',
    'ssl_cafile': '/root/mycerts/certificates/CARoot.pem',
    'ssl_certfile': '/root/mycerts/certificates/certificate.pem',
    'ssl_keyfile': '/root/mycerts/certificates/key.pem'
}

print('start producer')
producer = KafkaProducer(
    bootstrap_servers=conf['bootstrap_servers'],
    security_protocol='SSL',
    ssl_check_hostname=False,
    ssl_cafile=conf['ssl_cafile'],
    ssl_certfile=conf['ssl_certfile'],
    ssl_keyfile=conf['ssl_keyfile'])

json_value = {"name": "tyun tech", "location": "zhangjiang"}
key = bytes("tyun", encoding="utf-8")
headers = [("Host", bytes("www.tyun.cn", encoding="utf-8"))]
data = bytes(json.dumps(json_value), encoding="utf-8")
producer.send(conf['topic_name'], value=data, key=key, headers=headers)
producer.close()
print('end producer')
# Consumer code:
(venv392) root@node01:~/kafka# cat kafka-consumer-python.py
from kafka import KafkaConsumer

conf = {
    'bootstrap_servers': ["node01:9093", "node02:9093", "node03:9093"],
    'topic_name': 'python_ssl_topic_name',
    'consumer_id': 'python_ssl_consumer',
    'ssl_cafile': '/root/mycerts/certificates/CARoot.pem',
    'ssl_certfile': '/root/mycerts/certificates/certificate.pem',
    'ssl_keyfile': '/root/mycerts/certificates/key.pem'
}

print('start consumer')
consumer = KafkaConsumer(conf['topic_name'],
                         bootstrap_servers=conf['bootstrap_servers'],
                         group_id=conf['consumer_id'],
                         security_protocol='SSL',
                         ssl_check_hostname=False,
                         ssl_cafile=conf['ssl_cafile'],
                         ssl_certfile=conf['ssl_certfile'],
                         ssl_keyfile=conf['ssl_keyfile'])

for message in consumer:
    print("{}:{}:{}: key={} value={} headers={} timestamp={}".format(message.topic,
                                                                     message.partition,
                                                                     message.offset,
                                                                     message.key,
                                                                     message.value,
                                                                     message.headers,
                                                                     message.timestamp))
print('end consumer')
(venv392) root@node01:~/kafka# python kafka-consumer-python.py
start consumer
python_ssl_topic_name:1:9: key=b'tyun' value=b'{"name": "tyun tech", "location": "zhangjiang"}' headers=[('Host', b'www.tyun.cn')] timestamp=1654139871642
python_ssl_topic_name:1:10: key=b'tyun' value=b'{"name": "tyun tech", "location": "zhangjiang"}' headers=[('Host', b'www.tyun.cn')] timestamp=1654139909527
python_ssl_topic_name:1:11: key=b'tyun' value=b'{"name": "tyun tech", "location": "zhangjiang"}' headers=[('Host', b'www.tyun.cn')] timestamp=1654139911635
python_ssl_topic_name:1:12: key=b'tyun' value=b'{"name": "tyun tech", "location": "zhangjiang"}' headers=[('Host', b'www.tyun.cn')] timestamp=1654139912718
# Meanwhile, the producer needs to generate some data
(venv392) root@node01:~/kafka# python kafka-producer-python.py
start producer
end producer
Partitions
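A quick way to see partitioning in action is to publish a few keyed records and inspect the metadata the broker returns. Below is a minimal kafka-python sketch (an illustration only: it assumes the three-broker cluster and the getting-started topic that appear later in this article); records sharing a key are hashed to the same partition, and ordering is only guaranteed within a partition.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["node01:9092", "node02:9092", "node03:9092"])
for key in [b"foo", b"bar", b"foo", b"baz"]:
    # send() is asynchronous; get() blocks until the broker acknowledges the record
    metadata = producer.send("getting-started", key=key, value=b"demo").get(timeout=10)
    print(key, "-> partition", metadata.partition, "offset", metadata.offset)
producer.close()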
How Kafka Persists Data
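On disk, each topic partition is a directory under log.dirs, and each directory holds the partition's segment files (.log) together with their offset and time indexes (.index, .timeindex). A small sketch, assuming log.dirs=/data/apps/kafka/logs as configured later in this article, that lists those segments:
import os

log_dir = "/data/apps/kafka/logs"  # assumption: must match log.dirs on the broker
for partition_dir in sorted(os.listdir(log_dir)):
    path = os.path.join(log_dir, partition_dir)
    if not os.path.isdir(path):
        continue
    # each topic-partition directory contains one or more segments: a .log file plus .index/.timeindex
    segments = [f for f in os.listdir(path) if f.endswith((".log", ".index", ".timeindex"))]
    if segments:
        print(partition_dir, "->", sorted(segments))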
Installing and Deploying Kafka
1. Standalone mode
a. Single-process mode
b. Docker Compose mode
2. Cluster mode
a. VM-based deployment
b. Kubernetes-based deployment
Standalone Installation
Single-process mode
wget -c https://archive.apache.org/dist/kafka/2.6.0/kafka_2.12-2.6.0.tgz
# Extract the archive (here assumed to go under /opt)
tar -xf kafka_2.12-2.6.0.tgz -C /opt
export KAFKA_HOME=/opt/kafka_2.12-2.6.0
# Start ZooKeeper in the foreground
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
# Start ZooKeeper in the background
$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
tail -f $KAFKA_HOME/logs/zookeeper.out
# Stop ZooKeeper
$KAFKA_HOME/bin/zookeeper-server-stop.sh
# Start Kafka in the foreground
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
# Start Kafka in the background
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
tail -f $KAFKA_HOME/logs/kafkaServer.out
# Stop Kafka
$KAFKA_HOME/bin/kafka-server-stop.sh
Many people start the services in the foreground and then push them to the background with nohup. Why not simply use the **-daemon** option?
Docker Compose Installation
A minimal single-broker stack (ZooKeeper, one Kafka broker, and Kafdrop as a web UI); bring it up with docker-compose up -d and browse Kafdrop at http://localhost:9000:
version: "3.2"
services:
  zookeeper:
    image: bitnami/zookeeper:3
    ports:
      - 2181:2181
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
  kafka:
    image: bitnami/kafka:2
    ports:
      - 9092:9092
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENERS: >-
        INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: >-
        INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >-
        INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
    depends_on:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    ports:
      - 9000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
    depends_on:
      - kafka
The same stack scaled out to three brokers (kafka-0, kafka-1 and kafka-2, exposed on host ports 9092-9094):
version: "3.2"
services:
  zookeeper:
    image: bitnami/zookeeper:3
    ports:
      - 2181:2181
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
  kafka-0:
    image: bitnami/kafka:2
    ports:
      - 9092:9092
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENERS: >-
        INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: >-
        INTERNAL://kafka-0:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >-
        INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
    depends_on:
      - zookeeper
  kafka-1:
    image: bitnami/kafka:2
    ports:
      - 9093:9093
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENERS: >-
        INTERNAL://:29092,EXTERNAL://:9093
      KAFKA_ADVERTISED_LISTENERS: >-
        INTERNAL://kafka-1:29092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >-
        INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
    depends_on:
      - zookeeper
  kafka-2:
    image: bitnami/kafka:2
    ports:
      - 9094:9094
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENERS: >-
        INTERNAL://:29092,EXTERNAL://:9094
      KAFKA_ADVERTISED_LISTENERS: >-
        INTERNAL://kafka-2:29092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: >-
        INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
    depends_on:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    ports:
      - 9000:9000
    environment:
      KAFKA_BROKERCONNECT: >-
        kafka-0:29092,kafka-1:29092,kafka-2:29092
    depends_on:
      - kafka-0
      - kafka-1
      - kafka-2
Cluster Installation
| Hostname | IP address | Software | Version |
|---|---|---|---|
| node01 | 192.168.110.99 | Kafka、ZooKeeper | kafka_2.12-2.6.0、zookeeper-3.6.1 |
| node02 | 192.168.110.211 | Kafka、ZooKeeper | kafka_2.12-2.6.0、zookeeper-3.6.1 |
| node03 | 192.168.110.197 | Kafka、ZooKeeper | kafka_2.12-2.6.0、zookeeper-3.6.1 |
| node04 | 192.168.110.158 | Kafka Eagle | kafka-eagle-bin-2.1.0 |
(venv392) root@node01:~/kafka# ansible -i hosts 'kafka' -m shell -a "hostname"
node01 | CHANGED | rc=0 >>
node01.tyun.cn
node03 | CHANGED | rc=0 >>
node03.tyun.cn
node02 | CHANGED | rc=0 >>
node02.tyun.cn
# Download the required binaries
wget -c https://archive.apache.org/dist/kafka/2.6.0/kafka_2.12-2.6.0.tgz
wget -c https://archive.apache.org/dist/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
wget -c https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz
Mind the Kafka and ZooKeeper versions: the ZooKeeper client jar that ships with kafka_2.12-2.6.0 is version 3.5.7, so the ZooKeeper servers must be at least 3.5.7.
Configuring the ZooKeeper Cluster
[vagrant@node01 ~]$ cd /opt
[vagrant@node01 opt]$ sudo tar -xf apache-zookeeper-3.6.1-bin.tar.gz
[vagrant@node01 opt]$ sudo ln -sv apache-zookeeper-3.6.1-bin zookeeper
The ZooKeeper configuration file:
[vagrant@node01 conf]$ cp zoo_sample.cfg zoo.cfg
[vagrant@node01 conf]$ grep -E -v "^#|^$" zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/apps/zookeeper-3.6.1/data
clientPort=2181
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
Create the data directory (it must match dataDir in zoo.cfg):
[vagrant@node01 conf]$ sudo mkdir -pv /data/apps/zookeeper-3.6.1/data
Besides zoo.cfg, cluster mode also requires a myid file under the dataDir directory. It contains a single value, the server id; ZooKeeper reads it at startup and matches it against the server.N entries in zoo.cfg to work out which server it is.
[vagrant@node01 conf]# echo "1" > /data/apps/zookeeper-3.6.1/data/myid
[vagrant@node02 conf]# echo "2" > /data/apps/zookeeper-3.6.1/data/myid
[vagrant@node03 conf]# echo "3" > /data/apps/zookeeper-3.6.1/data/myid
[root@node01 conf]# /data/apps/zookeeper-3.6.1/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/apps/zookeeper-3.6.1/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node02 conf]# /data/apps/zookeeper-3.6.1/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/apps/zookeeper-3.6.1/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node03 conf]# /data/apps/zookeeper-3.6.1/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/apps/zookeeper-3.6.1/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@node01 conf]# /data/apps/zookeeper-3.6.1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/apps/zookeeper-3.6.1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[root@node02 conf]# /data/apps/zookeeper-3.6.1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/apps/zookeeper-3.6.1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
[root@node03 conf]# /data/apps/zookeeper-3.6.1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/apps/zookeeper-3.6.1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
Take a look from inside the ZooKeeper cluster:
[root@node01 conf]# /data/apps/zookeeper-3.6.1/bin/zkCli.sh -server 127.0.0.1:2181
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /
[zookeeper]
[zk: 127.0.0.1:2181(CONNECTED) 2] ls -R /
/
/zookeeper
/zookeeper/config
/zookeeper/quota
Configuring the Kafka Cluster
(venv392) root@node01:~/kafka# cat hosts
[kafka]
node01
node02
node03
[eagle]
node04
(venv392) root@node01:~/kafka# ansible \
-i hosts 'kafka' \
-m copy -a "src=kafka_2.12-2.6.0.tgz dest=/data/apps"
# Extract
(venv392) root@node01:~/kafka# ansible \
-i hosts 'kafka' \
-m shell -a "tar -xf /data/apps/kafka_2.12-2.6.0.tgz -C /data/apps"
[root@node01 share]# cd /data/apps/
[root@node01 apps]# ln -sv kafka_2.12-2.6.0 kafka
‘kafka’ -> ‘kafka_2.12-2.6.0’
# Create the kafka user and set ownership
(venv392) root@node01:~/kafka# ansible -i hosts 'kafka' -m shell -a "useradd kafka"
(venv392) root@node01:~/kafka# ansible -i hosts 'kafka' -m shell -a "chown -R kafka:kafka /data/apps/kafka*"
The Kafka configuration file:
[root@node01 config]# egrep -v "(^$|^#)" server.properties
broker.id=1
listeners=PLAINTEXT://node01:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/apps/kafka/logs
num.partitions=3
log.retention.hours=60
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=18000
auto.create.topics.enable=true
delete.topic.enable=true
[root@node02 config]# egrep -v "(^$|^#)" server.properties
broker.id=2
listeners=PLAINTEXT://node02:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/apps/kafka/logs
num.partitions=3
log.retention.hours=60
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=18000
auto.create.topics.enable=true
delete.topic.enable=true
[root@node03 config]# egrep -v "(^$|^#)" server.properties
broker.id=3
listeners=PLAINTEXT://node03:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/apps/kafka/logs
num.partitions=3
log.retention.hours=60
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=18000
auto.create.topics.enable=true
delete.topic.enable=true
| Parameter | Meaning |
|---|---|
| broker.id | The unique identifier of each broker in the cluster; it must be a positive integer and must differ between nodes. With three nodes we simply use 1, 2 and 3. If a server's IP address changes but its broker.id does not, consumers are unaffected. |
| listeners | The address and port Kafka listens on; it can be a hostname or an IP address. If you use a hostname, the hostname-to-IP mapping must be resolvable locally via /etc/hosts or via DNS. |
| log.dirs | Where Kafka stores its data; all messages live under this directory. Multiple paths can be given, separated by commas, and Kafka assigns new partitions to the least-used directory. Note that "least used" is decided by the number of partitions already assigned to each directory, not by free disk space. |
| num.partitions | The number of partitions for newly created Topics. Tune it to your consumers: too few partitions limits consumption throughput. We use 3 here. |
| log.retention.hours | How long messages are retained. log.retention.minutes and log.retention.ms are also supported; all three control when expired data is deleted, and if several are set the one with the smallest time unit wins. |
| log.segment.bytes | The size of each segment file within a partition, 1 GB by default; once a segment exceeds this size a new segment file is created. |
| zookeeper.connect | The ZooKeeper ensemble that stores the brokers' metadata. Multiple values can be given, separated by commas, each in the form hostname:port/path, where: 1) hostname is the ZooKeeper server's hostname or IP address (we use IP-resolvable hostnames here); 2) port is the port ZooKeeper listens on; 3) path is Kafka's root path in ZooKeeper, defaulting to the root if unset. |
| auto.create.topics.enable | Whether Topics are created automatically: if a request references a Topic that does not exist yet, Kafka creates it on the broker. Set it to false if you need strict control over Topic creation; public clouds generally require Topics to be created manually before use. |
| delete.topic.enable | Since version 0.8.2 Kafka can delete Topics, but by default the data is not physically removed. Set this to true if deleting a Topic should also delete its data files. |
· log.dirs: this is a very important parameter. It lists the directories the broker will use and has no default value, which means you must set it yourself. In production always configure several paths, as a comma-separated (CSV) list such as /home/kafka1,/home/kafka2,/home/kafka3, and if possible mount those directories on different physical disks.
· log.retention.hours=168 # 168 hours, i.e. one week, the default
· log.segment.bytes=1073741824 # 1 GB by default
· log.retention.check.interval.ms=300000 # 5 minutes by default
Among the retention settings, the one with the smaller time unit takes precedence.
Note that broker.id must be changed on the other two nodes; no two brokers in a Kafka cluster may share the same broker.id. (A quick kafka-python check of the auto-creation and num.partitions settings described above is shown below.)
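As a quick sanity check of the auto-creation behaviour, here is a small kafka-python sketch (the topic name auto-created-demo is purely illustrative): sending to a topic that does not exist yet triggers auto-creation, and partitions_for() should then report the 3 partitions set by num.partitions.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["node01:9092", "node02:9092", "node03:9092"])
# the topic does not exist yet; with auto.create.topics.enable=true the broker creates it
producer.send("auto-created-demo", b"hello").get(timeout=10)
print("partitions:", producer.partitions_for("auto-created-demo"))
producer.close()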
export KAFKA_HOME=/data/apps/kafka
[root@node01 kafka]# $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@node01 kafka]# jps
3940 QuorumPeerMain
5045 Jps
4653 Kafka
[root@node02 kafka]# $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@node02 kafka]# jps
3857 Kafka
4246 Jps
3719 QuorumPeerMain
[root@node03 kafka]# $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[root@node03 kafka]# jps
3296 Kafka
3664 Jps
3156 QuorumPeerMain
Stopping Kafka, and starting it in the foreground or background:
# Stop Kafka
$KAFKA_HOME/bin/kafka-server-stop.sh
# Start Kafka in the foreground
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
# Start Kafka in the background
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Again: many people start the broker in the foreground and push it to the background with nohup. Why not simply use the **-daemon** option?
Basic Kafka Operations
Create a Topic
root@node01:~# $KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--create --partitions 3 --replication-factor 1 \
--topic getting-started
The --bootstrap-server option accepts one or more brokers, separated by commas. When creation succeeds you get the following output:
Created topic getting-started.
The --replication-factor used here is not a sensible choice: the cluster has three nodes, yet the replication factor is 1. Is there a risk of losing data? There certainly is.
(venv392) root@node01:~/kafka# $KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--create \
--partitions 3 \
--replication-factor 4 \
--topic an-error-topic
This fails with the following error:
Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
[2022-06-01 11:35:51,914] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 4 larger than available brokers: 3.
(kafka.admin.TopicCommand$)
So what replication factor is appropriate? A common rule of thumb is 3 copies (and never more than the number of brokers), for example:
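A sketch using kafka-python's admin client to do the same thing programmatically with a replication factor of 3, i.e. one copy per broker (the topic name replicated-demo is only illustrative):
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["node01:9092", "node02:9092", "node03:9092"])
# 3 partitions, each replicated to all three brokers
admin.create_topics([NewTopic(name="replicated-demo", num_partitions=3, replication_factor=3)])
admin.close()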
Produce Records
$KAFKA_HOME/bin/kafka-console-producer.sh \
--broker-list node01:9092,node02:9092,node03:9092 \
--topic getting-started \
--property "parse.key=true" \
--property "key.separator=:"
With parse.key=true the console producer treats each input line as a key/value pair, split on the character given by the key.separator property. Type the following:
foo:first message
foo:second message
bar:first message
foo:third message
bar:second message
Press CTRL+D to finish. The corresponding records can then be seen in the graphical interface below:
Consume Records
root@node01:~# $KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic getting-started \
--group cli-consumer \
--from-beginning \
--property "print.key=true" \
--property "key.separator=:"
The terminal prints the following (note that ordering is only guaranteed within a partition; records sharing a key land in the same partition, which is why the bar and foo messages appear grouped):
bar:first message
bar:second message
foo:first message
foo:second message
foo:third message
List Topics
root@node01:~# $KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--list
__consumer_offsets
demo_kafka_topic_1
getting-started
The --exclude-internal option filters internal topics (such as __consumer_offsets) out of the result:
root@node01:~# $KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--list --exclude-internal
demo_kafka_topic_1
getting-started
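The same list can also be fetched programmatically; a small kafka-python sketch (topics() simply returns every topic visible in the cluster metadata):
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=["node01:9092", "node02:9092", "node03:9092"])
print(sorted(consumer.topics()))
consumer.close()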
Describe a Topic
root@node01:~# $KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--describe --topic getting-started
Topic: getting-started PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: getting-started Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: getting-started Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: getting-started Partition: 2 Leader: 3 Replicas: 3 Isr: 3
Delete a Topic
root@node01:~# $KAFKA_HOME/bin/kafka-topics.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--topic getting-started --delete
List Consumer Groups
root@node01:~# $KAFKA_HOME/bin/kafka-consumer-groups.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--list
ssl-test-consumer-group
python_ssl_consumer
efak.system.group
Describe a Consumer Group
root@node01:~# $KAFKA_HOME/bin/kafka-consumer-groups.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--group cli-consumer \
--describe \
--all-topics
The output looks like this:
# While the group still has active members
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
cli-consumer getting-started 0 2 2 0 consumer-cli-consumer-1-f96d0081-f3d0-4766-beb3-7bee1135cb45 /192.168.110.99 consumer-cli-consumer-1
cli-consumer getting-started 1 0 0 0 consumer-cli-consumer-1-f96d0081-f3d0-4766-beb3-7bee1135cb45 /192.168.110.99 consumer-cli-consumer-1
cli-consumer getting-started 2 3 3 0 consumer-cli-consumer-1-f96d0081-f3d0-4766-beb3-7bee1135cb45 /192.168.110.99 consumer-cli-consumer-1
## When the group has no active members
Consumer group 'cli-consumer' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
cli-consumer getting-started 1 0 0 0 - - -
cli-consumer getting-started 0 2 2 0 - - -
cli-consumer getting-started 2 3 3 0 - - -
(venv392) root@node01:~/kafka# $KAFKA_HOME/bin/kafka-consumer-groups.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--describe \
--all-groups \
--all-topics
Consumer group 'efak.system.group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
efak.system.group python_ssl_topic_name 0 4 4 0 - - -
efak.system.group python_ssl_topic_name 1 13 13 0 - - -
efak.system.group getting-started 0 412 412 0 - - -
efak.system.group python_ssl_topic_name 2 1 1 0 - - -
efak.system.group test_producer_perf 0 3322651 3322651 0 - - -
efak.system.group ssl_test 0 2 4 2 - - -
efak.system.group test_producer_perf 1 3333724 3333724 0 - - -
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
python_ssl_consumer python_ssl_topic_name 0 4 4 0 kafka-python-2.0.2-2ea867d2-b94c-474b-a1ca-7e88410f85fd /192.168.110.99 kafka-python-2.0.2
python_ssl_consumer python_ssl_topic_name 1 13 13 0 kafka-python-2.0.2-2ea867d2-b94c-474b-a1ca-7e88410f85fd /192.168.110.99 kafka-python-2.0.2
python_ssl_consumer python_ssl_topic_name 2 1 1 0 kafka-python-2.0.2-4f462ddf-d99e-4c29-98fe-ea0a13e3f417 /192.168.110.99 kafka-python-2.0.2
Consumer group 'ssl-test-consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ssl-test-consumer-group ssl_test 2 5 5 0 - - -
ssl-test-consumer-group ssl_test 1 4 4 0 - - -
ssl-test-consumer-group ssl_test 0 4 4 0 - - -
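The LAG column above is simply the difference between the log end offset and the group's committed offset. A rough kafka-python sketch of the same calculation, assuming kafka-python 2.0+ and the cli-consumer group used earlier:
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient

bootstrap = ["node01:9092", "node02:9092", "node03:9092"]
admin = KafkaAdminClient(bootstrap_servers=bootstrap)
# committed offsets per partition for the group: {TopicPartition: OffsetAndMetadata}
committed = admin.list_consumer_group_offsets("cli-consumer")
consumer = KafkaConsumer(bootstrap_servers=bootstrap)
end_offsets = consumer.end_offsets(list(committed))
for tp, meta in committed.items():
    print(tp.topic, tp.partition, "lag =", end_offsets[tp] - meta.offset)
consumer.close()
admin.close()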
Delete a Consumer Group
(venv392) root@node01:~/kafka# $KAFKA_HOME/bin/kafka-consumer-groups.sh \
--bootstrap-server node01:9092,node02:9092,node03:9092 \
--group cli-consumer --delete
Kafka Monitoring
As the business grows, the number of Consumer Groups and Topics grows with it, and the command-line tools that ship with Kafka start to feel inadequate. That is where a Kafka monitoring system comes in; the ones most widely used in industry today include Kafka Manager and Kafka Eagle.
Kafka Eagle
About Kafka Eagle
Kafka Eagle is simpler and easier to use than Kafka Manager, so this article focuses on this visual management tool. It is a visualization tool for Kafka clusters: it can manage multiple Kafka clusters, manage Kafka Topics (view, delete, create, and so on), monitor consumer state, and provides features such as message-lag alerting and cluster health checks. Official site: https://www.kafka-eagle.org/
root@node04:~# tar -xf kafka-eagle-bin-2.1.0.tar.gz -C /opt/
root@node04:~# cd /opt/kafka-eagle-bin-2.1.0/
root@node04:/opt/kafka-eagle-bin-2.1.0# tar -xf efak-web-2.1.0-bin.tar.gz -C /opt
root@node04:/opt/kafka-eagle-bin-2.1.0# cd /opt
root@node04:/opt# ln -sv efak-web-2.1.0 efak
Add the following environment variables to /etc/profile:
export KE_HOME=/opt/efak
export PATH=$PATH:$KE_HOME/bin
Kafka Eagle's configuration file, system-config.properties, lives under $KE_HOME/conf/. The finished configuration looks like this:
root@node04:/opt/efak/conf# egrep -v "(^#|^$)" system-config.properties
efak.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
cluster1.efak.broker.size=20
kafka.zk.limit.size=16
efak.webui.port=8048
efak.distributed.enable=false
efak.cluster.mode.status=master
efak.worknode.master.host=localhost
efak.worknode.port=8085
cluster1.efak.jmx.acl=false
cluster1.efak.jmx.user=keadmin
cluster1.efak.jmx.password=keadmin123
cluster1.efak.jmx.ssl=false
cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore
cluster1.efak.jmx.truststore.password=ke123456
cluster1.efak.offset.storage=kafka
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
efak.metrics.charts=true
efak.metrics.retain=15
efak.sql.topic.records.max=5000
efak.sql.topic.preview.records.max=10
efak.topic.token=keadmin
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.efak.sasl.client.id=
cluster1.efak.blacklist.topics=
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=
# Database driver; for convenience we use SQLite here
efak.driver=org.sqlite.JDBC
efak.url=jdbc:sqlite:/opt/kafka-eagle/db/ke.db
efak.username=root
efak.password=www.kafka-eagle.org
If you would rather use MySQL, create the database and user first:
mysql> create database ke character set utf8;
mysql> create user 'kafka'@'localhost' identified by '123456';
mysql> grant all privileges on ke.* to 'kafka'@'localhost';
mysql> flush privileges;
######################################
# kafka mysql jdbc driver address
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
· kafka.eagle.zk.cluster.alias: the name(s) of the Kafka cluster(s) to manage; several can be given, separated by commas.
· cluster1.zk.list: the ZooKeeper list for the corresponding Kafka cluster. Note the prefix: this configures cluster1; for additional clusters, add one such list per Kafka cluster.
· kafka.zk.limit.size: the maximum number of ZooKeeper client connections.
· kafka.eagle.webui.port: the Kafka Eagle web port, 8048 by default.
· cluster1.kafka.eagle.offset.storage: where consumer offsets are stored. Before Kafka 0.9 they were kept in ZooKeeper, so set this to zookeeper; from Kafka 0.10 onwards they are stored in Kafka itself, so set it to kafka. Also try to keep the client API version in line with the broker version.
· efak.metrics.charts: whether to enable Kafka Eagle's monitoring trend charts; disabled by default, set it to true if you want them.
· kafka.eagle.metrics.retain: how long metric data is retained; 15 means 15 days.
· kafka.eagle.sql.fix.error: enable this if KSQL queries against a Topic run into errors; disabled by default.
· kafka.eagle.sql.topic.records.max: KSQL queries return at most the latest 5000 records by default. If a KSQL query misbehaves, set kafka.eagle.sql.fix.error to true and Kafka Eagle will try to repair the error automatically.
· efak.topic.token: the token required when deleting a Kafka Topic; remember this value.
· efak.driver: the database driver for the backing store.
· efak.url: the JDBC URL of the MySQL database.
· efak.username: the username used to connect to MySQL.
· efak.password: the password used to connect to MySQL.
To expose JMX to Kafka Eagle, add three parameters after the existing JMX settings (the KAFKA_JMX_OPTS variable) in each broker's bin/kafka-run-class.sh, for example:
-Dcom.sun.management.jmxremote.host=0.0.0.0 \
-Dcom.sun.management.jmxremote.local.only=false \
-Djava.rmi.server.hostname=[custom hostname]
[root@node03 kafka]# grep node03 bin/kafka-run-class.sh
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.host=0.0.0.0 -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=node03 "
-Dcom.sun.management.jmxremote.host=0.0.0.0 \
-Dcom.sun.management.jmxremote.local.only=false \
-Djava.rmi.server.hostname=node03
Using Kafka Eagle
Start the service
[root@node04 opt]# cd $KE_HOME/bin
[root@node04 bin]# ./ke.sh start
[2022-05-09 21:24:37] INFO: Starting EFAK( Eagle For Apache Kafka ) environment check ...
created: META-INF/
inflated: META-INF/MANIFEST.MF
created: WEB-INF/
created: WEB-INF/classes/
created: WEB-INF/classes/messages/
...
inflated: WEB-INF/lib/kafka-eagle-common-2.0.1.jar
inflated: WEB-INF/lib/janino-3.0.11.jar
inflated: WEB-INF/lib/spring-beans-5.2.0.RELEASE.jar
inflated: WEB-INF/lib/aspectjrt-1.8.10.jar
inflated: WEB-INF/lib/j2objc-annotations-1.3.jar
inflated: WEB-INF/lib/kafka-clients-2.0.0.jar
inflated: WEB-INF/lib/commons-beanutils-1.9.4.jar
inflated: WEB-INF/lib/kafka-eagle-plugin-2.0.1.jar
inflated: media/css/bscreen/font/DS-DIGIT.TTF
inflated: media/css/img/glyphicons-halflings.png
inflated: media/css/public/images/ui-icons_888888_256x240.png
...
[2022-05-09 21:24:49] INFO: Port Progress: [##################################################] | 100%
[2022-05-09 21:24:53] INFO: Config Progress: [##################################################] | 100%
[2022-05-09 21:24:56] INFO: Startup Progress: [##################################################] | 100%
[2022-05-09 21:24:37] INFO: Status Code[0]
[2022-05-09 21:24:37] INFO: [Job done!]
Welcome to
______ ______ ___ __ __
/ ____/ / ____/ / | / //_/
/ __/ / /_ / /| | / ,<
/ /___ / __/ / ___ | / /| |
/_____/ /_/ /_/ |_|/_/ |_|
( Eagle For Apache Kafka® )
Version 2.1.0 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.110.158:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
[root@node04 bin]# jps
6353 KafkaEagle
6430 Jps
The $KE_HOME/bin/ke.sh script supports the command options start, status, stop, restart and stats (see the usage banner above).

Web UI


Cluster Info:

Metrics
Cluster-level monitoring information is available at http://192.168.110.158:8048/cluster/info:

Configure Kafka
In $KAFKA_HOME/bin/kafka-server-start.sh, export JMX_PORT so that EFAK can collect broker metrics:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999" # add this line
fi
[root@node03 kafka]# lsof -i:9999
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 27191 root 106u IPv6 56551 0t0 TCP *:distinct (LISTEN)

Configure ZooKeeper
ZooKeeper 3.5+ restricts the four-letter-word admin commands by default, so the following change is needed. Edit $ZK_HOME/bin/zkServer.sh (around line 77) and add:
vi zkServer.sh
# add the following line
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"

Check the metrics again


Alerts
Configuring alerts

Kafka node monitoring is configured as follows:

These are the two alerts we have just configured.

Triggering an alert
Here the alert is on the getting-started Topic, firing when its size exceeds 102400 bytes:



When the alert condition clears, a recovery notification like the following is sent:


· Producer monitoring
· Consumer monitoring
· ZooKeeper monitoring
Kafka Eagle is recommended for operating, managing and monitoring Kafka clusters: it is simple, easy to use, and well suited to Chinese users.
Kafka Security
· Authentication
· Authorization
Kafka in its default state

· Any client can connect to ZooKeeper or to the Kafka brokers: anyone can connect to the two well-known ports, 2181 (ZooKeeper) and 9092 (Kafka). ZooKeeper is a particularly vulnerable point for an attacker, because writing to znodes (the internal data structures ZooKeeper uses to persist state) is enough to wreck the entire cluster.
· Connections to the Kafka brokers are unencrypted: they run over plain TCP, and the parties exchange data using a binary protocol. A third party with access to the transport network can capture and analyse the traffic with a basic packet sniffer and gain unauthorized access to the information; a malicious actor could also alter data in transit, or impersonate a broker or a client. Besides client traffic, inter-broker traffic is insecure by default as well.
· Connections to the Kafka brokers are unauthenticated: any client can establish a connection without identifying itself or proving that the identity it presents is genuine.
· There are no authorization controls: even with authentication enabled, a client can perform any operation against the broker. The default policy is allow-all, which lets a malicious client run wild.
Kafka in an ideal state

Configuring SSL

Client-to-broker encryption
1. Create a keystore for every broker node
2. Create a private CA
3. Sign the certificates, then import the CA and the signed certificates into the keystores
4. Configure the server and the clients
root@node01:~# DNAME="CN=Tyun Tech, OU=tyun.cn, O=tyun.cn, L=Shanghai, ST=Shanghai, C=CN"
root@node01:~# PASSWORD=changeit12345
root@node01:~# KEY_PASSWORD=$PASSWORD
root@node01:~# STORE_PASSWORD=$PASSWORD
root@node01:~# TRUST_KEY_PASSWORD=$PASSWORD
root@node01:~# TRUST_STORE_PASSWORD=$PASSWORD
# Create the server keystore and key (the remaining variables, such as $KEY_STORE and $DAYS_VALID, are defined in the full script at the end of this section)
root@node01:~# keytool \
-keystore $KEY_STORE \
-alias kafka-server \
-validity $DAYS_VALID \
-genkey -keyalg RSA \
-storepass $STORE_PASSWORD \
-keypass $KEY_PASSWORD -dname "$DNAME"
# Create the client keystore and key
root@node01:~# keytool \
-keystore $CLIENT_KEY_STORE \
-alias kafka-client \
-validity $DAYS_VALID \
-genkey -keyalg RSA \
-storepass $STORE_PASSWORD \
-keypass $KEY_PASSWORD -dname "$DNAME"
(venv392) root@node01:~/mycerts/certificates# echo $PASSWORD |keytool -list -v -keystore server.keystore.jks |head -20
Enter keystore password:
***************** WARNING WARNING WARNING *****************
* The integrity of the information stored in your keystore *
* has NOT been verified! In order to verify its integrity, *
* you must provide your keystore password. *
***************** WARNING WARNING WARNING *****************
Keystore type: jks
Keystore provider: SUN
Your keystore contains 2 entries
Alias name: caroot
Creation date: May 30, 2022
Entry type: trustedCertEntry
Owner: OU="tyun.cn,CN=Tyun Tech", O=tyun.cn, L=Shanghai, ST=Shanghai, C=CN
Issuer: OU="tyun.cn,CN=Tyun Tech", O=tyun.cn, L=Shanghai, ST=Shanghai, C=CN
Serial number: 1b75081359742c8f40980f4450776c56c9fffc6b
Valid from: Mon May 30 15:19:50 CST 2022 until: Tue May 30 15:19:50 CST 2023
Certificate fingerprints:
SHA1: B1:BA:9F:10:83:53:42:EA:4C:17:B8:C4:A3:3B:4F:98:72:AB:07:63
SHA256: F3:F2:3F:B7:04:A4:F5:3F:2F:DD:DB:D3:1E:64:D2:20:76:92:DD:47:F4:12:45:4D:10:9B:71:B0:10:25:0E:A8
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3
(venv392) root@node01:~/mycerts/certificates# echo $PASSWORD |keytool -list -v -keystore client.keystore.jks |head -20
Create the CA
root@node01:~# openssl req \
-new -x509 \
-keyout $CERT_OUTPUT_PATH/ca-key \
-out "$CERT_AUTH_FILE" \
-days "$DAYS_VALID" \
-passin pass:"$PASSWORD" \
-passout pass:"$PASSWORD" \
-subj "/C=CN/ST=Shanghai/L=Shanghai/O=tyun.cn/OU=tyun.cn,CN=Tyun Tech"
root@node01:~# ll ca-*
-rw-r--r-- 1 root root 1363 May 30 14:19 ca-cert
-rw------- 1 root root 1854 May 30 14:18 ca-key
# Import the CA certificate into the broker truststore
root@node01:~# keytool -keystore "$TRUST_STORE" -alias CARoot \
-importcert -file "$CERT_AUTH_FILE" \
-storepass "$TRUST_STORE_PASSWORD" \
-keypass "$TRUST_KEY_PASSWORD" -noprompt
# Import the CA certificate into the client truststore
root@node01:~# keytool \
-keystore "$CLIENT_TRUST_STORE" \
-alias CARoot \
-importcert -file "$CERT_AUTH_FILE" \
-storepass "$TRUST_STORE_PASSWORD" \
-keypass "$TRUST_KEY_PASSWORD" -noprompt
# Export certificate signing requests from the server and client keystores
root@node01:~# keytool \
-keystore "$KEY_STORE" \
-alias kafka-server \
-certreq -file "$CERT_OUTPUT_PATH/server-cert-file" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
root@node01:~# keytool \
-keystore "$CLIENT_KEY_STORE" \
-alias kafka-client \
-certreq -file "$CERT_OUTPUT_PATH/client-cert-file" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
# Sign the certificates with the CA
root@node01:~# openssl x509 \
-req -CA "$CERT_AUTH_FILE" \
-CAkey $CERT_OUTPUT_PATH/ca-key \
-in "$CERT_OUTPUT_PATH/server-cert-file" \
-out "$CERT_OUTPUT_PATH/server-cert-signed" \
-days "$DAYS_VALID" \
-CAcreateserial -passin pass:"$PASSWORD"
root@node01:~# openssl x509 \
-req -CA "$CERT_AUTH_FILE" \
-CAkey $CERT_OUTPUT_PATH/ca-key \
-in "$CERT_OUTPUT_PATH/client-cert-file" \
-out "$CERT_OUTPUT_PATH/client-cert-signed" \
-days "$DAYS_VALID" \
-CAcreateserial -passin pass:"$PASSWORD"
# Import the CA certificate into the keystores
root@node01:~# keytool \
-keystore "$KEY_STORE" \
-alias CARoot \
-import -file "$CERT_AUTH_FILE" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
root@node01:~# keytool \
-keystore "$CLIENT_KEY_STORE" \
-alias CARoot \
-import -file "$CERT_AUTH_FILE" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
# Import the signed certificates into the keystores
root@node01:~# keytool \
-keystore "$KEY_STORE" \
-alias kafka-server \
-import -file "$CERT_OUTPUT_PATH/server-cert-signed" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
root@node01:~# keytool \
-keystore "$CLIENT_KEY_STORE" \
-alias kafka-client \
-import -file "$CERT_OUTPUT_PATH/client-cert-signed" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
The complete script:
#!/bin/bash
BASE_DIR=/root/mycerts
CERT_OUTPUT_PATH="$BASE_DIR/certificates"
PASSWORD=changeit12345
KEY_STORE="$CERT_OUTPUT_PATH/server.keystore.jks"
TRUST_STORE="$CERT_OUTPUT_PATH/server.truststore.jks"
CLIENT_KEY_STORE="$CERT_OUTPUT_PATH/client.keystore.jks"
CLIENT_TRUST_STORE="$CERT_OUTPUT_PATH/client.truststore.jks"
KEY_PASSWORD=$PASSWORD
STORE_PASSWORD=$PASSWORD
TRUST_KEY_PASSWORD=$PASSWORD
TRUST_STORE_PASSWORD=$PASSWORD
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert"
DAYS_VALID=365
DNAME="CN=Tyun Tech, OU=tyun.cn, O=tyun.cn, L=Shanghai, ST=Shanghai, C=CN"
mkdir -p $CERT_OUTPUT_PATH
echo "1. Generating keys and certificates......"
keytool \
-keystore $KEY_STORE \
-alias kafka-server \
-validity $DAYS_VALID \
-genkey -keyalg RSA \
-storepass $STORE_PASSWORD \
-keypass $KEY_PASSWORD -dname "$DNAME"
keytool \
-keystore $CLIENT_KEY_STORE \
-alias kafka-client \
-validity $DAYS_VALID \
-genkey -keyalg RSA \
-storepass $STORE_PASSWORD \
-keypass $KEY_PASSWORD -dname "$DNAME"
echo "2. Creating the CA......"
openssl req \
-new -x509 \
-keyout $CERT_OUTPUT_PATH/ca-key \
-out "$CERT_AUTH_FILE" \
-days "$DAYS_VALID" \
-passin pass:"$PASSWORD" \
-passout pass:"$PASSWORD" \
-subj "/C=CN/ST=Shanghai/L=Shanghai/O=tyun.cn/OU=tyun.cn,CN=Tyun Tech"
echo "3. Importing the CA certificate into the broker truststore......"
keytool -keystore "$TRUST_STORE" -alias CARoot \
-importcert -file "$CERT_AUTH_FILE" \
-storepass "$TRUST_STORE_PASSWORD" \
-keypass "$TRUST_KEY_PASSWORD" -noprompt
echo "4. Importing the CA certificate into the client truststore......"
keytool \
-keystore "$CLIENT_TRUST_STORE" \
-alias CARoot \
-importcert -file "$CERT_AUTH_FILE" \
-storepass "$TRUST_STORE_PASSWORD" \
-keypass "$TRUST_KEY_PASSWORD" -noprompt
echo "5. Exporting certificate requests from the keystores......"
keytool \
-keystore "$KEY_STORE" \
-alias kafka-server \
-certreq -file "$CERT_OUTPUT_PATH/server-cert-file" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
keytool \
-keystore "$CLIENT_KEY_STORE" \
-alias kafka-client \
-certreq -file "$CERT_OUTPUT_PATH/client-cert-file" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
echo "6. Signing the certificates with the CA......"
openssl x509 \
-req -CA "$CERT_AUTH_FILE" \
-CAkey $CERT_OUTPUT_PATH/ca-key \
-in "$CERT_OUTPUT_PATH/server-cert-file" \
-out "$CERT_OUTPUT_PATH/server-cert-signed" \
-days "$DAYS_VALID" \
-CAcreateserial -passin pass:"$PASSWORD"
openssl x509 \
-req -CA "$CERT_AUTH_FILE" \
-CAkey $CERT_OUTPUT_PATH/ca-key \
-in "$CERT_OUTPUT_PATH/client-cert-file" \
-out "$CERT_OUTPUT_PATH/client-cert-signed" \
-days "$DAYS_VALID" \
-CAcreateserial -passin pass:"$PASSWORD"
echo "7. Importing the CA certificate into the keystores......"
keytool \
-keystore "$KEY_STORE" \
-alias CARoot \
-import -file "$CERT_AUTH_FILE" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
keytool \
-keystore "$CLIENT_KEY_STORE" \
-alias CARoot \
-import -file "$CERT_AUTH_FILE" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
echo "8. Importing the signed certificates into the keystores......"
keytool \
-keystore "$KEY_STORE" \
-alias kafka-server \
-import -file "$CERT_OUTPUT_PATH/server-cert-signed" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
keytool \
-keystore "$CLIENT_KEY_STORE" \
-alias kafka-client \
-import -file "$CERT_OUTPUT_PATH/client-cert-signed" \
-storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt
Deploying the private keys and signed certificates to the brokers
root@node01:~/mycerts/certificates# pwd
/root/mycerts/certificates
root@node01:~/mycerts/certificates# ll
total 32
-rw-r--r-- 1 root root 1330 May 30 15:19 ca-cert
-rw------- 1 root root 1854 May 30 15:19 ca-key
-rw-r--r-- 1 root root 4140 May 30 15:19 client.keystore.jks
-rw-r--r-- 1 root root 1004 May 30 15:19 client.truststore.jks
-rw-r--r-- 1 root root 4139 May 30 15:19 server.keystore.jks
-rw-r--r-- 1 root root 1004 May 30 15:19 server.truststore.jks
(venv392) root@node01:~/mycerts/certificates# ansible \
-i /root/kafka/hosts 'kafka' \
-m copy -a 'src=server.keystore.jks dest=/data/apps/kafka/config/'
(venv392) root@node01:~/mycerts/certificates# ansible \
-i /root/kafka/hosts 'kafka' \
-m copy -a 'src=server.truststore.jks dest=/data/apps/kafka/config/'
# Verify that the certificates were copied successfully
(venv392) root@node01:~/mycerts/certificates# ansible \
-i /root/kafka/hosts 'kafka' \
-m shell -a "ls -l /data/apps/kafka/config/*.jks"
node01 | CHANGED | rc=0 >>
-rw-r--r-- 1 root root 4139 May 30 15:22 server.keystore.jks
-rw-r--r-- 1 root root 1004 May 30 15:22 server.truststore.jks
node02 | CHANGED | rc=0 >>
-rw-r--r-- 1 root root 4139 May 30 15:26 server.keystore.jks
-rw-r--r-- 1 root root 1004 May 30 15:28 server.truststore.jks
node03 | CHANGED | rc=0 >>
-rw-r--r-- 1 root root 4139 May 30 15:26 server.keystore.jks
-rw-r--r-- 1 root root 1004 May 30 15:28 server.truststore.jks
(venv392) root@node01:~# $KAFKA_HOME/bin/kafka-server-stop.sh
(venv392) root@node01:~# $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
After restarting with the SSL listener enabled on port 9093 (the listeners entry plus the ssl.keystore/ssl.truststore settings in server.properties), the broker log shows SSL handshake failures:
[2022-05-30 16:25:53,362] INFO [SocketServer brokerId=3] Failed authentication with /192.168.110.197 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2022-05-30 16:25:53,492] INFO [Controller id=3, targetBrokerId=3] Failed authentication with node03/192.168.110.197 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2022-05-30 16:25:53,493] ERROR [Controller id=3, targetBrokerId=3] Connection to node 3 (node03/192.168.110.197:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
The handshake fails because of hostname verification. Disabling it, by leaving ssl.endpoint.identification.algorithm empty in server.properties, resolves the error:
ssl.endpoint.identification.algorithm=
(venv392) root@node01:~/kafka# cat ssl_producer.properties
security.protocol=SSL
ssl.truststore.location=/root/mycerts/certificates/client.truststore.jks
ssl.truststore.password=changeit12345
ssl.keystore.location=/root/mycerts/certificates/server.keystore.jks
ssl.keystore.password=changeit12345
ssl.key.password=changeit12345
ssl.endpoint.identification.algorithm=
(venv392) root@node01:~/kafka# $KAFKA_HOME/bin/kafka-console-producer.sh \
--bootstrap-server=node01:9093,node02:9093,node03:9093 \
--topic ssl_test \
--producer.config ./ssl_producer.properties
>hello
>hello
>hello
>world
>world
This writes 5 messages to the ssl_test Topic; press CTRL+D to finish. The topic can then be inspected in the EFAK web UI:
(venv392) root@node01:~/kafka# cat ssl_consumer.properties
security.protocol=SSL
group.id=ssl-test-consumer-group
ssl.truststore.location=/root/mycerts/certificates/client.truststore.jks
ssl.truststore.password=changeit12345
ssl.keystore.location=/root/mycerts/certificates/server.keystore.jks
ssl.keystore.password=changeit12345
ssl.key.password=changeit12345
ssl.endpoint.identification.algorithm=
Consume the messages from the ssl_test Topic:
(venv392) root@node01:~/kafka# $KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server=node01:9093,node02:9093,node03:9093 \
--topic ssl_test \
--from-beginning \
--consumer.config ./ssl_consumer.properties
hello
hello
world
hello
world
The same can be done from the Java client. An SSL producer sample:
import static java.lang.System.out;

import java.util.Date;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public final class SslProducerSample {
    public static void main(String[] args) throws InterruptedException {
        final var topic = "getting-started";
        final Map<String, Object> config =
            Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9093,node02:9093,node03:9093",
                   CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL",
                   SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https",
                   SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "client.truststore.jks",
                   SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit12345",
                   ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                   ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                   ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        try (var producer = new KafkaProducer<String, String>(config)) {
            while (true) {
                final var key = "myKey";
                final var value = new Date().toString();
                out.format("Publishing record with value %s%n", value);
                final Callback callback = (metadata, exception) -> {
                    out.format("Published with metadata: %s, error: %s%n", metadata, exception);
                };
                // publish the record, handling the metadata in the callback
                producer.send(new ProducerRecord<>(topic, key, value), callback);
                // wait a second before publishing another
                Thread.sleep(1000);
            }
        }
    }
}
And an SSL consumer sample:
import static java.lang.System.out;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class SslConsumerSample {
    public static void main(String[] args) {
        final var topic = "getting-started";
        final Map<String, Object> config =
            Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9093,node02:9093,node03:9093",
                   CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL",
                   SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "https",
                   SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "client.truststore.jks",
                   SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit12345",
                   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
                   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
                   ConsumerConfig.GROUP_ID_CONFIG, "basic-consumer-sample",
                   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        try (var consumer = new KafkaConsumer<String, String>(config)) {
            consumer.subscribe(Set.of(topic));
            while (true) {
                final var records = consumer.poll(Duration.ofMillis(100));
                for (var record : records) {
                    out.format("Got record with value %s%n", record.value());
                }
                consumer.commitAsync();
            }
        }
    }
}
Inter-broker encryption
To also encrypt traffic between brokers, add the following to server.properties:
inter.broker.listener.name=SSL
root@node01:~# $KAFKA_HOME/bin/kafka-server-stop.sh
root@node01:~# $KAFKA_HOME/bin/kafka-server-start.sh \
-daemon $KAFKA_HOME/config/server.properties
root@node01:~# netstat -an | egrep "9092|9093"
tcp6 0 0 :::9092 :::* LISTEN
tcp6 0 0 :::9093 :::* LISTEN
tcp6 0 0 192.168.110.99:9093 192.168.110.197:44040 ESTABLISHED
tcp6 0 0 192.168.110.99:9093 192.168.110.211:54720 ESTABLISHED
tcp6 0 0 192.168.110.99:38244 192.168.110.197:9093 ESTABLISHED
tcp6 0 0 192.168.110.99:60214 192.168.110.211:9093 ESTABLISHED
Broker-to-ZooKeeper encryption
ZooKeeper SSL configuration
root@node01:/data/apps/zookeeper-3.6.1/conf# cat zoo.cfg
tickTime=2000
initLimit=20
syncLimit=10
quorumListenOnAllIPs=true
dataDir=/data/apps/zookeeper-3.6.1/data
dataLogDir=/data/apps/zookeeper-3.6.1/logs
secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
ssl.keyStore.location=/data/apps/zookeeper-3.6.1/conf/zkserver.keystore.jks
ssl.keyStore.password=changeit12345
ssl.trustStore.location=/data/apps/zookeeper-3.6.1/conf/zkserver.truststore.jks
ssl.trustStore.password=changeit12345
server.1=node01.tyun.cn:2888:3888
server.2=node02.tyun.cn:2888:3888
server.3=node03.tyun.cn:2888:3888
SSL configuration for Kafka's connection to ZooKeeper
root@node01:/data/apps/kafka/config# cat server.properties
......
# broker to zookeeper use ssl
zookeeper.connect=node01.tyun.cn:2182,node02.tyun.cn:2182,node03.tyun.cn:2182
zookeeper.connection.timeout.ms=18000
zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.endpoint.identification.algorithm=false
zookeeper.ssl.hostnameVerification=false
zookeeper.ssl.quorum.hostnameVerification=false
zookeeper.ssl.keystore.location=/data/apps/kafka/config/zkclient.keystore.jks
zookeeper.ssl.keystore.password=changeit12345
zookeeper.ssl.truststore.location=/data/apps/kafka/config/zkclient.truststore.jks
zookeeper.ssl.truststore.password=changeit12345
Producing messages and verifying the SSL setup
# Produce records
root@node01:~/kafka# export KAFKA_HOME=/data/apps/kafka
root@node01:~/kafka# $KAFKA_HOME/bin/kafka-console-producer.sh \
--bootstrap-server node01.tyun.cn:9093,node02.tyun.cn:9093,node03.tyun.cn:9093 \
--topic ssl_test \
--producer.config /root/kafka/ssl_producer.properties
>hello world
>hahh hhhhh
# Consume records
root@node01:~/kafka# $KAFKA_HOME/bin/kafka-console-consumer.sh \
--bootstrap-server node01.tyun.cn:9093,node02.tyun.cn:9093,node03.tyun.cn:9093 \
--topic ssl_test \
--from-beginning \
--consumer.config /root/kafka/ssl_consumer.properties
hello world
hahh hhhhh
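To double-check that the brokers really are serving TLS on port 9093, the Python standard library is enough. A small sketch that fetches the certificate presented by a broker (no verification is performed here; the hostname is the one used above):
import ssl

# grab the PEM-encoded certificate from the broker's SSL listener
pem = ssl.get_server_certificate(("node01.tyun.cn", 9093))
print(pem.splitlines()[0], "...", pem.splitlines()[-1])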
Kafka MirrorMaker

Appendix
Recommended GUI and monitoring tools
KafDrop
root@node04:~# /opt/jdk-11.0.9/bin/java -jar kafdrop-3.30.0.jar \
--kafka.brokerConnect=node01:9092,node02:9092,node03:9092

logiKM
Kafka Manager
root@node04:~# wget -c https://github.com/yahoo/CMAK/releases/download/3.0.0.6/cmak-3.0.0.6.zip
root@node04:~# unzip cmak-3.0.0.6.zip -d /opt/
# Configure CMAK
root@node04:/opt/cmak# grep cmak.zkhosts conf/application.conf
cmak.zkhosts="node01:2181,node02:2181,node03:2181"
cmak.zkhosts=${?ZK_HOSTS}
# If you prefer not to hard-code the value, use an environment variable instead
export ZK_HOSTS="node01:2181,node02:2181,node03:2181"
# Start the service; the default port is 9000
root@node04:/opt/cmak# /opt/cmak/bin/cmak
# A different port can be specified, e.g. 8080
root@node04:/opt/cmak# /opt/cmak/bin/cmak \
-Dconfig.file=/opt/cmak/application.conf \
-Dhttp.port=8080
# A specific JDK can also be specified
root@node04:/opt/cmak# /opt/cmak/bin/cmak \
-java-home /opt/jdk-11.0.9


References
· Effective Kafka
· Kafka Eagle documentation (kafka-eagle.org)
· KafkaProducer ‒ kafka-python 2.0.2-dev documentation
· IBM Docs
Recommended reading
· Overview & learning map, Distributed Message Service for Kafka, Huawei Cloud
The help documentation from the major cloud vendors is also good learning material.
