背景
Kafka在0.9.0.0版本前没有安全机制功能。Kafka Client程序可以直接获取到Kafka集群元数据信息和Kafka Broker地址后,连接到Kafka集群,然后完全操作集群上的所有topic数据资源。另外集群节点间通讯、broker和zookeeper通讯、客户端和集群的网络层通信都是无加密模式。集群的数据存在极大的安全风险。
自0.9.0.0版本开始,Kafka社区逐步添加了较多功能用于提高Kafka群集的安全性。目前Kafka安全集群安全机制主要有三个方面的设置:通信加密(encryption)、身份认证(authentication)和授权(authorization)。
本文重点介绍生产安全集群的一种配置方案。数据通讯传输配置SSL,认证配置SASL,授权通过ACL接口命令来完成的,即:SSL+SASL/SCRAM+ACL。
第一部分 Kafka集群加密传输
1.1 背景知识介绍
涉及的技术知识不做详细介绍。
1.1.1 密码学基础
加密算法分为两类:
对称密钥算法(Symmetric Cryptography):数据加密和解密时使用相同的密钥。例如常用的DES就是对称加密算法。
非对称密钥算法(Asymmetric Cryptography):数据加密和解密时使用不同的密钥,分为:公开的公钥(public key)和用户保存的私钥(private key),私钥和公钥在数学上是相关的。利用公钥(或私钥)加密的数据只能用相应的私钥(或公钥)才能解密。举一个例子:客户在银行网银上做一笔交易,首先向银行申请公钥,银行分发公钥给用户,用户使用公钥对请求数据进行加密。银行收到加密数据后通过银行侧保存的私钥进行解密处理,并处理后更新后台数据库。这个通讯过程中银行不需要通过互联网分发私钥。因此保证了私钥的安全。目前最常用的非对称加密算法是RSA算法。
非对称密钥算法中,私钥来解密公钥加密的数据,公钥来解密私钥加密的数据。
两种加密算法的比较:
对称密钥的强度和密钥长度成正比,但是解密效率和密钥长度成反比。另外私钥的分发存在安全风险。
非对称加密保证了私钥的安全性,但是加密和解密的效率比对称加密低。
所以通常加密场景是两种密钥结合使用。使用数据主体使用对称秘钥算法,但是私钥的传输使用非对称算法在互联网环境分发非对称密钥。最常见的就是SSL/TLS。
1.1.2 CA数字证书
对于非对称密钥算法存在一个安全风险点,那就是公钥的分发存在中间人攻击。还是以客户和银行的通信为例(例子简单化处理)。客户和银行分别有自己的公钥和私钥,私钥各自保留本地。公钥通过互联网分发给对方。那么公钥就是有安全风险的。存在被黑客截取风险。客户向银行申请银行公钥,结果被黑客截取,黑客伪装成银行,返回给用户自己的黑客公钥,用户收到黑客公钥后,将信息加密发给黑客。黑客用黑客私钥进行解密,获取到真实信息。这时候黑客伪装成客户用相同的方法完成和银行的数据交互。这就是中间人攻击的案例。
所以非对称加密算法的公钥传输同样存在风险。当然如果使用原始的离线方式交换密钥是安全的,但是随着互联网通信的爆炸式增长,这是落后低效的。为了保证公钥的真实性和安全性,这时候我们引入第三个角色:公开密钥认证(Public key certificate,简称CA),又称数字证书(digital certificate)或身份证书(identity certificate)。
通常CA是一家第三方权威机构。负责管理和签发证书。整个实现原理也是非对称加密算法:
机构将自己的公钥以及身份信息交给CA机构(安全的),CA使用自己的私钥对各机构的公钥进行加密。这个过程称为验签。输出的加密后的公钥及身份信息称为数字证书。
当其他机构请求A机构公钥的时候,返回的是A机构的数字证书。其他机构可以使用CA的公钥对该数字证书中加密公钥进行解密获取A机构的通信公钥。
那么新得安全问题又来了,如何保证CA机构的公钥不被伪造?通常CA的公钥是集成在浏览器或者操作系统中,并且被很好的保护起来。
当然CA证书还涉及更多的安全细节设计(Hash算法防篡改、信任链等大量细节),这里只是简单的介绍。详细介绍可以查看:维基(证书颁发机构)
对于企业内部的应用系统就没必要花钱购买CA机构的证书服务了,可以自建 Root CA,自己给自己颁发证书,充当内网的CA机构。当然这时候客户端就需要导入CA的证书了(浏览器和操作系统没有自建的CA证书)。
1.1.3 SSL/TLS加密协议
SSL(Secure Sockets Layer)是一种安全协议,目的是为保障互联网上数据传输安全,利用数据加密技术,确保数据在网络上之传输过程中不会被截取。
从网络协议层看,SSL协议位于TCP/IP协议与应用层协议之间,为数据通讯提供安全支持。SSL协议自身可分为两层:
SSL记录协议(SSL Record Protocol):它建立在可靠的传输协议(如TCP)之上,为高层协议提供数据封装、压缩、加密等基本功能的支持。
SSL握手协议(SSL Handshake Protocol):它建立在SSL记录协议之上,用于在实际的数据传输开始前,通讯双方进行身份认证、协商加密算法、交换加密密钥等。例如HTTPS就是在HTTP应用层上增加了SSL加密协议支持(HTTP over SSL)。
TLS(Transport Layer Security,传输层安全协议),同样用于两个应用程序之间提供保密性和数据完整性。TLS 1.0建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本,可以理解为SSL 3.1,即是SSL的升级版。TLS的主要目标是使SSL更安全,并使协议的规范更精确和完善。另外,TLS版本号也与SSL的不同(TLS的版本1.0使用的版本号为SSLv3.1)
SSL通过握手过程在client和server之间协商会话參数,并建立会话。一共有三种方式:
仅仅验证server的SSL握手过程(单向SSL)
验证server和client的SSL握手过程(双向SSL)
恢复原有会话的SSL握手过程
第一种:单向SSL通信过程如下(SSL 客户端和SSL 服务端通信):
(1)SSL客户端向SSL服务端发起请求,请求信息包括SSL版本号、加密算法、密钥交换算法、MAC算法等信息;
(2)SSL服务端确定本次通话的SSL版本和加密套件后,将携带公钥信息的证书回给客户端。如果通话可从重用,还会返回会话ID;
(3)SSL服务端发送Server Hello Done消息。通知SSL客户端版本号和加密套件协商结束,开始进行密钥交换;
(4)SSL客户端对CA证书进行验证,证书合法则继续、不成功弹出选择页面;
(5)SSL客户端生产随机私有对称密钥key,并使用服务端公开密钥进行加密后,发给服务端;
(6)SSL服务端使用自己的私钥解密,获取对称密钥key;
(7)最后SSL客户端与SSL服务端将使用该对称密钥key进行加密通信。
第二种:单向认证,仅仅是客户端需要检验服务端证书是否是正确的。双向SSL和单向认证几乎一样,只是在客户端认证完服务器证书后,客户端会将自己的证书传给服务器。服务器验证通过后,才开始秘钥协商。
第三种:协商会话参数、建立会话的过程中,需要使用非对称密钥算法来加密密钥、验证通信对端的身份,计算量较大,占用了大量的系统资源。为了简化SSL握手过程,SSL允许重用已经协商过的会话。即可以重用会话ID。这就是第三种建立会话方式。
1.1.4 Openssl工具
对于企业内部(内部局域网)的应用系统通讯,如果需要CA证书服务,可以使用Openssl自建CA,并完成证书签发。
先说一下常用密钥类文件的规范:
后缀名规范
通常约定后缀含义:crt或者cert 表示证书, key表示私钥, req和csr表示请求文件。
文件格式
pem表示pem格式(经过加密的文本文件),der表示der格式(经过加密的二进制文件)。所有证书和私钥可以是pem,也可以是der格式,取决于需要。两个格式可以转换。
Openssl的配置文件(openssl.cnf
)定义CA的默认参数,例如ubuntu
系统中配置文件位置在/usr/lib/ssl/openssl.cnf
。如果不适用默认参数需要在命令中重新指定。
CA证书的制作
首先生成CA的私钥,使用下面的命令:
$ openssl genrsa -out private/ca.key.pem 2048private/ca.key.pem
是CA私钥,格式为pem,长度(加密位数)为2048。前面密码学知识知道CA使用一对密钥的(私钥和公钥),并且两个密钥是数学相关的。公钥可以通过私钥算出来。
CA证书自签发
参考命令如下:
$ openssl req -new -x509 -key private/ca.key.pem -out certs/ca.cert.pemcerts/ca.cert.pem
即CA的自签证书。部署导入到客户端(例如浏览器)。用户证书签发
用户证书的签发和CA自签相同,用户证书由CA私钥签发。用户需要提供请求文件。
$ openssl ca -in app.csr -out app.crt -days 365app.crt
为签发的证书。部署在应用服务器上。
1.1.5 Keytool工具介绍
在密钥证书管理时,通常使用JAVA的Keytool工具程序。Keytool 是一个JAVA数据证书的管理工具 ,Keytool 将密钥(key)和证书(certificates)存在一个称为keystore的文件中,通常称为密钥库文件。文件的扩展名通常使用:jks,全名java key store file。
Keytool是一个Java数据证书的管理工具,所以节点需要配置JAVA_HOME环境变量。
这里列举了命令支持的参数含义及注意点(供后续使用查阅):
keystore 参数指定保存证书的文件(密钥库二进制文件)。密钥库文件包含证书的私钥,必须对其进行安全保存。
validity 参数指定密钥有效期,单位是天。默认为90天。
keyalg 参数指定密钥使用的加密算法(例如RSA,如果不指定默认采用DSA)。
keysize 参数指定密钥的长度。该参数是选项参数,默认长度是1024位。为了保证密钥安全强度,建议密码长度设置为2048位。
keypass 参数指定生成密钥的密码(私钥密码)。
storepass 指定密钥库的密码(获取keystore信息所需的密码)。另外密钥库创建后,要对其做任何修改都必须提供该密码,以便访问密钥库。
alias 参数指定密钥别名。每个密钥文件有一个唯一的别名,别名不区分大小写。
dname 参数指定证书拥有者信息。例如:"CN=名字与姓氏,OU=组织单位名称,O=组织名称,L=城市或区域名称,ST=州或省份名称,C=单位的两字母国家代码"。
list 参数显示密钥库中的证书信息。keytool -list -v -keystore 指定keystore -storepass 密码
v 参数显示密钥库中的证书详细信息。
export 将别名指定的证书导出到文件。keytool -export -alias 需要导出的别名 -keystore 指定keystore -file 指定导出的证书位置及证书名称 -storepass 密码。
file 参数指定导出到文件的文件名。
delete 删除密钥库中某条目。keytool -delete -alias 指定需删除的别名 -keystore 指定keystore -storepass 密码
printcert 查看导出的证书信息。keytool -printcert -file yushan.crt
keypasswd 修改密钥库中指定条目口令。keytool -keypasswd -alias 需修改的别名 -keypass 旧密码 -new 新密码 -storepass keystore密码 -keystore sage
storepasswd 修改keystore口令。keytool -storepasswd -keystore e:/yushan.keystore(需修改口令的keystore) -storepass 123456(原始密码) -new newpasswd(新密码)
import 将已签名数字证书导入密钥库。keytool -import -alias 指定导入条目的别名 -keystore 指定keystore -file 需导入的证书
关于Keytool工具的详细介绍,可以参考oracle的官网。
1.2 Kafka集群配置SSL加密
Apache Kafka允许客户端通过SSL连接。默认情况下,SSL是禁用的,可以根据需要打开。
1.2.1 集群环境准备
为了后文讲解方便,我们部署了Kafka集群(3节点)和Zookeeper集群(3节点)测试环境。其中zookeeper和kafka混合部署。
| 节点编号 | hostname | IP地址 |
|---|---|---|
| 1 | kafka.app.node1 | 192.168.1.5 |
| 2 | kafka.app.node2 | 192.168.1.6 |
| 3 | kafka.app.node3 | 192.168.1.7 |
Kafka集群节点对外服务端口为:9092;Zookeeper集群节点对外服务端口为:2181。
1.2.2 配置主机名验证
从Kafka 2.0.0版开始,默认会为客户端连接以及broker之间的连接启用服务器的主机名验证(SSL端点识别算法),以防止中间人攻击。可以通过设置参数ssl.endpoint.identification.algorithm
为空字符串来禁用服务器主机名验证。例如:
ssl.endpoint.identification.algorithm=另外高版本支持不停集群服务下,进行动态配置,使用脚本kafka-configs.sh
,参考命令如下:
bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="对于较旧的Kafka版本,ssl.endpoint.identification.algorithm
默认情况下未定义,因此不会启用主机名验证。若该属性设置HTTPS
,则启用主机名验证,例如:
ssl.endpoint.identification.algorithm=HTTPS需要注意的是,一旦启用主机名验证,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):
通用名称(CN,Common Name)
主题备用名称(SAN,Subject Alternative Name)
两个字段都有效,但RFC-2818建议使用SAN。SAN也更灵活,允许声明多个DNS条目。另一个优点是,CN可以设置为更有意义的值用于授权。如要添加SAN字段,需要将以下参数-ext SAN = DNS:{FQDN}
添加到keytool
命令中,例如:
$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}更通俗一点讲,SSL 握手期间验证主机名时,它会检查服务器证书是否具有 SAN 集。如果检测到 SAN 集,那么只使用 SAN 集中的名称或 IP 地址。如果未检测到 SAN 集,那么只使用主题专有名称 (DN) 最重要的属性,通常是通用名称(CN)。将该值与客户端尝试连接的服务器启的主机名进行比较。如果它们相同,主机名验证成功,允许建立连接。
1.2.3 生成SSL密钥和证书
为了方便管理证书密钥,我们使用统一的路径保存。例如统一放在/usr/ca
作为文件目录。
$ mkdir -p usr/ca/{root,server,client,trust}这里各文件夹的功能是:root:存储CA私钥和证书;server:存储服务端的私钥和证书;client:存储客户端私钥和证书;trust:存储信任库文件;
节点1(kafka.app.node1)
$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node1 其中dname
参数的含义参考Keytool
工具介绍,文件名为:server.keystore.jks
,这是密钥库。
节点2(kafka.app.node2)
$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node2节点3(kafka.app.node3)
$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node3证书生成后可以通过下面的命令进行查询(需要输入密钥库管理密码,即keypass
的参数):
$ keytool -list -v -keystore server.keystore.jks1.2.4 创建Kafka集群CA证书
集群中每个服务节点都有一对公钥和私钥,以及用于标识该节点的证书。但这个证书是未签名的,存在中间者攻击的风险。所以需要证书颁发机构(CA)负责签署颁发证书,使用openssl
工具实现。
同一个集群的所有节点共用一个CA证书,所以只需要在集群的一个节点(集群外节点均可)生成CA证书,然后分发给集群其他节点。例如在kafka.app.node1
节点上创建CA证书,命令如下:
$ openssl req -new -x509 -keyout usr/ca/root/ca.key.pem -out usr/ca/root/ca.cert.pem -days 365 -passout pass:app123 -subj "/C=cn/ST=shanghai/L=shanghai/O=org/OU=depart/CN=kafka.app.node1"然后使用scp
命令分发给其他节点:
$ scp usr/ca/root/* root@kafka.app.node2:/usr/ca/root/
$ scp usr/ca/root/* root@kafka.app.node3:/usr/ca/root/
生成两个文件,分别是私钥(ca.key.pem)和证书(ca.cert.pem),它用来签署其他证书。
1.2.5 集群服务节点签署证书
首先给集群各服务节点签发证书(即签名)。步骤如下:
第一步 从密钥容器中提取和导出服务端证书(输出文件:server.cert-file,未签名)
$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.itdw -certreq -file usr/ca/server/server.cert-file -storepass app123第二步 给服务端证书签名(输出文件:server.cert-signed,已签名)
$ openssl x509 -req -CA usr/ca/root/ca.cert.pem -CAkey usr/ca/root/ca.key.pem -in usr/ca/server/server.cert-file -out usr/ca/server/server.cert-signed -days 365 -CAcreateserial -passin pass:app123第三步 将CA证书导入服务端密钥容器中
$ keytool -keystore usr/ca/server/server.keystore.jks -alias CARoot -import -file usr/ca/root/ca.cert.pem -storepass app123第四步 将已签名的证书导入密钥容器中
$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.app -import -file usr/ca/server/server.cert-signed -storepass app123需要注意集群上每个服务节点均需要签署。
1.2.6 生成服务端信任库
如果kafka集群中配置中的参数ssl.client.auth
设置为:requested
或required
,需要为集群节点提供一个信任库,这个库中需要包含所有CA证书。
使用下面的命令将CA证书导入服务端信任库,输出为信任库文件:server.truststore.jks
$ keytool -keystore usr/ca/trust/server.truststore.jks -alias CARoot -import -file usr/ca/root/ca.cert.pem -storepass app123将CA证书导入服务端信任库,意味着信任该CA证书签名的所有证书。此属性称为信任链,在大型Kafka群集上部署SSL时特别有用。您可以使用单个CA对群集中的所有证书进行签名,并使所有计算机共享信任该CA的同一信任库。这样,所有计算机都可以对所有其他计算机进行身份验证。
1.2.7 配置Kafka Brokers
Kafka Broker节点支持侦听多个端口上的连接。在server.properties中配置,多个端口类型使用逗号分隔,我们以集群中kafka.app.node1
为例:
listeners=SSL://kafka.app.node1:9092代理端需要以下SSL配置
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=app123
其他可选配置设置:
ssl.client.auth
(可选)参数控制SSL认证模式。默认参数值为
requested
,默认使用单向认证,即客户端认证Kafka brokers。此时,没有证书的客户端仍然可以连接集群。参数值为required
,指定开启双向验证(2-way authentication)。Kafka服务器同时会验证客户端证书。生成集群建议开始双向认证。ssl.cipher.suites
(可选)密码套件是认证,加密,MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。(默认为空列表)
ssl.enabled.protocols建议参数值为
TLSv1.2,TLSv1.1,TLSv1
。列出支持的SSL协议。生成环境不建议使用SSL,建议使用TLS。ssl.keystore.type
和`ssl.truststore.type文件格式:
JKSsecurity.inter.broker.protocol
参数kafka集群节点(brokers)之间启用
SSL
通讯,需要配置该配置参数为:SSL
。
最后我们总结合并一下所有的配置参数:
listeners=SSL://kafka.app.node1:9092
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=app123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
security.inter.broker.protocol=SSL
1.2.8 初步验证
正常启动集群的Zookeeper集群,然后依次启动集群的所有节点。使用下面的命令检查:
$ openssl s_client -debug -connect kafka.app.node1:9092 -tls1
该命令检查服务器的密钥库和信任库是否正确设置。命令中tls1
必须是集群配置参数ssl.enabled.protocols
所支持的协议。
Certificate chain
(省略)
---
Server certificate
-----BEGIN CERTIFICATE-----
(省略)
-----END CERTIFICATE-----
subject=(省略)
issuer=(省略)
---
No client certificate CA names sent
---
SSL handshake has read 2029 bytes and written 264 bytes
---
New, TLSv1/SSLv3, Cipher is ECDHE-RSA-DES-CBC3-SHA
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
SSL-Session:
Protocol : TLSv1
Cipher : ECDHE-RSA-DES-CBC3-SHA
Session-ID: 5E580D610AEB5DDD8BCD0D31E88180F45391109792CA3CDD1E861EB87C704261
Session-ID-ctx:
Master-Key: E544FF34B993B2C3B7F7CB28D8166213F8D3A9864A82247F6948E33B319CD1A8943127DDF9B528EA73435EBC73B0DD55
Key-Arg : None
Start Time: 1582828897
Timeout : 7200 (sec)
Verify return code: 7 (certificate signature failure)
---
(省略)
如果证书未显示或有其他错误消息,则说明设置不正确。
另外对于'OpenSSL 0.9.8j-fips 07 Jan 2009'版本的openssl版本,由于这个版本不能自己检测出ssl的版本。会报下面的错误信息。
1816:error:1408E0F4:SSL routines:SSL3_GET_MESSAGE:unexpected message:s3_both.c:463:
1.3 配置kafka客户端
kafka集群需要支持集群内外的客户端交互访问。安全集群的客户端同样需要进行相关安全配置。这里客户端指的是Console客户端。
1.3.1 签发客户端证书
类似集群内部服务端的证书签发步骤,客户端证书签发过程入下:
生成客户端SSL密钥和证书,输出密钥容器:
client.keystore.jks$ keytool -keystore usr/ca/client/client.keystore.jks -alias kafka.app
.node1 -validity 365 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=dccsh,O=icbc,L=shanghai,S=shanghai,C=cn" -ext SAN=DNS:kafka.app.node1 -storepass app123从密钥容器中提取和导出客户端证书(输出文件:client.cert-file,未签名)
$ keytool -keystore usr/ca/client/client.keystore.jks -alias kafka.app.node1 -certreq -file usr/ca/client/client.cert-file -storepass app123
给客户端证书签名(输出文件:client.cert-signed,已签名)
$ openssl x509 -req -CA usr/ca/root/ca.cert.pem -CAkey usr/ca/root/ca.key.pem -in usr/ca/client/client.cert-file -out usr/ca/client/client.cert-signed -days 365 -CAcreateserial -passin pass:app123
将CA证书导入客户端密钥容器中
$ keytool -keystore usr/ca/client/client.keystore.jks -alias CARoot -import -file usr/ca/root/client.cert-file -storepass app123
将已签名的证书导入密钥容器中
$ keytool -keystore usr/ca/client/client.keystore.jks -alias kafka.app.node1 -import -file usr/ca/client/client.cert-signed -storepass app123
1.3.2 生成客户端信任库
使用下面的命令将CA证书导入客户端信任库,输出为信任库文件:client.truststore.jks
$ keytool -keystore usr/ca/trust/client.truststore.jks -alias CARoot -import -file usr/ca/root/ca.cert.pem -storepass app123
1.3.3 配置客户端
客户端的console-producer和console-consumer命令需要添加相关安全配置。
如果kafka集群不需要客户端身份验证,只需配置下面的配置:
security.protocol=SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123
如果需要客户端身份验证,还需要补充下面的配置信息:
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
根据我们的要求和代理配置,可能还需要其他配置设置:
ssl.provider(可选)。用于SSL连接的安全提供程序的名称。
ssl.cipher.suites(可选)。密码套件是认证,加密,MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1。它应列出在代理方配置的至少一种协议
ssl.truststore.type = JKS
ssl.keystore.type = JKS
最后我们总结合并一下所有的配置参数(编辑文件名为:client-ssl.properties
):
security.protocol=SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
1.3.4 消费者生产者
使用console-producer的命令:
kafka-console-producer.sh --broker-list kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092 --topic test --producer.config client-ssl.properties
使用console-consumer的命令:
kafka-console-consumer.sh --bootstrap-server kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092 --topic test --new-consumer --consumer.config client-ssl.properties
这里test
为topic
名称,在只有SSL通信加密集群中,topic的创建、删除、生产、消费并没有权限管理,依然存在安全问题。所以kafka集群需要进一步配置权限管理。
第二部分 Kafka集群权限认证
Kafka集群的权限认证管理主要涉及:
身份认证(Authentication)。对客户端与服务器的连接进行身份认证,brokers和zookeeper之间的连接进行Authentication(producer 和 consumer)、其他 brokers、tools与 brokers 之间连接的认证。
权限控制(Authorization)。实现对于消息级别的权限控制,客户端的读写操作进行Authorization(生产、消费)管理。
通俗的讲,身份认证解决的是证明你是谁,而权限控制解决的是你能干什么。在Kafka中身份认证和权限控制是两套独立的安全配置。
2.1 集群权限认证策略
Kafka从0.9.0.0版本后开始支持下面的SASL安全策略管理。这些安全功能为Kafka通信安全、多租户管理、集群云化提供了安全保障。截止目前Kafka 2.3版本,一共支持5种SASL方式。
| 验证方式 | 版本 | 说明 |
|---|---|---|
| SASL/PLAIN | 0.10.0.0 | 不能动态增加用户 |
| SASL/SCRAM | 0.10.2.0 | 可以动态增加用户。有两种方式:SASL/SCRAM-SHA-256 和SASL/SCRAM-SHA-512 |
| SASL/GSSAPI | 0.9.0.0 | 需要独立部署验证服务(即Kerberos服务) |
| SASL/OAUTHBEARER | 2.0.0 | 需自己实现接口实现token的创建和验证,需要额外Oauth服务 |
| SASL/Delegation Token | 1.1.0 | 补充现有 SASL 机制的轻量级认证机制 |
对于生产环境,SASL/PLAIN方式有个缺点:只能在JAAS文件KafkaServer参数中配置用户,集群运行期间无法动态新增用户(需要重启重新加载JAAS文件),这对维护管理带来不便。而SASL/SCRAM方式,将认证数据存储在Zookeeper中,可以动态新增用户并分配权限。
SASL/GSSAPI方式需要依赖Kerberos服务。对于一些已经部署了集中式的Kerberos服务的大厂,只需要申请一个principal即可。如果生产Kerberos认证中出现TGT分发性能瓶颈,可以使用SASL/Delegation Token模式。使用 Kafka 提供的 API 去获取对应的 Delegation Token。Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证),减少性能压力。
同样SASL/OAUTHBEARER方式需要Oauth服务。
各种方式引入版本不同,使用依赖各有差异,需要结合自身业务特点选择合适的架构方式。
2.2 SASL/SCRAM策略配置介绍
SASL/SCRAM方式将身份认证和权限控制的凭证(credential)数据均存储在Zookeeper中,需要对Zookeeper进行安全配置。
2.2.1 Zookeeper集群侧配置
对Zookeeper集群中所有节点更新下面的策略后,重启集群生效。
配置
zoo.cfg
文件文件尾部追加下面的配置:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000新增
zk_server_jaas.conf
文件配置文件内容如下:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret;
};其中
username
和password
定义的用户和密钥,用于Zookeeper与Kafka集群进行认证。配置项user_admin="admin-secret"
中 admin为用户名,admin-secret为密码,用于Zookeeper集群外客户端和集群内进行认证。拷贝依赖包
将kafka文件系统中
kafka/libs
目录下的jar包拷贝到zookeeper/lib
目录。kafka-clients-2.1.1.jar
lz4-java-1.5.0.jar
osgi-resource-locator-1.0.1.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.2.jar若没有引入依赖包,启动时会报找不到org.apache.kafka.common.security.plain.PlainLoginModule包的错误。
修改zookeeper启动参数
修改
bin/zkEnv.sh
文件, 在文件尾追加下面的配置内容。该配置完成引入的包的加载。变量CLASSPATH
和SERVER_JVMFLAGS
都会在Zookeeper启动时传给JVM
虚拟机。下面的配置中
$ZOOKEEPER_HOME
是zookeeper的环境变量,如果没有配置,使用绝对路径即可。for i in $ZOOKEEPER_HOME/lib/*.jar; do
CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOKEEPER_HOME/conf/zk_server_jaas.conf"
2.2.2 kafka集群侧配置
kafka集群中每一台节点均需要更新下面的配置。
新增
kafka_server_scram_jaas.conf
文件(在config
目录中)KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
# 自定义用户:
# user_admin="admin-secret"
# user_alice="alice-secret"
# user_reader="reader-secret"
# user_writer="writer-secret";
};
其中配置username
和password
为Kafka集群之间通讯的SCRAM凭证,用户名为admin
,密码为admin-secret
。
配置中类似user_XXX
格式的配置项为自定义用户。如果是SASL/PLAIN方式,用户只能在该文件中定义,不能动态新增。我们使用SASL/SCRAM方式,可以后续动态声明admin用户,不再此处进行配置。
更新Kafka的配置文件
server.properties
(在config
目录中):#SASL CONFIG
listeners=SASL_SSL://kafka.app.node1:9092
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#allow.everyone.if.no.acl.found=true
super.users=User:admin
#SSL CINFIG
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=app123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
security.inter.broker.protocol=SASL_SSL需要注意参数
allow.everyone.if.no.acl.found
,如果开启参数开关,当客户端和集群交互时候未找到ACL策略时,允许所有类型的访问操作。建议该参数关闭(false)。参数
security.inter.broker.protocol
指定集群brokers之间的通讯协议。不加密协议有:SASL_SSL、SASL_PLAINTEXT、PLAINTEXT;加密协议有:SSL。为了提高节点之间的交互性能,内部网络环境建议使用非加密协议。这里使用加密的SASL_SSL
协议。参数
super.users
指定了集群的超级用户为:admin
。注意如果指定多个超级用户,每个用户使用分号隔开,例如:super.users=User:admin;User:alice参数
sasl.enabled.mechanisms
列出支持的认证方式。即可以支持多种。参数
sasl.mechanism.inter.broker.protocol
指定集群内部的认证方式。Kafka仅支持最小迭代次数为4096的强哈希函数SHA-256和SHA-512。所以有SCRAM-SHA-512和SCRAM-SHA-256两种方式。配置kafka启动环境变量(
bin
目录下面的kafka-run-class.sh
)为 Kafka 添加 java.security.auth.login.config 环境变量(配置文件路径)。并且在启动模式中添加
KAFKA_SASL_OPTS
。# 截取配置文件片段:
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/software/kafka/config/kafka_server_scram_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi
2.2.3 SCRAM认证管理
在集群的配置文件kafka_server_scram_jaas.conf
中,定义了集群内部的认证用户。对于客户端和集群之间认证可以使用kafka-configs.sh
来动态创建。
创建用户SCRAM凭证
例如集群中的超级用户
admin
用户,使用下面的命令创建:
$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
创建自定义普通用户alice
。
$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
查看SCARM凭证
$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --describe --entity-type users --entity-name admin
删除SCRAM凭证
$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice2.3 Kafka客户端配置
Kafka集群配置了认证,那么对于Console客户端访问集群自然需要配置认证信息。可集群节点内部通讯凭证的认知,同样需要定义JAAS文件。加入我们自定义了用户alice
,JAAS文件名为:kafka_console_client_jaas.conf
,配置内容如下:
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice-secret";
};
然后更新kafka-console-producer.sh
脚本和kafka-console-consumer.sh
脚本的启动参数。
# 文件截取更新部分:
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/software/kafka/config/kafka_write_jaas.conf"
fi
在配置SSL时候,我们新建了client-ssl.properties
配置文件,作为Console客户端启动配置。在集群启用SASL_SSL
后,我们同步更新如下:
security.protocol=SASL_SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
至此Console客户端已经配置完毕,但目前Console客户端还不能通过命令方式和集群进行交互,因为我们指定的用户对于集群的资源还没有任何权限。需要对用户进行集群资源的ACL控制设置,赋予相关权限。
2.4 ACL控制
Kafka权限资源包含Topic、Group、Cluster、TransactionalId(事务id),每个资源涉及的权限内容如下:
| 资源类型 | 权限类型 |
|---|---|
| Topic | Read,Write,Describe,Delete,DescribeConfigs,AlterConfigs,All |
| Group | Read,Describe,All |
| Cluster | Create,ClusterAction,DescribeConfigs,AlterConfigs,IdempotentWrite,Alter,Describe,All |
| TransactionalId | Describe,Write,All |
对于常用类型进行说明:
| 权限 | 说明 |
|---|---|
| Read | 读取topic、group信息 |
| Write | 写topic、TransactionalId(存储在内部topic) |
| Delete | 删除topic |
| Create | 创建topic |
| ALTER | 修改topic |
| Describe | 获取topic、group、TransactionalId信息 |
| ALL | 所有权限 |
Kafka提供ACL管理脚本:kafka-acls.sh
。
2.4.1 更新脚本配置
认证数据均存储在Zookeeper集群中,需要和Zookeeper交互自然需要配置相关认证信息。
首先需要新建JAAS文件,文件名为:zk_client_jaas.conf
。这里的用户已经在Zookeeper集群中进行定义。
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
最后更新kafka-acls.sh
脚本:
# 截取更新部分
if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/software/kafka/config/zk_client_jaas.conf"
fi
当然Kafka集群的配置文件中已经开启了ACL:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer至此完成配置。
2.4.2 ACL配置
根据官网的介绍,ACL的格式如下:
“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”
参数含义描述如下:
principal:指定一个Kafka user;
operation:指定一个具体的操作类型,例如:Read, Write, Delete等;
Host:表示与集群交互的客户端IP地址,如果是通配符‘*’表示所有IP。目前不支持主机名(hostname)形式,只能是IP地址;
Resource:指定一种Kafka资源类型(共有4种类型);
例如下面的ACL命令:
$ sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --add --allow-principal User:alice --allow-host '*' --operation ALL --topic test赋权之后,用户alice对test具有全部权限,并且访问请求可以是来自任何IP的客户端。
常用参数的补充说明:
对主机IP的限制参数,
allow-host
指定允许的IP,deny-host
指定禁用IP;新增和删除一个赋权策略,分别使用:
add
和remove
2.4.3 ACL策略查看
使用参数list
参看ACL策略。例如:
$ sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --list --topic test-topic该查看命令显示test-topic
资源相关的所有ACL策略。
2.4.4 超级用户
kafka集群的配置文件server.properties
中定义了超级用户(Super Users),超级用户不在ACL控制范围内,默认可以访问集群中所有资源,并具备所有权限。
2.5 权限认证数据访问
集群的认证数据存储在Zookeeper,可以通过Zookeeper的console客户端访问认证数据。
使用zookeeper自带的命令行客户端:
/dmqs/zookeeper/bin> ./zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled
[zk: localhost:2181(CONNECTING) 0]
查看zookeeper中的数据:
[zk: localhost:2181(CONNECTED) 1] ls
[cluster, controller_epoch, controller, brokers, zookeeper, kafka-acl, kafka-acl-changes, admin, isr_change_notification, consumers, config]
其中kafka-acl
中存储相关权限认证数据。
[zk: localhost:2181(CONNECTED) 3] ls kafka-acl
[Cluster, Topic]
可以查看其中的权限信息。
2.6 SSL和SASL的说明
SSL是传输层安全协议,是位于传输层(TCP/IP)和应用层(HTTP)的协议,SSL是对整个传输过程的加密,SSL是对客户端和服务器之间传输的所有数据进行加密。假如在配置的时候使用了SASL,但是没有使用SSL,那么除了账号密码外,所有的传输内容都是裸奔的。
所以生产集群采用SSL和SASL结合方式,即SSL_SASL方式。
第三部分 安全集群的客户端
3.1 开发语言类
3.1.1 Python客户端
目前市面上kafka的python API常用的有三种:
第一种 kafka
该项目是kafka-python
的老项目,2017年后调整为kafka-python
项目。
第二种 kafka-python
最新版本为2.0,首先从客户端的密钥库中导出CA证书。
$ keytool -exportcert -alias CARoot -keystore client.keystore.jks -rfc -file ca.cert.pem生产者和消费者的案例如下:
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer, KafkaProducer
import kafka
import ssl
import logging
import time
#logging.basicConfig(level=logging.DEBUG)
try:
bootstrap_servers = 'kafka.itdw.node1:9092,kafka.itdw.node2:9092,kafka.itdw.node3:9092'
topic = "test"
sasl_mechanism = "SCRAM-SHA-512"
username = "alice"
password = "alice-secret"
security_protocol = "SASL_SSL"
# CA 证书路径
ssl_cafile = 'ca.cert.pem'
# SSL
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
context.check_hostname = False
context.load_verify_locations(ssl_cafile)
# 消费者
consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers,
api_version=(0, 10),
security_protocol=security_protocol,
ssl_context=context,
sasl_mechanism = sasl_mechanism,
sasl_plain_username = username,
sasl_plain_password = password
)
# 生产者
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
api_version=(0, 10),
acks='all',
retries=1,
security_protocol=security_protocol,
ssl_context=context,
sasl_mechanism=sasl_mechanism,
sasl_plain_username=username,
sasl_plain_password=password
)
# 生产数据
for i in range(10):
producer.send(topic, bytes("测试",encoding='utf8'))
producer.flush()
# 消费数据
for msg in consumer:
print(msg)
except Exception as e:
print(e)
需要的注意的事项有:
Kafka集群启用主机名模式,所以应用程序运行节点的hosts文件需要配置Kafka集群节点的域名映射。
ssl_context参数为包装套接字连接的预配置SSLContext。如果非None,将忽略所有其他ssl_ *配置。
主机名验证问题。如果证书中域名和主机名不匹配,客户端侧需要配置需要调整如下:
ssl_ctx.check_hostname=False
ssl_ctx.verify_mode = CERT_NONE如果不提前预配置SSLContext,还需要客户端的证书。
$ keytool -exportcert -alias localhost -keystore client.keystore.jks -rfc -file client.cert.pem生产者的参数需要添加:
ssl_certfile = "client.cert.pem"
ssl_cafile = "ca.cert.pem"
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
api_version=(0, 10),
acks='all',
retries=1,
security_protocol=security_protocol,
ssl_context=context,
sasl_mechanism=sasl_mechanism,
sasl_plain_username=username,
sasl_plain_password=password,
ssl_check_hostname=False,
ssl_certfile=ssl_certfile,
ssl_cafile=ssl_cafile)第三种 confluent-kafka
confluent-kafka包由confluent公司开源,主要是对C/C++客户端包(librdkafka)的封装。案例代码如下:
# -*- coding: utf-8 -*-
from confluent_kafka import Producer
# 回调函数
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print(’Message delivery failed: {}‘.format(err))
else:
print(‘Message delivered to {} [{}]‘.format(msg.topic(), msg.partition()))
if __name__ == ‘__main__‘:
producerConfing = {"bootstrap.servers": 'kafka.itdw.node1:9092,kafka.itdw.node2:9092,kafka.itdw.node3:9092',
"security.protocol": 'SASL_SSL',
"sasl.mechanisms": 'SCRAM-SHA-256',
"sasl.username": 'alice',
"sasl.password": 'alice-secret',
"ssl.ca.location": 'ca.cert.pem'
}
ProducerTest = Producer(producerConfing)
ProducerTest.poll(0)
ProducerTest.produce(‘testTopic‘, ‘confluent kafka test‘.encode(‘utf-8‘),callback=delivery_report)
ProducerTest.flush()
3.1.2 Go客户端
我们的Go语言中常用的Kafka的客户端包有:
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/segmentio/ksuid"
其中最常用的是sarama
,案例参考github项目。
3.1.3 Java客户端
生产者:
package com.kafka.security;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
import java.util.Random;
public class KafkaProducerWithSASL_SSL {
private static final String KAFKA_TOPIC = "topsec";
private static final String BOOTSTRAP_SERVER = "docker31:9092";
private static final String[] strs = new String[]{"zhao", "qian", "sun", "li", "zhou", "wu", "zheng", "wang", "feng", "chen"};
private static final Random r = new Random();
public static void main(String[] args) {
try {
producer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void producer() throws InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
//SASL_SSL加密
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\trust\\client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "hadoop");
// SSL用户认证
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\client\\client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "hadoop");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "hadoop");
//SASL用户认证
props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
Producer<String, String> producer = new KafkaProducer<>(props);
while (true) {
producer.send(new ProducerRecord<>(KAFKA_TOPIC, strs[r.nextInt(10)],strs[r.nextInt(10)]));
Thread.sleep(2000);
}
}
}
消费者:
package com.topsec.kafka.security;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerWithSASLAndSSL {
private static final String KAFKA_TOPIC = "topsec";
private static final String BOOTSTRAP_SERVER = "docker31:9092";
public static void main(String[] args) {
consumer();
}
private static void consumer() {
Properties props = new Properties();
//SASL_SSL加密配置
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\trust\\client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "hadoop");
//SSL身份验证配置
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\client\\client.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "hadoop");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "hadoop");
//SASL身份验证
props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(2000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
record.offset(),
record.key(),
record.value(),
record.partition());
}
}
}
}
3.2 组件类
3.2.1 Console客户端
客户端节点部署kafka项目,在bin目录下面我们已经更新了kafka-console-consumer.sh
和kafka-console-producer.sh
两个脚本。并分别新增了加密访问的配置文件consumer.config
和producer.config
。
命令案例参考:1.3.4 章节内容。
3.2.2 Flume客户端
目前Flume项目官网项目文档介绍支持下面三种方式:
SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证;
SASL_SSL - 有数据加密的 Kerberos 或明文认证;
SSL - 基于TLS的加密,可选的身份验证;
事实上对于SASL/SCRAM方式Flume也是支持的。具体配置如下(以Flume1.9版本为例):
3.2.2.1 第一步 新增jaas配置文件
在Flume的conf配置目录下面新增flume_jaas.conf文件,文件内容:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
3.2.2.2 第二步 更新flume-env.sh文件
Flume的conf配置目录下面flume-env.sh
文件添加JAVA_OPTS
配置更新:
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/dmqs/apache-flume-1.9.0-bin/conf/flume_jaas.conf"其中路径为第一步中新增的flume_jaas.conf
文件路径。
3.2.2.3 测试案例(sinks)
我们使用一个简单的案例来测试,Flume的source为监控文件尾写入,Flume的sinks为加密kafka集群。具体配置如下:
#define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f dmqs/apache-flume-1.9.0-bin/data/flume/flume.log
a1.sources.r1.shell = bin/bash -c
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 15000
a1.channels.c1.transactionCapacity = 15000
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = kafka.itdw.node1:9093
a1.sinks.k1.kafka.topic = flume
a1.sinks.k1.kafka.flumeBatchSize = 15000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1000
a1.sinks.k1.kafka.producer.security.protocol=SASL_SSL
a1.sinks.c1.kafka.producer.sasl.mechanism =SCRAM-SHA-512
a1.sinks.c1.kafka.producer.sasl.jaas.config =org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"
####
a1.sinks.k1.kafka.producer.ssl.truststore.location =/usr/ca/trust/client.truststore.jks
a1.sinks.k1.kafka.producer.sasl.mechanism =SCRAM-SHA-512
a1.sinks.k1.kafka.producer.ssl.truststore.password=app123
a1.sinks.k1.kafka.producer.ssl.keystore.location=/usr/ca/client/client.keystore.jks
a1.sinks.k1.kafka.producer.ssl.keystore.password=app123
a1.sinks.k1.kafka.producer.ssl.key.password=app123
a1.sinks.k1.kafka.producer.timeout.ms = 100
a1.sinks.k1.batchSize=15000
a1.sinks.k1.batchDurationMillis=2000
配置保存为flume-sink-auth-kafka.conf
,为了检查输出结果使用下面命令启动(在bin目录中):
./flume-ng agent --conf ../conf --conf-file ../conf/flume-sink-auth-kafka.conf --name a1 -Dflume.root.logger=INFO,console向文件尾部追加信息:
echo "test" >> dmqs/apache-flume-1.9.0-bin/data/flume/flume.log然后使用消费者客户端查看数据是否写入kafka的flume主题中。
3.2.2.3 测试案例(source)
同样可以也可以将加密Kafka作为Flume的source,配置案例如下:
#define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = kafka.app .node1:9093,kafka.app.node2:9093,kafka.app.node3:9093
a1.sources.r1.kafka.topics = flume
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.kafka.consumer.timeout.ms = 2000
a1.sources.r1.batchSize=150
a1.sources.r1.batchDurationMillis=1000
#####
a1.sources.r1.kafka.consumer.ssl.truststore.location =/usr/ca/trust/client.truststore.jks
a1.sources.r1.kafka.consumer.sasl.mechanism =SCRAM-SHA-512
a1.sources.r1.kafka.consumer.ssl.truststore.password=itdw123
a1.sources.r1.kafka.consumer.ssl.keystore.location=/usr/ca/client/client.keystore.jks
a1.sources.r1.kafka.consumer.ssl.keystore.password=itdw123
a1.sources.r1.kafka.consumer.ssl.key.password=itdw123
a1.sources.r1.kafka.consumer.security.protocol=SASL_SSL
a1.sources.r1.kafka.consumer.sasl.mechanism =SCRAM-SHA-512
a1.sources.r1.kafka.consumer.sasl.jaas.config =org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 15000
a1.channels.c1.transactionCapacity = 15000
# sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /dmqs/apache-flume-1.9.0-bin/data/flume
a1.sinks.k1.sink.serializer = TEXT
案例中将加密Kafka中flume主题中的数据汇入到指定目录的文件中。
3.2.3 Logstash客户端
Logstash和Kafka交互使用Kafka output plugin
插件实现。其中配置文件中output部分如下:
output {
kafka {
id => "kafkaSLL_SASL"
codec => "json"
ssl_endpoint_identification_algorithm => ""
bootstrap_servers => "kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092"
ssl_keystore_location => "/etc/logstash/certificates/client.keystore.jks"
ssl_keystore_password => "app123"
ssl_keystore_type => "JKS"
ssl_truststore_location => "/etc/logstash/certificates/client.truststore.jks"
ssl_truststore_password => "app123"
ssl_truststore_type => "JKS"
sasl_mechanism => "SCRAM-SHA-512"
security_protocol => "SASL_SSL"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='alice' password='alice-secret';"
topic_id => "test"
}
}
第四部分 加密认证集群的性能压测
集群启用SSL后,数据交互中需要加密、解密。kafka集群的I/O性能会降低。我们使用Kafka自带的压侧工具对集群加密前和加密后性能进行评测。
4.1生产者压力测试
客户端写入参数配置为acks=all
(即主题中Leader和fellow副本均写入成功)。每条消息大小为1M(消息体小吞吐量会大一些)。另外测试客户端为集群内部节点,忽略了数据网络传输的性能消耗。
4.1.1不加密集群
./kafka-consumer-perf-test.sh --topic topsec --throughput 50000 --num-records 1500000 --record-size 10000 --producer-props bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093 acks=all测试结果:
1500000 records sent, 38538.615693 records/sec (37.64 MB/sec), 748.44 ms avg latency, 5485.00 ms max latency, 227 ms 50th, 3194 ms 95th, 3789 ms 99th, 3992 ms 99.9th.4.1.2加密集群
./kafka-producer-perf-test.sh --topic topsec --throughput 50000 --num-records 1500000 --record-size 10000 --producer-props bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093 acks=all --producer.config producer.config测试结果:
1500000 records sent, 16901.027582 records/sec (16.50 MB/sec), 1713.43 ms avg latency, 9345.00 ms max latency, 72 ms 50th, 1283 ms 95th, 2067 ms 99th, 2217 ms 99.9th.4.2 压侧结论
加密改造前,生产者的吞吐量为3.8w 条/秒,改造后1.7W 条/秒。整体吞吐性能降低50%左右,数据的加密、解密导致吞吐量性能降低。平均时延也增加了一倍多(改造前700ms,改造后1700ms)。在实际生产中可参考这个性能折扣基线配置集群资源。
参考文献及资料
1、Kafka官网对安全类功能介绍,链接:http://kafka.apache.org/documentation/#security
2、Kafka ACLs in Practice – User Authentication and Authorization,链接:https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/
3、维基百科(数字证书),链接:https://zh.wikipedia.org/wiki/公開金鑰認證
4、SSL技术白皮书,链接:https://blog.51cto.com/xuding/1732723
5、Kafka权限管理,链接:https://www.jianshu.com/p/09129c9f4c80
5、Flume文档,链接:https://flume.apache.org/FlumeUserGuide.html#kafka-sinkorg/FlumeUserGuide.html#kafka-sink




