
Therefore, this authentication method is a poor fit for real business scenarios in production and hinders later expansion. With SCRAM authentication, by contrast, users can be added dynamically, and a newly added user can authenticate against the running Kafka cluster without a restart. For production, the recommended scheme is therefore SCRAM combined with PLAIN.
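To illustrate this dynamism, a throwaway SCRAM credential can be created and removed against a live cluster with kafka-configs.sh; a minimal sketch (testuser and its password are hypothetical; the real commands for this cluster appear below):

kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --alter --add-config 'SCRAM-SHA-256=[password=test123]' --entity-type users --entity-name testuser
kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name testuser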
[root@master2 config]# vim /usr/local/zookeeper-3.4.14/conf/zoo.cfg
tickTime=2000
initLimit=1
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/datalog
clientPort=2181
admin.serverPort=8888
maxClientCnxns=3000
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=192.xxx.xxx.112:2888:3888
server.2=192.xxx.xxx.114:2888:3888
server.3=192.xxx.xxx.115:2888:3888
4lw.commands.whitelist=conf,stat,srvr,mntr
#zk SASL
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
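Once the nodes are restarted (below), the four-letter-word whitelist above allows a quick health check from the shell; a sketch (nc is an assumption about the tooling available on the hosts):

echo srvr | nc 192.xxx.xxx.112 2181
echo mntr | nc 192.xxx.xxx.112 2181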
[root@master2 config]# vim /usr/local/zookeeper-3.4.14/conf/zk_jaas.conf
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin123"
user_kafka="kafka123";
};
The admin user is used between the ZooKeeper cluster nodes themselves; the kafka user is what the Kafka brokers use to connect to ZooKeeper.
vim /usr/local/zookeeper-3.4.14/bin/zkEnv.sh
ZOOBINDIR="${ZOOBINDIR:-/usr/bin}"
ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper-3.4.14/conf/zk_jaas.conf"
zkServer.sh restart|status
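Run on each of the three nodes; a sketch of the expected check:

/usr/local/zookeeper-3.4.14/bin/zkServer.sh restart
/usr/local/zookeeper-3.4.14/bin/zkServer.sh status   # one node reports Mode: leader, the others Mode: follower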
kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin123],SCRAM-SHA-512=[password=admin123]' --entity-type users --entity-name admin
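The producer and consumer users referenced throughout the rest of this article can be created the same way; a sketch, with passwords chosen to match the JAAS files shown later:

kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=producer123],SCRAM-SHA-512=[password=producer123]' --entity-type users --entity-name producer
kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=consumer123],SCRAM-SHA-512=[password=consumer123]' --entity-type users --entity-name consumer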
kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --describe --entity-type users
(A single user can also be specified with --entity-name, e.g. producer, as shown below.)
kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --describe --entity-type users --entity-name producer
zkCli.sh -server 192.xxx.xxx.112:2181
[zk: 192.xxx.xxx.112:2181(CONNECTED) 0] ls /config/users
[admin, producer, consumer]
[zk: 192.xxx.xxx.112:2181(CONNECTED) 1]
kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer
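Describing the user again should then show only the remaining SCRAM-SHA-512 credential (assuming both mechanisms were created as above); a sketch of the check:

kafka-configs.sh --zookeeper 192.xxx.xxx.112:2181 --describe --entity-type users --entity-name producer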
vim /usr/local/kafka_2.12-2.3.0/config/kafka_server_jaas.conf
KafkaServer {
#org.apache.kafka.common.security.plain.PlainLoginModule required
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin123"
user_admin="admin123"
user_producer="producer123"
user_consumer="consumer123";
};
KafkaClient {
#org.apache.kafka.common.security.plain.PlainLoginModule required
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin123"
user_producer="producer123"
user_consumer="consumer123";
};
Client {
#org.apache.kafka.common.security.plain.PlainLoginModule required
org.apache.kafka.common.security.scram.ScramLoginModule required
username="kafka"
password="kafka123";
};
In the KafkaServer section, username and password are the credentials the broker itself uses; the user_xxx entries that follow are the preset regular accounts. The commented lines show the PLAIN variant of each section, where producer would be the account name and producer123 its password. The Client section holds the username and password the broker uses to connect to ZooKeeper; these must match the user_kafka account and password in the zk_jaas.conf file configured earlier.
1) Do not forget the semicolons on the last two lines of each section (after the final option and after the closing brace);
2) avoid stray spaces in the JAAS file (for example around the = signs).
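For reference, the commented lines correspond to the PLAIN mechanism; a sketch of what a PLAIN-only KafkaServer section would look like (this article proceeds with SCRAM):

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin123"
user_admin="admin123"
user_producer="producer123"
user_consumer="consumer123";
};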
[root@master2 config]# cat server.properties
broker.id=0
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://192.xxx.xxx.112:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/datalogs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
delete.topic.enable=true
auto.create.topics.enable=false
zookeeper.connect=192.xxx.xxx.112:2181,192.xxx.xxx.114:2181,192.xxx.xxx.115:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
[root@node1 config]# cat server.properties
broker.id=1
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://192.xxx.xxx.114:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/datalogs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
delete.topic.enable=true
auto.create.topics.enable=false
zookeeper.connect=192.xxx.xxx.112:2181,192.xxx.xxx.114:2181,192.xxx.xxx.115:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
[root@node2 config]# cat server.properties
broker.id=2
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://192.xxx.xxx.115:9092
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/datalogs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
delete.topic.enable=true
auto.create.topics.enable=false
zookeeper.connect=192.xxx.xxx.112:2181,192.xxx.xxx.114:2181,192.xxx.xxx.115:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
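Aside from broker.id and advertised.listeners, the three files are meant to be identical, so per-broker copies can be generated from one template; a minimal sketch (BROKER_ID, IP, and the output file name are assumptions):

# generate a per-broker copy from the broker-0 template
BROKER_ID=1; IP=192.xxx.xxx.114
sed -e "s/^broker.id=.*/broker.id=${BROKER_ID}/" \
    -e "s#^advertised.listeners=.*#advertised.listeners=SASL_PLAINTEXT://${IP}:9092#" \
    server.properties > server.properties.node1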
vim /usr/local/kafka_2.12-2.3.0/bin/kafka-server-start.sh
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.3.0/config/kafka_server_jaas.conf kafka.Kafka "$@"
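Editing the startup script is one option; since kafka-run-class.sh also honors the KAFKA_OPTS environment variable, the same JAAS file can alternatively be injected without touching the script; a sketch:

export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.3.0/config/kafka_server_jaas.conf"
kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.3.0/config/server.properties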
Every client configuration file must contain at least the following two lines (the mechanism must be one listed in sasl.enabled.mechanisms on the brokers, hence SCRAM-SHA-256 here rather than SCRAM-SHA-512):
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
[root@master2 config]# cat producer.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="producer" password="producer123";
[root@master2 config]# vim consumer.conf
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="consumer" password="consumer123";
[root@master2 kafka_2.12-2.3.0]# vim bin/kafka-console-producer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.3.0/config/kafka_server_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
[root@master2 kafka_2.12-2.3.0]# vim bin/kafka-console-consumer.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.3.0/config/kafka_server_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
nohup kafka-server-start.sh /usr/local/kafka_2.12-2.3.0/config/server.properties &
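Besides the ZooKeeper check below, each broker's own log can be inspected for a clean startup; a sketch (the log path is an assumption based on the default layout):

grep "started (kafka.server.KafkaServer)" /usr/local/kafka_2.12-2.3.0/logs/server.log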
zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 13] ls /brokers/ids
[0, 1, 2]
kafka-topics.sh --zookeeper 192.xxx.xxx.112:2181 --create --replication-factor 3 --partitions 3 --topic test_topic
Created topic test_topic.
[root@node2 ~]# kafka-topics.sh --list --zookeeper 192.xxx.xxx.112:2181
test_topic
[root@node2 ~]# kafka-topics.sh --describe --zookeeper 192.xxx.xxx.112:2181 --topic test_topic
Topic:test_topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test_topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test_topic Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: test_topic Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Grant write permission:
kafka-acls.sh --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --add --allow-principal User:producer --operation Write --topic 'test_topic'
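The grant can be verified with --list; a sketch:

kafka-acls.sh --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --list --topic test_topic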

Start the producer and send messages. Without the Write ACL, the producer fails with errors like the following; once the grant above is applied, sending succeeds:
[2022-12-18 16:48:18,155] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test_topic=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2022-12-18 16:48:18,156] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test_topic] (org.apache.kafka.clients.Metadata)
[2022-12-18 16:48:18,158] ERROR Error when sending message to topic test_topic with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
kafka-console-producer.sh --broker-list 192.xxx.xxx.114:9092 --topic test_topic --producer.config /usr/local/kafka_2.12-2.3.0/config/producer.conf
>hello
>world
>end
Grant read permission:
kafka-acls.sh --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --add --allow-principal User:consumer --operation Read --topic 'test_topic'

kafka-acls.sh --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --add --allow-principal User:consumer --consumer --topic test_topic --group test_group
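The --consumer convenience option expands to Read and Describe on the topic plus Read on the group in a single call. The resulting ACLs can be checked with --list; a sketch:

kafka-acls.sh --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --list --group test_group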

Without these ACLs, the consumer fails with:
[2022-12-18 17:48:59,996] WARN [Consumer clientId=consumer-1, groupId=test_group] Error while fetching metadata with correlation id 2 : {test_topic=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2022-12-18 17:49:00,015] ERROR [Consumer clientId=consumer-1, groupId=test_group] Topic authorization failed for topics [test_topic] (org.apache.kafka.clients.Metadata)
[2022-12-18 17:49:00,018] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_topic]
Processed a total of 0 messages
#kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.112:9092,192.xxx.xxx.114:9092,192.xxx.xxx.115:9092 --topic test --group test-group --from-beginning --consumer.config /usr/local/kafka_2.12-2.3.0/config/consumer.conf
kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.112:9092 --topic test_topic --group test-group --from-beginning --consumer.config /usr/local/kafka_2.12-2.3.0/config/consumer.conf
hello
world
end
kafka-topics.sh --zookeeper 192.xxx.xxx.115:2181 --create --partitions 2 --replication-factor 1 --topic test1
# Grant write permission
kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --add --allow-principal User:producer --operation Write --topic test1
# Produce messages
kafka-console-producer.sh --broker-list 192.xxx.xxx.112:9092,192.xxx.xxx.114:9092,192.xxx.xxx.115:9092 --topic test1 --producer.config /usr/local/kafka_2.12-2.3.0/config/producer.conf
>hello
>world
# Grant read permission
kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --add --allow-principal User:consumer --operation Read --topic test1
# Grant consumer-group permission
kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --add --allow-principal User:consumer --operation Read --group test-group
# Consume messages
kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.112:9092,192.xxx.xxx.114:9092,192.xxx.xxx.115:9092 --topic test1 --from-beginning --consumer.config /usr/local/kafka_2.12-2.3.0/config/consumer.conf --group test-group
hello
world
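When access is no longer needed, the same tool revokes ACLs with --remove; a sketch (--force skips the confirmation prompt):

kafka-acls.sh --authorizer-properties zookeeper.connect=192.xxx.xxx.114:2181 --remove --allow-principal User:consumer --operation Read --topic test1 --force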

Author: Business Division II (上海新炬中北团队)
Source: the "IT那活儿" WeChat official account
