暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka安全集群实践

数据科学和工程 2020-05-01
2023

背景

Kafka在0.9.0.0版本前没有安全机制功能。Kafka Client程序可以直接获取到Kafka集群元数据信息和Kafka Broker地址后,连接到Kafka集群,然后完全操作集群上的所有topic数据资源。另外集群节点间通讯、broker和zookeeper通讯、客户端和集群的网络层通信都是无加密模式。集群的数据存在极大的安全风险。

自0.9.0.0版本开始,Kafka社区逐步添加了较多功能用于提高Kafka群集的安全性。目前Kafka安全集群安全机制主要有三个方面的设置:通信加密(encryption)、身份认证(authentication)和授权(authorization)。

本文重点介绍生产安全集群的一种配置方案。数据通讯传输配置SSL,认证配置SASL,授权通过ACL接口命令来完成的,即:SSL+SASL/SCRAM+ACL

第一部分   Kafka集群加密传输

1.1 背景知识介绍

涉及的技术知识不做详细介绍。

1.1.1 密码学基础

加密算法分为两类:

  • 对称密钥算法(Symmetric Cryptography):数据加密和解密时使用相同的密钥。例如常用的DES就是对称加密算法。

  • 非对称密钥算法(Asymmetric Cryptography):数据加密和解密时使用不同的密钥,分为:公开的公钥(public key)和用户保存的私钥(private key),私钥和公钥在数学上是相关的。利用公钥(或私钥)加密的数据只能用相应的私钥(或公钥)才能解密。举一个例子:客户在银行网银上做一笔交易,首先向银行申请公钥,银行分发公钥给用户,用户使用公钥对请求数据进行加密。银行收到加密数据后通过银行侧保存的私钥进行解密处理,并处理后更新后台数据库。这个通讯过程中银行不需要通过互联网分发私钥。因此保证了私钥的安全。目前最常用的非对称加密算法是RSA算法。

    非对称密钥算法中,私钥来解密公钥加密的数据,公钥来解密私钥加密的数据。

两种加密算法的比较:

  • 对称密钥的强度和密钥长度成正比,但是解密效率和密钥长度成反比。另外私钥的分发存在安全风险。

  • 非对称加密保证了私钥的安全性,但是加密和解密的效率比对称加密低。

所以通常加密场景是两种密钥结合使用。使用数据主体使用对称秘钥算法,但是私钥的传输使用非对称算法在互联网环境分发非对称密钥。最常见的就是SSL/TLS。

1.1.2 CA数字证书

对于非对称密钥算法存在一个安全风险点,那就是公钥的分发存在中间人攻击。还是以客户和银行的通信为例(例子简单化处理)。客户和银行分别有自己的公钥和私钥,私钥各自保留本地。公钥通过互联网分发给对方。那么公钥就是有安全风险的。存在被黑客截取风险。客户向银行申请银行公钥,结果被黑客截取,黑客伪装成银行,返回给用户自己的黑客公钥,用户收到黑客公钥后,将信息加密发给黑客。黑客用黑客私钥进行解密,获取到真实信息。这时候黑客伪装成客户用相同的方法完成和银行的数据交互。这就是中间人攻击的案例。

所以非对称加密算法的公钥传输同样存在风险。当然如果使用原始的离线方式交换密钥是安全的,但是随着互联网通信的爆炸式增长,这是落后低效的。为了保证公钥的真实性和安全性,这时候我们引入第三个角色:公开密钥认证(Public key certificate,简称CA),又称数字证书(digital certificate)或身份证书(identity certificate)。

通常CA是一家第三方权威机构。负责管理和签发证书。整个实现原理也是非对称加密算法:

  • 机构将自己的公钥以及身份信息交给CA机构(安全的),CA使用自己的私钥对各机构的公钥进行加密。这个过程称为验签。输出的加密后的公钥及身份信息称为数字证书。

  • 当其他机构请求A机构公钥的时候,返回的是A机构的数字证书。其他机构可以使用CA的公钥对该数字证书中加密公钥进行解密获取A机构的通信公钥。

那么新得安全问题又来了,如何保证CA机构的公钥不被伪造?通常CA的公钥是集成在浏览器或者操作系统中,并且被很好的保护起来。

当然CA证书还涉及更多的安全细节设计(Hash算法防篡改、信任链等大量细节),这里只是简单的介绍。详细介绍可以查看:维基(证书颁发机构

对于企业内部的应用系统就没必要花钱购买CA机构的证书服务了,可以自建 Root CA,自己给自己颁发证书,充当内网的CA机构。当然这时候客户端就需要导入CA的证书了(浏览器和操作系统没有自建的CA证书)。

1.1.3 SSL/TLS加密协议

SSL(Secure Sockets Layer)是一种安全协议,目的是为保障互联网上数据传输安全,利用数据加密技术,确保数据在网络上之传输过程中不会被截取。

从网络协议层看,SSL协议位于TCP/IP协议与应用层协议之间,为数据通讯提供安全支持。SSL协议自身可分为两层:

  • SSL记录协议(SSL Record Protocol):它建立在可靠的传输协议(如TCP)之上,为高层协议提供数据封装、压缩、加密等基本功能的支持。

  • SSL握手协议(SSL Handshake Protocol):它建立在SSL记录协议之上,用于在实际的数据传输开始前,通讯双方进行身份认证、协商加密算法、交换加密密钥等。例如HTTPS就是在HTTP应用层上增加了SSL加密协议支持(HTTP over SSL)。

TLS(Transport Layer Security,传输层安全协议),同样用于两个应用程序之间提供保密性和数据完整性。TLS 1.0建立在SSL 3.0协议规范之上,是SSL 3.0的后续版本,可以理解为SSL 3.1,即是SSL的升级版。TLS的主要目标是使SSL更安全,并使协议的规范更精确和完善。另外,TLS版本号也与SSL的不同(TLS的版本1.0使用的版本号为SSLv3.1)

SSL通过握手过程在client和server之间协商会话參数,并建立会话。一共有三种方式:

  • 仅仅验证server的SSL握手过程(单向SSL)

  • 验证server和client的SSL握手过程(双向SSL)

  • 恢复原有会话的SSL握手过程

第一种:单向SSL通信过程如下(SSL 客户端和SSL 服务端通信):

(1)SSL客户端向SSL服务端发起请求,请求信息包括SSL版本号、加密算法、密钥交换算法、MAC算法等信息;

(2)SSL服务端确定本次通话的SSL版本和加密套件后,将携带公钥信息的证书回给客户端。如果通话可从重用,还会返回会话ID;

(3)SSL服务端发送Server Hello Done消息。通知SSL客户端版本号和加密套件协商结束,开始进行密钥交换;

(4)SSL客户端对CA证书进行验证,证书合法则继续、不成功弹出选择页面;

(5)SSL客户端生产随机私有对称密钥key,并使用服务端公开密钥进行加密后,发给服务端;

(6)SSL服务端使用自己的私钥解密,获取对称密钥key;

(7)最后SSL客户端与SSL服务端将使用该对称密钥key进行加密通信。

第二种:单向认证,仅仅是客户端需要检验服务端证书是否是正确的。双向SSL和单向认证几乎一样,只是在客户端认证完服务器证书后,客户端会将自己的证书传给服务器。服务器验证通过后,才开始秘钥协商。

第三种:协商会话参数、建立会话的过程中,需要使用非对称密钥算法来加密密钥、验证通信对端的身份,计算量较大,占用了大量的系统资源。为了简化SSL握手过程,SSL允许重用已经协商过的会话。即可以重用会话ID。这就是第三种建立会话方式。

1.1.4 Openssl工具

对于企业内部(内部局域网)的应用系统通讯,如果需要CA证书服务,可以使用Openssl自建CA,并完成证书签发。

先说一下常用密钥类文件的规范:

  • 后缀名规范

    通常约定后缀含义:crt或者cert 表示证书, key表示私钥, req和csr表示请求文件。

  • 文件格式

    pem表示pem格式(经过加密的文本文件),der表示der格式(经过加密的二进制文件)。所有证书和私钥可以是pem,也可以是der格式,取决于需要。两个格式可以转换。

Openssl的配置文件(openssl.cnf
)定义CA的默认参数,例如ubuntu
系统中配置文件位置在/usr/lib/ssl/openssl.cnf
。如果不适用默认参数需要在命令中重新指定。

  • CA证书的制作

    首先生成CA的私钥,使用下面的命令:

    $ openssl genrsa -out private/ca.key.pem 2048

    private/ca.key.pem
    是CA私钥,格式为pem,长度(加密位数)为2048。

    前面密码学知识知道CA使用一对密钥的(私钥和公钥),并且两个密钥是数学相关的。公钥可以通过私钥算出来。

  • CA证书自签发

    参考命令如下:

    $ openssl req -new -x509 -key private/ca.key.pem -out certs/ca.cert.pem

    certs/ca.cert.pem
    即CA的自签证书。部署导入到客户端(例如浏览器)。

  • 用户证书签发

    用户证书的签发和CA自签相同,用户证书由CA私钥签发。用户需要提供请求文件。

    $ openssl ca -in app.csr -out app.crt -days 365

    app.crt
    为签发的证书。部署在应用服务器上。

1.1.5 Keytool工具介绍

在密钥证书管理时,通常使用JAVA的Keytool工具程序。Keytool 是一个JAVA数据证书的管理工具 ,Keytool 将密钥(key)和证书(certificates)存在一个称为keystore的文件中,通常称为密钥库文件。文件的扩展名通常使用:jks,全名java key store file。

Keytool是一个Java数据证书的管理工具,所以节点需要配置JAVA_HOME环境变量。

这里列举了命令支持的参数含义及注意点(供后续使用查阅):

  • keystore 参数指定保存证书的文件(密钥库二进制文件)。密钥库文件包含证书的私钥,必须对其进行安全保存。

  • validity 参数指定密钥有效期,单位是天。默认为90天。

  • keyalg 参数指定密钥使用的加密算法(例如RSA,如果不指定默认采用DSA)。

  • keysize 参数指定密钥的长度。该参数是选项参数,默认长度是1024位。为了保证密钥安全强度,建议密码长度设置为2048位。

  • keypass 参数指定生成密钥的密码(私钥密码)。

  • storepass 指定密钥库的密码(获取keystore信息所需的密码)。另外密钥库创建后,要对其做任何修改都必须提供该密码,以便访问密钥库。

  • alias 参数指定密钥别名。每个密钥文件有一个唯一的别名,别名不区分大小写。

  • dname 参数指定证书拥有者信息。例如:"CN=名字与姓氏,OU=组织单位名称,O=组织名称,L=城市或区域名称,ST=州或省份名称,C=单位的两字母国家代码"。

  • list 参数显示密钥库中的证书信息。keytool -list -v -keystore 指定keystore -storepass 密码

  • v 参数显示密钥库中的证书详细信息。

  • export 将别名指定的证书导出到文件。keytool -export -alias 需要导出的别名 -keystore 指定keystore -file 指定导出的证书位置及证书名称 -storepass 密码。

  • file  参数指定导出到文件的文件名。

  • delete   删除密钥库中某条目。keytool -delete -alias 指定需删除的别名 -keystore 指定keystore -storepass 密码

  • printcert  查看导出的证书信息。keytool -printcert -file yushan.crt

  • keypasswd   修改密钥库中指定条目口令。keytool -keypasswd -alias 需修改的别名 -keypass 旧密码 -new 新密码 -storepass keystore密码 -keystore sage

  • storepasswd 修改keystore口令。keytool -storepasswd -keystore e:/yushan.keystore(需修改口令的keystore) -storepass 123456(原始密码) -new newpasswd(新密码)

  • import   将已签名数字证书导入密钥库。keytool -import -alias 指定导入条目的别名 -keystore 指定keystore -file 需导入的证书

关于Keytool工具的详细介绍,可以参考oracle的官网

1.2 Kafka集群配置SSL加密

Apache Kafka允许客户端通过SSL连接。默认情况下,SSL是禁用的,可以根据需要打开。

1.2.1 集群环境准备

为了后文讲解方便,我们部署了Kafka集群(3节点)和Zookeeper集群(3节点)测试环境。其中zookeeper和kafka混合部署。

节点编号hostnameIP地址
1kafka.app.node1192.168.1.5
2kafka.app.node2192.168.1.6
3kafka.app.node3192.168.1.7

Kafka集群节点对外服务端口为:9092;Zookeeper集群节点对外服务端口为:2181。

1.2.2 配置主机名验证

从Kafka 2.0.0版开始,默认会为客户端连接以及broker之间的连接启用服务器的主机名验证(SSL端点识别算法),以防止中间人攻击。可以通过设置参数ssl.endpoint.identification.algorithm
为空字符串来禁用服务器主机名验证。例如:

ssl.endpoint.identification.algorithm=

另外高版本支持不停集群服务下,进行动态配置,使用脚本kafka-configs.sh
,参考命令如下:

bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="

对于较旧的Kafka版本,ssl.endpoint.identification.algorithm
默认情况下未定义,因此不会启用主机名验证。若该属性设置HTTPS
,则启用主机名验证,例如:

ssl.endpoint.identification.algorithm=HTTPS

需要注意的是,一旦启用主机名验证,客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):

  • 通用名称(CN,Common Name)

  • 主题备用名称(SAN,Subject Alternative Name)

两个字段都有效,但RFC-2818建议使用SAN。SAN也更灵活,允许声明多个DNS条目。另一个优点是,CN可以设置为更有意义的值用于授权。如要添加SAN字段,需要将以下参数-ext SAN = DNS:{FQDN}
添加到keytool
命令中,例如:

$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SAN=DNS:{FQDN}

更通俗一点讲,SSL 握手期间验证主机名时,它会检查服务器证书是否具有 SAN 集。如果检测到 SAN 集,那么只使用 SAN 集中的名称或 IP 地址。如果未检测到 SAN 集,那么只使用主题专有名称 (DN) 最重要的属性,通常是通用名称(CN)。将该值与客户端尝试连接的服务器启的主机名进行比较。如果它们相同,主机名验证成功,允许建立连接。

1.2.3 生成SSL密钥和证书

为了方便管理证书密钥,我们使用统一的路径保存。例如统一放在/usr/ca
作为文件目录。

$ mkdir -p usr/ca/{root,server,client,trust}

这里各文件夹的功能是:root:存储CA私钥和证书;server:存储服务端的私钥和证书;client:存储客户端私钥和证书;trust:存储信任库文件;

  • 节点1(kafka.app.node1)

$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node1

 其中dname
参数的含义参考Keytool
工具介绍,文件名为:server.keystore.jks
,这是密钥库。

  • 节点2(kafka.app.node2)

$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node2
  • 节点3(kafka.app.node3)

$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.app -validity 3650 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=depart,O=org,L=shanghai,S=shanghai,C=cn" -storepass app123 -ext SAN=DNS:kafka.app.node3

证书生成后可以通过下面的命令进行查询(需要输入密钥库管理密码,即keypass
的参数):

$ keytool -list -v -keystore server.keystore.jks

1.2.4 创建Kafka集群CA证书

集群中每个服务节点都有一对公钥和私钥,以及用于标识该节点的证书。但这个证书是未签名的,存在中间者攻击的风险。所以需要证书颁发机构(CA)负责签署颁发证书,使用openssl
工具实现。

同一个集群的所有节点共用一个CA证书,所以只需要在集群的一个节点(集群外节点均可)生成CA证书,然后分发给集群其他节点。例如在kafka.app.node1
节点上创建CA证书,命令如下:

$ openssl req -new -x509 -keyout usr/ca/root/ca.key.pem -out usr/ca/root/ca.cert.pem -days 365 -passout pass:app123 -subj "/C=cn/ST=shanghai/L=shanghai/O=org/OU=depart/CN=kafka.app.node1"

然后使用scp
命令分发给其他节点:

$ scp usr/ca/root/* root@kafka.app.node2:/usr/ca/root/
$ scp usr/ca/root/* root@kafka.app.node3:/usr/ca/root/

生成两个文件,分别是私钥(ca.key.pem)和证书(ca.cert.pem),它用来签署其他证书。

1.2.5 集群服务节点签署证书

首先给集群各服务节点签发证书(即签名)。步骤如下:

  • 第一步 从密钥容器中提取和导出服务端证书(输出文件:server.cert-file,未签名)

$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.itdw -certreq -file usr/ca/server/server.cert-file -storepass app123
  • 第二步 给服务端证书签名(输出文件:server.cert-signed,已签名)

$ openssl x509 -req -CA usr/ca/root/ca.cert.pem -CAkey usr/ca/root/ca.key.pem -in usr/ca/server/server.cert-file -out usr/ca/server/server.cert-signed -days 365 -CAcreateserial -passin pass:app123
  • 第三步 将CA证书导入服务端密钥容器中

$ keytool -keystore usr/ca/server/server.keystore.jks -alias CARoot -import -file usr/ca/root/ca.cert.pem -storepass app123
  • 第四步 将已签名的证书导入密钥容器中

$ keytool -keystore usr/ca/server/server.keystore.jks -alias kafka.app -import -file usr/ca/server/server.cert-signed -storepass app123

需要注意集群上每个服务节点均需要签署。

1.2.6 生成服务端信任库

如果kafka集群中配置中的参数ssl.client.auth
设置为:requested
required
,需要为集群节点提供一个信任库,这个库中需要包含所有CA证书。

使用下面的命令将CA证书导入服务端信任库,输出为信任库文件:server.truststore.jks

$ keytool -keystore usr/ca/trust/server.truststore.jks -alias CARoot -import -file usr/ca/root/ca.cert.pem -storepass app123

将CA证书导入服务端信任库,意味着信任该CA证书签名的所有证书。此属性称为信任链,在大型Kafka群集上部署SSL时特别有用。您可以使用单个CA对群集中的所有证书进行签名,并使所有计算机共享信任该CA的同一信任库。这样,所有计算机都可以对所有其他计算机进行身份验证。

1.2.7 配置Kafka Brokers

Kafka Broker节点支持侦听多个端口上的连接。在server.properties中配置,多个端口类型使用逗号分隔,我们以集群中kafka.app.node1
为例:

listeners=SSL://kafka.app.node1:9092

代理端需要以下SSL配置

ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=app123

其他可选配置设置:

  • ssl.client.auth
    (可选)

    参数控制SSL认证模式。默认参数值为requested
    ,默认使用单向认证,即客户端认证Kafka brokers。此时,没有证书的客户端仍然可以连接集群。参数值为required
    ,指定开启双向验证(2-way authentication)。Kafka服务器同时会验证客户端证书。生成集群建议开始双向认证。

  • ssl.cipher.suites
    (可选)

    密码套件是认证,加密,MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。(默认为空列表)

  • ssl.enabled.protocols

    建议参数值为TLSv1.2,TLSv1.1,TLSv1
    。列出支持的SSL协议。生成环境不建议使用SSL,建议使用TLS。

  • ssl.keystore.type
    `ssl.truststore.type

    文件格式:JKS

  • security.inter.broker.protocol
    参数

    kafka集群节点(brokers)之间启用SSL
    通讯,需要配置该配置参数为:SSL

最后我们总结合并一下所有的配置参数:

listeners=SSL://kafka.app.node1:9092
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=app123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=HTTPS
security.inter.broker.protocol=SSL

1.2.8 初步验证

正常启动集群的Zookeeper集群,然后依次启动集群的所有节点。使用下面的命令检查:

$ openssl s_client -debug -connect kafka.app.node1:9092 -tls1

该命令检查服务器的密钥库和信任库是否正确设置。命令中tls1
必须是集群配置参数ssl.enabled.protocols
所支持的协议。

Certificate chain
(省略)
---
Server certificate
-----BEGIN CERTIFICATE-----
(省略)
-----END CERTIFICATE-----
subject=(省略)
issuer=(省略)
---
No client certificate CA names sent
---
SSL handshake has read 2029 bytes and written 264 bytes
---
New, TLSv1/SSLv3, Cipher is ECDHE-RSA-DES-CBC3-SHA
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
SSL-Session:
Protocol : TLSv1
Cipher : ECDHE-RSA-DES-CBC3-SHA
Session-ID: 5E580D610AEB5DDD8BCD0D31E88180F45391109792CA3CDD1E861EB87C704261
Session-ID-ctx:
Master-Key: E544FF34B993B2C3B7F7CB28D8166213F8D3A9864A82247F6948E33B319CD1A8943127DDF9B528EA73435EBC73B0DD55
Key-Arg : None
Start Time: 1582828897
Timeout : 7200 (sec)
Verify return code: 7 (certificate signature failure)
---
(省略)

如果证书未显示或有其他错误消息,则说明设置不正确。

另外对于'OpenSSL 0.9.8j-fips 07 Jan 2009'版本的openssl版本,由于这个版本不能自己检测出ssl的版本。会报下面的错误信息。

1816:error:1408E0F4:SSL routines:SSL3_GET_MESSAGE:unexpected message:s3_both.c:463:

1.3 配置kafka客户端

kafka集群需要支持集群内外的客户端交互访问。安全集群的客户端同样需要进行相关安全配置。这里客户端指的是Console客户端。

1.3.1 签发客户端证书

类似集群内部服务端的证书签发步骤,客户端证书签发过程入下:

  • 生成客户端SSL密钥和证书,输出密钥容器:client.keystore.jks

    $ keytool -keystore usr/ca/client/client.keystore.jks -alias kafka.app
    .node1 -validity 365 -genkey -keypass app123 -keyalg RSA -dname "CN=kafka.app.node1,OU=dccsh,O=icbc,L=shanghai,S=shanghai,C=cn" -ext SAN=DNS:kafka.app.node1 -storepass app123
  • 从密钥容器中提取和导出客户端证书(输出文件:client.cert-file,未签名)

    $ keytool -keystore usr/ca/client/client.keystore.jks -alias kafka.app.node1 -certreq -file usr/ca/client/client.cert-file -storepass app123
  • 给客户端证书签名(输出文件:client.cert-signed,已签名)

    $ openssl x509 -req -CA usr/ca/root/ca.cert.pem -CAkey usr/ca/root/ca.key.pem -in usr/ca/client/client.cert-file -out usr/ca/client/client.cert-signed -days 365 -CAcreateserial -passin pass:app123
  • 将CA证书导入客户端密钥容器中

    $ keytool -keystore usr/ca/client/client.keystore.jks -alias CARoot -import -file usr/ca/root/client.cert-file -storepass app123
  • 将已签名的证书导入密钥容器中

    $ keytool -keystore usr/ca/client/client.keystore.jks -alias kafka.app.node1 -import -file usr/ca/client/client.cert-signed -storepass app123

1.3.2 生成客户端信任库

使用下面的命令将CA证书导入客户端信任库,输出为信任库文件:client.truststore.jks

$ keytool -keystore usr/ca/trust/client.truststore.jks -alias CARoot -import -file usr/ca/root/ca.cert.pem -storepass app123

1.3.3 配置客户端

客户端的console-producer和console-consumer命令需要添加相关安全配置。

如果kafka集群不需要客户端身份验证,只需配置下面的配置:

security.protocol=SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123

如果需要客户端身份验证,还需要补充下面的配置信息:

ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123

根据我们的要求和代理配置,可能还需要其他配置设置:

  1. ssl.provider(可选)。用于SSL连接的安全提供程序的名称。

  2. ssl.cipher.suites(可选)。密码套件是认证,加密,MAC和密钥交换算法的命名组合,用于协商使用TLS或SSL网络协议的网络连接的安全设置。

  3. ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1。它应列出在代理方配置的至少一种协议

  4. ssl.truststore.type = JKS

  5. ssl.keystore.type = JKS

最后我们总结合并一下所有的配置参数(编辑文件名为:client-ssl.properties
):

security.protocol=SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123

1.3.4 消费者生产者

使用console-producer的命令:

kafka-console-producer.sh --broker-list kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092 --topic test --producer.config client-ssl.properties

使用console-consumer的命令:

kafka-console-consumer.sh --bootstrap-server kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092 --topic test --new-consumer --consumer.config client-ssl.properties

这里test
topic
名称,在只有SSL通信加密集群中,topic的创建、删除、生产、消费并没有权限管理,依然存在安全问题。所以kafka集群需要进一步配置权限管理。

第二部分 Kafka集群权限认证

Kafka集群的权限认证管理主要涉及:

  • 身份认证(Authentication)。对客户端与服务器的连接进行身份认证,brokers和zookeeper之间的连接进行Authentication(producer 和 consumer)、其他 brokers、tools与 brokers 之间连接的认证。

  • 权限控制(Authorization)。实现对于消息级别的权限控制,客户端的读写操作进行Authorization(生产、消费)管理。

通俗的讲,身份认证解决的是证明你是谁,而权限控制解决的是你能干什么。在Kafka中身份认证和权限控制是两套独立的安全配置。

2.1 集群权限认证策略

Kafka从0.9.0.0版本后开始支持下面的SASL安全策略管理。这些安全功能为Kafka通信安全、多租户管理、集群云化提供了安全保障。截止目前Kafka 2.3版本,一共支持5种SASL方式。

验证方式版本说明
SASL/PLAIN0.10.0.0不能动态增加用户
SASL/SCRAM0.10.2.0可以动态增加用户。有两种方式:SASL/SCRAM-SHA-256 和SASL/SCRAM-SHA-512
SASL/GSSAPI0.9.0.0需要独立部署验证服务(即Kerberos服务)
SASL/OAUTHBEARER2.0.0需自己实现接口实现token的创建和验证,需要额外Oauth服务
SASL/Delegation Token1.1.0补充现有 SASL 机制的轻量级认证机制

对于生产环境,SASL/PLAIN方式有个缺点:只能在JAAS文件KafkaServer参数中配置用户,集群运行期间无法动态新增用户(需要重启重新加载JAAS文件),这对维护管理带来不便。而SASL/SCRAM方式,将认证数据存储在Zookeeper中,可以动态新增用户并分配权限。

SASL/GSSAPI方式需要依赖Kerberos服务。对于一些已经部署了集中式的Kerberos服务的大厂,只需要申请一个principal即可。如果生产Kerberos认证中出现TGT分发性能瓶颈,可以使用SASL/Delegation Token模式。使用 Kafka 提供的 API 去获取对应的 Delegation Token。Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证),减少性能压力。

同样SASL/OAUTHBEARER方式需要Oauth服务。

各种方式引入版本不同,使用依赖各有差异,需要结合自身业务特点选择合适的架构方式。

2.2 SASL/SCRAM策略配置介绍

SASL/SCRAM方式将身份认证和权限控制的凭证(credential)数据均存储在Zookeeper中,需要对Zookeeper进行安全配置。

2.2.1 Zookeeper集群侧配置

对Zookeeper集群中所有节点更新下面的策略后,重启集群生效。

  • 配置zoo.cfg
    文件

    文件尾部追加下面的配置:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
  • 新增zk_server_jaas.conf
    文件

    配置文件内容如下:

    Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret;
    };

    其中username
    password
    定义的用户和密钥,用于Zookeeper与Kafka集群进行认证。配置项user_admin="admin-secret"
    中 admin为用户名,admin-secret为密码,用于Zookeeper集群外客户端和集群内进行认证。

  • 拷贝依赖包

    将kafka文件系统中kafka/libs
    目录下的jar包拷贝到zookeeper/lib
    目录。

    kafka-clients-2.1.1.jar
    lz4-java-1.5.0.jar
    osgi-resource-locator-1.0.1.jar
    slf4j-api-1.7.25.jar
    snappy-java-1.1.7.2.jar

    若没有引入依赖包,启动时会报找不到org.apache.kafka.common.security.plain.PlainLoginModule包的错误。

  • 修改zookeeper启动参数

    修改bin/zkEnv.sh
    文件, 在文件尾追加下面的配置内容。该配置完成引入的包的加载。变量CLASSPATH
    SERVER_JVMFLAGS
    都会在Zookeeper启动时传给JVM
    虚拟机。

    下面的配置中$ZOOKEEPER_HOME
    是zookeeper的环境变量,如果没有配置,使用绝对路径即可。

    for i in $ZOOKEEPER_HOME/lib/*.jar; do
      CLASSPATH="$i:$CLASSPATH"
    done
    SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOKEEPER_HOME/conf/zk_server_jaas.conf"

2.2.2 kafka集群侧配置

kafka集群中每一台节点均需要更新下面的配置。

  • 新增kafka_server_scram_jaas.conf
    文件(在config
    目录中)

    KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
    # 自定义用户:
    # user_admin="admin-secret"
    # user_alice="alice-secret"
    # user_reader="reader-secret"
    # user_writer="writer-secret";
    };

其中配置username
password
为Kafka集群之间通讯的SCRAM凭证,用户名为admin
,密码为admin-secret

 配置中类似user_XXX
格式的配置项为自定义用户。如果是SASL/PLAIN方式,用户只能在该文件中定义,不能动态新增。我们使用SASL/SCRAM方式,可以后续动态声明admin用户,不再此处进行配置。

  • 更新Kafka的配置文件server.properties
    (在config
    目录中):

    #SASL CONFIG
    listeners=SASL_SSL://kafka.app.node1:9092
    sasl.enabled.mechanisms=SCRAM-SHA-512
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    #allow.everyone.if.no.acl.found=true
    super.users=User:admin
    #SSL CINFIG
    ssl.keystore.location=/usr/ca/server/server.keystore.jks
    ssl.keystore.password=app123
    ssl.key.password=app123
    ssl.truststore.location=/usr/ca/trust/server.truststore.jks
    ssl.truststore.password=app123
    ssl.client.auth=required
    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
    ssl.keystore.type=JKS
    ssl.truststore.type=JKS
    ssl.endpoint.identification.algorithm=HTTPS
    security.inter.broker.protocol=SASL_SSL

    需要注意参数allow.everyone.if.no.acl.found
    ,如果开启参数开关,当客户端和集群交互时候未找到ACL策略时,允许所有类型的访问操作。建议该参数关闭(false)。

    参数security.inter.broker.protocol
    指定集群brokers之间的通讯协议。不加密协议有:SASL_SSL、SASL_PLAINTEXT、PLAINTEXT;加密协议有:SSL。为了提高节点之间的交互性能,内部网络环境建议使用非加密协议。这里使用加密的SASL_SSL
    协议。

    参数super.users
    指定了集群的超级用户为:admin
    。注意如果指定多个超级用户,每个用户使用分号隔开,例如:super.users=User:admin;User:alice

    参数sasl.enabled.mechanisms
    列出支持的认证方式。即可以支持多种。

    参数sasl.mechanism.inter.broker.protocol
    指定集群内部的认证方式。Kafka仅支持最小迭代次数为4096的强哈希函数SHA-256和SHA-512。所以有SCRAM-SHA-512和SCRAM-SHA-256两种方式。

  • 配置kafka启动环境变量(bin
    目录下面的kafka-run-class.sh

    为 Kafka 添加 java.security.auth.login.config 环境变量(配置文件路径)。并且在启动模式中添加KAFKA_SASL_OPTS

    # 截取配置文件片段:
    KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/software/kafka/config/kafka_server_scram_jaas.conf'
    # Launch mode
    if [ "x$DAEMON_MODE" = "xtrue" ]; then
    nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < dev/null &
    else
    exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
    fi

2.2.3 SCRAM认证管理

在集群的配置文件kafka_server_scram_jaas.conf
中,定义了集群内部的认证用户。对于客户端和集群之间认证可以使用kafka-configs.sh
来动态创建。

  • 创建用户SCRAM凭证

    例如集群中的超级用户admin
    用户,使用下面的命令创建:

$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin

创建自定义普通用户alice

$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
  • 查看SCARM凭证

$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --describe --entity-type users --entity-name admin
  • 删除SCRAM凭证

$ kafka-configs.sh --zookeeper kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice

2.3 Kafka客户端配置

Kafka集群配置了认证,那么对于Console客户端访问集群自然需要配置认证信息。可集群节点内部通讯凭证的认知,同样需要定义JAAS文件。加入我们自定义了用户alice
,JAAS文件名为:kafka_console_client_jaas.conf
,配置内容如下:

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="alice"
password="alice-secret";
};

然后更新kafka-console-producer.sh
脚本和kafka-console-consumer.sh
脚本的启动参数。

# 文件截取更新部分:

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/software/kafka/config/kafka_write_jaas.conf"
fi

在配置SSL时候,我们新建了client-ssl.properties
配置文件,作为Console客户端启动配置。在集群启用SASL_SSL
后,我们同步更新如下:

security.protocol=SASL_SSL
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=app123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.keystore.password=app123
ssl.key.password=app123

至此Console客户端已经配置完毕,但目前Console客户端还不能通过命令方式和集群进行交互,因为我们指定的用户对于集群的资源还没有任何权限。需要对用户进行集群资源的ACL控制设置,赋予相关权限。

2.4 ACL控制

Kafka权限资源包含Topic、Group、Cluster、TransactionalId(事务id),每个资源涉及的权限内容如下:

资源类型权限类型
TopicRead,Write,Describe,Delete,DescribeConfigs,AlterConfigs,All
GroupRead,Describe,All
ClusterCreate,ClusterAction,DescribeConfigs,AlterConfigs,IdempotentWrite,Alter,Describe,All
TransactionalIdDescribe,Write,All

对于常用类型进行说明:

权限说明
Read读取topic、group信息
Write写topic、TransactionalId(存储在内部topic)
Delete删除topic
Create创建topic
ALTER修改topic
Describe获取topic、group、TransactionalId信息
ALL所有权限

Kafka提供ACL管理脚本:kafka-acls.sh

2.4.1 更新脚本配置

认证数据均存储在Zookeeper集群中,需要和Zookeeper交互自然需要配置相关认证信息。

首先需要新建JAAS文件,文件名为:zk_client_jaas.conf
。这里的用户已经在Zookeeper集群中进行定义。

Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};

最后更新kafka-acls.sh
脚本:

# 截取更新部分

if [ "x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/software/kafka/config/zk_client_jaas.conf"
fi

当然Kafka集群的配置文件中已经开启了ACL:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

至此完成配置。

2.4.2 ACL配置

根据官网的介绍,ACL的格式如下:

“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”

参数含义描述如下:

  • principal:指定一个Kafka user;

  • operation:指定一个具体的操作类型,例如:Read, Write, Delete等;

  • Host:表示与集群交互的客户端IP地址,如果是通配符‘*’表示所有IP。目前不支持主机名(hostname)形式,只能是IP地址;

  • Resource:指定一种Kafka资源类型(共有4种类型);

例如下面的ACL命令:

$ sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --add --allow-principal User:alice --allow-host '*' --operation ALL --topic test

赋权之后,用户alice对test具有全部权限,并且访问请求可以是来自任何IP的客户端。

常用参数的补充说明:

  • 对主机IP的限制参数,allow-host
    指定允许的IP,deny-host
    指定禁用IP;

  • 新增和删除一个赋权策略,分别使用:add
    remove

2.4.3 ACL策略查看

使用参数list
参看ACL策略。例如:

$ sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=kafka.app.node1:2181,kafka.app.node2:2181,kafka.app.node3:2181 --list --topic test-topic

该查看命令显示test-topic
资源相关的所有ACL策略。

2.4.4 超级用户

kafka集群的配置文件server.properties
中定义了超级用户(Super Users),超级用户不在ACL控制范围内,默认可以访问集群中所有资源,并具备所有权限。

2.5 权限认证数据访问

集群的认证数据存储在Zookeeper,可以通过Zookeeper的console客户端访问认证数据。

使用zookeeper自带的命令行客户端:

/dmqs/zookeeper/bin> ./zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled
[zk: localhost:2181(CONNECTING) 0]

查看zookeeper中的数据:

[zk: localhost:2181(CONNECTED) 1] ls 
[cluster, controller_epoch, controller, brokers, zookeeper, kafka-acl, kafka-acl-changes, admin, isr_change_notification, consumers, config]

其中kafka-acl
中存储相关权限认证数据。

[zk: localhost:2181(CONNECTED) 3] ls kafka-acl
[Cluster, Topic]

可以查看其中的权限信息。

2.6 SSL和SASL的说明

SSL是传输层安全协议,是位于传输层(TCP/IP)和应用层(HTTP)的协议,SSL是对整个传输过程的加密,SSL是对客户端和服务器之间传输的所有数据进行加密。假如在配置的时候使用了SASL,但是没有使用SSL,那么除了账号密码外,所有的传输内容都是裸奔的。

所以生产集群采用SSL和SASL结合方式,即SSL_SASL方式。

第三部分 安全集群的客户端

3.1 开发语言类

3.1.1 Python客户端

目前市面上kafka的python API常用的有三种:

  • 第一种 kafka

该项目是kafka-python
的老项目,2017年后调整为kafka-python
项目。

  • 第二种 kafka-python

最新版本为2.0,首先从客户端的密钥库中导出CA证书。

$ keytool -exportcert -alias CARoot -keystore client.keystore.jks -rfc -file ca.cert.pem

生产者和消费者的案例如下:

# -*- coding: utf-8 -*-

from kafka import KafkaConsumer, KafkaProducer
import kafka
import ssl
import logging
import time
#logging.basicConfig(level=logging.DEBUG)

try:
   bootstrap_servers = 'kafka.itdw.node1:9092,kafka.itdw.node2:9092,kafka.itdw.node3:9092'
   topic = "test"
   sasl_mechanism = "SCRAM-SHA-512"
   username = "alice"
   password = "alice-secret"
   security_protocol = "SASL_SSL"
   # CA 证书路径
   ssl_cafile = 'ca.cert.pem'
   # SSL
   context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
   context.verify_mode = ssl.CERT_NONE
   context.check_hostname = False
   context.load_verify_locations(ssl_cafile)
   # 消费者
   consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers,
                              api_version=(0, 10),
                              security_protocol=security_protocol,
                              ssl_context=context,
                              sasl_mechanism = sasl_mechanism,
                              sasl_plain_username = username,
                              sasl_plain_password = password
                            )
   # 生产者
   producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                            api_version=(0, 10),
                            acks='all',
                            retries=1,
                            security_protocol=security_protocol,
                            ssl_context=context,
                            sasl_mechanism=sasl_mechanism,
                            sasl_plain_username=username,
                            sasl_plain_password=password
                            )
   # 生产数据
   for i in range(10):
       producer.send(topic, bytes("测试",encoding='utf8'))
   producer.flush()
   # 消费数据
   for msg in consumer:
       print(msg)

except Exception as e:
   print(e)

需要的注意的事项有:

  • Kafka集群启用主机名模式,所以应用程序运行节点的hosts文件需要配置Kafka集群节点的域名映射。

  • ssl_context参数为包装套接字连接的预配置SSLContext。如果非None,将忽略所有其他ssl_ *配置。

  • 主机名验证问题。如果证书中域名和主机名不匹配,客户端侧需要配置需要调整如下:

    ssl_ctx.check_hostname=False
    ssl_ctx.verify_mode = CERT_NONE
  • 如果不提前预配置SSLContext,还需要客户端的证书。

    $ keytool -exportcert -alias localhost -keystore client.keystore.jks -rfc -file client.cert.pem

    生产者的参数需要添加:

    ssl_certfile = "client.cert.pem"
    ssl_cafile = "ca.cert.pem"
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                            api_version=(0, 10),
                            acks='all',
                            retries=1,
                            security_protocol=security_protocol,
                            ssl_context=context,
                            sasl_mechanism=sasl_mechanism,
                            sasl_plain_username=username,
                            sasl_plain_password=password,
                            ssl_check_hostname=False,
                            ssl_certfile=ssl_certfile,
                            ssl_cafile=ssl_cafile)
  • 第三种 confluent-kafka

confluent-kafka包由confluent公司开源,主要是对C/C++客户端包(librdkafka)的封装。案例代码如下:

# -*- coding: utf-8 -*-
from confluent_kafka import Producer

# 回调函数
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
       print(’Message delivery failed: {}.format(err))
    else:
       print(‘Message delivered to {} [{}].format(msg.topic(),           msg.partition()))

if __name__ == ‘__main__‘:
    producerConfing = {"bootstrap.servers": 'kafka.itdw.node1:9092,kafka.itdw.node2:9092,kafka.itdw.node3:9092',
        "security.protocol": 'SASL_SSL',
        "sasl.mechanisms": 'SCRAM-SHA-256',
        "sasl.username": 'alice',
        "sasl.password": 'alice-secret',
        "ssl.ca.location": 'ca.cert.pem'
    }
    ProducerTest = Producer(producerConfing)

    ProducerTest.poll(0)
    ProducerTest.produce(‘testTopic‘, ‘confluent kafka test‘.encode(‘utf-8),callback=delivery_report)
    ProducerTest.flush()

3.1.2 Go客户端

我们的Go语言中常用的Kafka的客户端包有:

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/segmentio/ksuid"

其中最常用的是sarama
,案例参考github项目

3.1.3 Java客户端

生产者:

package com.kafka.security;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.Properties;
import java.util.Random;


public class KafkaProducerWithSASL_SSL {
   private static final String KAFKA_TOPIC = "topsec";
   private static final String BOOTSTRAP_SERVER = "docker31:9092";
   private static final String[] strs = new String[]{"zhao", "qian", "sun", "li", "zhou", "wu", "zheng", "wang", "feng", "chen"};
   private static final Random r = new Random();

   public static void main(String[] args) {
       try {
           producer();
      } catch (InterruptedException e) {
           e.printStackTrace();
      }
  }

   private static void producer() throws InterruptedException {
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
       //SASL_SSL加密
       props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
       props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\trust\\client.truststore.jks");
       props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "hadoop");
       // SSL用户认证
       props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\client\\client.keystore.jks");
       props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "hadoop");
       props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "hadoop");
       //SASL用户认证
                               props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";");
       props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");

       props.put(ProducerConfig.ACKS_CONFIG, "all");
       props.put(ProducerConfig.RETRIES_CONFIG, 0);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

       props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
       props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
       props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

       Producer<String, String> producer = new KafkaProducer<>(props);
       while (true) {
           producer.send(new ProducerRecord<>(KAFKA_TOPIC, strs[r.nextInt(10)],strs[r.nextInt(10)]));
           Thread.sleep(2000);
      }
  }
}

消费者:

package com.topsec.kafka.security;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.Collections;
import java.util.Properties;


public class KafkaConsumerWithSASLAndSSL {
   private static final String KAFKA_TOPIC = "topsec";
   private static final String BOOTSTRAP_SERVER = "docker31:9092";
   public static void main(String[] args) {
       consumer();
  }

   private static void consumer() {
       Properties props = new Properties();
       //SASL_SSL加密配置
       props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
       props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\trust\\client.truststore.jks");
       props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "hadoop");
       //SSL身份验证配置
       props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "D:\\Download\\ca\\client\\client.keystore.jks");
       props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "hadoop");
       props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "hadoop");
       //SASL身份验证
       props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin-secret\";");
       props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");

       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

       props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
       props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
       props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6000");

       KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
       consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
       while (true) {
           ConsumerRecords<String, String> records = consumer.poll(2000);
           for (ConsumerRecord<String, String> record : records) {
               System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
                       record.offset(),
                       record.key(),
                       record.value(),
                       record.partition());
          }
      }
  }
}

3.2 组件类

3.2.1 Console客户端

客户端节点部署kafka项目,在bin目录下面我们已经更新了kafka-console-consumer.sh
kafka-console-producer.sh
两个脚本。并分别新增了加密访问的配置文件consumer.config
producer.config

命令案例参考:1.3.4 章节内容。

3.2.2 Flume客户端

目前Flume项目官网项目文档介绍支持下面三种方式:

  • SASL_PLAINTEXT - 无数据加密的 Kerberos 或明文认证;

  • SASL_SSL - 有数据加密的 Kerberos 或明文认证;

  • SSL - 基于TLS的加密,可选的身份验证;

事实上对于SASL/SCRAM方式Flume也是支持的。具体配置如下(以Flume1.9版本为例):

3.2.2.1 第一步 新增jaas配置文件

在Flume的conf配置目录下面新增flume_jaas.conf文件,文件内容:

Server {
  org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret";
};

KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
   username="admin"
   password="admin-secret";
};
3.2.2.2 第二步 更新flume-env.sh文件

Flume的conf配置目录下面flume-env.sh
文件添加JAVA_OPTS
配置更新:

JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/dmqs/apache-flume-1.9.0-bin/conf/flume_jaas.conf"

其中路径为第一步中新增的flume_jaas.conf
文件路径。

3.2.2.3  测试案例(sinks)

我们使用一个简单的案例来测试,Flume的source为监控文件尾写入,Flume的sinks为加密kafka集群。具体配置如下:

#define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f dmqs/apache-flume-1.9.0-bin/data/flume/flume.log
a1.sources.r1.shell = bin/bash -c

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 15000
a1.channels.c1.transactionCapacity = 15000

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = kafka.itdw.node1:9093
a1.sinks.k1.kafka.topic = flume
a1.sinks.k1.kafka.flumeBatchSize = 15000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1000
a1.sinks.k1.kafka.producer.security.protocol=SASL_SSL
a1.sinks.c1.kafka.producer.sasl.mechanism =SCRAM-SHA-512
a1.sinks.c1.kafka.producer.sasl.jaas.config =org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"
####
a1.sinks.k1.kafka.producer.ssl.truststore.location =/usr/ca/trust/client.truststore.jks
a1.sinks.k1.kafka.producer.sasl.mechanism =SCRAM-SHA-512
a1.sinks.k1.kafka.producer.ssl.truststore.password=app123
a1.sinks.k1.kafka.producer.ssl.keystore.location=/usr/ca/client/client.keystore.jks
a1.sinks.k1.kafka.producer.ssl.keystore.password=app123
a1.sinks.k1.kafka.producer.ssl.key.password=app123
a1.sinks.k1.kafka.producer.timeout.ms = 100
a1.sinks.k1.batchSize=15000
a1.sinks.k1.batchDurationMillis=2000

配置保存为flume-sink-auth-kafka.conf
,为了检查输出结果使用下面命令启动(在bin目录中):

./flume-ng agent --conf ../conf --conf-file ../conf/flume-sink-auth-kafka.conf --name a1 -Dflume.root.logger=INFO,console

向文件尾部追加信息:

echo "test" >> dmqs/apache-flume-1.9.0-bin/data/flume/flume.log

然后使用消费者客户端查看数据是否写入kafka的flume主题中。

3.2.2.3  测试案例(source)

同样可以也可以将加密Kafka作为Flume的source,配置案例如下:

#define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = kafka.app .node1:9093,kafka.app.node2:9093,kafka.app.node3:9093
a1.sources.r1.kafka.topics = flume
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.kafka.consumer.timeout.ms = 2000
a1.sources.r1.batchSize=150
a1.sources.r1.batchDurationMillis=1000
#####
a1.sources.r1.kafka.consumer.ssl.truststore.location =/usr/ca/trust/client.truststore.jks
a1.sources.r1.kafka.consumer.sasl.mechanism =SCRAM-SHA-512
a1.sources.r1.kafka.consumer.ssl.truststore.password=itdw123
a1.sources.r1.kafka.consumer.ssl.keystore.location=/usr/ca/client/client.keystore.jks
a1.sources.r1.kafka.consumer.ssl.keystore.password=itdw123
a1.sources.r1.kafka.consumer.ssl.key.password=itdw123
a1.sources.r1.kafka.consumer.security.protocol=SASL_SSL
a1.sources.r1.kafka.consumer.sasl.mechanism =SCRAM-SHA-512
a1.sources.r1.kafka.consumer.sasl.jaas.config =org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 15000
a1.channels.c1.transactionCapacity = 15000

# sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /dmqs/apache-flume-1.9.0-bin/data/flume
a1.sinks.k1.sink.serializer = TEXT

案例中将加密Kafka中flume主题中的数据汇入到指定目录的文件中。

3.2.3 Logstash客户端

Logstash和Kafka交互使用Kafka output plugin
插件实现。其中配置文件中output部分如下:

output {
 kafka {
   id => "kafkaSLL_SASL"
   codec => "json"
   ssl_endpoint_identification_algorithm => ""
   bootstrap_servers => "kafka.app.node1:9092,kafka.app.node2:9092,kafka.app.node3:9092"
   ssl_keystore_location => "/etc/logstash/certificates/client.keystore.jks"
   ssl_keystore_password => "app123"
   ssl_keystore_type => "JKS"
   ssl_truststore_location => "/etc/logstash/certificates/client.truststore.jks"
   ssl_truststore_password => "app123"
   ssl_truststore_type => "JKS"
   sasl_mechanism => "SCRAM-SHA-512"
   security_protocol => "SASL_SSL"
   sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='alice' password='alice-secret';"
   topic_id => "test"
  }
}

第四部分 加密认证集群的性能压测

集群启用SSL后,数据交互中需要加密、解密。kafka集群的I/O性能会降低。我们使用Kafka自带的压侧工具对集群加密前和加密后性能进行评测。

4.1生产者压力测试

客户端写入参数配置为acks=all
(即主题中Leader和fellow副本均写入成功)。每条消息大小为1M(消息体小吞吐量会大一些)。另外测试客户端为集群内部节点,忽略了数据网络传输的性能消耗。

4.1.1不加密集群

./kafka-consumer-perf-test.sh --topic topsec --throughput 50000 --num-records 1500000 --record-size 10000 --producer-props bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093 acks=all

测试结果:

1500000 records sent, 38538.615693 records/sec (37.64 MB/sec), 748.44 ms avg latency, 5485.00 ms max latency, 227 ms 50th, 3194 ms 95th, 3789 ms 99th, 3992 ms 99.9th.

4.1.2加密集群

./kafka-producer-perf-test.sh --topic topsec --throughput 50000 --num-records 1500000 --record-size 10000 --producer-props bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093 acks=all --producer.config producer.config

测试结果:

1500000 records sent, 16901.027582 records/sec (16.50 MB/sec), 1713.43 ms avg latency, 9345.00 ms max latency, 72 ms 50th, 1283 ms 95th, 2067 ms 99th, 2217 ms 99.9th.

4.2 压侧结论

加密改造前,生产者的吞吐量为3.8w 条/秒,改造后1.7W 条/秒。整体吞吐性能降低50%左右,数据的加密、解密导致吞吐量性能降低。平均时延也增加了一倍多(改造前700ms,改造后1700ms)。在实际生产中可参考这个性能折扣基线配置集群资源。

参考文献及资料

1、Kafka官网对安全类功能介绍,链接:http://kafka.apache.org/documentation/#security

2、Kafka ACLs in Practice – User Authentication and Authorization,链接:https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/

3、维基百科(数字证书),链接:https://zh.wikipedia.org/wiki/公開金鑰認證

4、SSL技术白皮书,链接:https://blog.51cto.com/xuding/1732723

5、Kafka权限管理,链接:https://www.jianshu.com/p/09129c9f4c80

5、Flume文档,链接:https://flume.apache.org/FlumeUserGuide.html#kafka-sinkorg/FlumeUserGuide.html#kafka-sink


文章转载自数据科学和工程,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论