目录
背景
第一部分 kafka-manager安装
第二部分 kafka-manager配置
第三部分 kafka-manager管理
第四部分 总结
参考文献及资料
背景
在Kafka的监控系统中有很多优秀的开源监控系统。比如Kafka-manager,open-faclcon,zabbix等主流监控工具均可直接监控kafka。Kafka集群性能监控可以从消息网络传输,消息传输流量,请求次数等指标来衡量集群性能。这些指标数据可以通过访问kafka集群的JMX接口获取。Kafka-manager工具由Yahoo研发的Kafka管理和监控工具,并在github上开源。
对于非加密Kafka集群配置Kafka manager,目前互联网也有大量的资料。而对于加密集群(特别是云端集群还配置了域名方式),参考材料较为匮乏。本文针对云端加密Kafka集群配置Kafka Manager进行详细介绍,供大家参考。
第一部分 kafka-manager安装
1.1 版本选择
版本使用cmak-3.0.0.0
版本,依赖java11(使用openjdk-11+28_linux-x64_bin.tar.gz
)。使用已经编译好的介质包cmak-3.0.0.0.zip
。假设安装目录为/dmqs
。
1.2 介质部署
1.2.1 部署cmak
上传cmak-3.0.0.0.zip
至安装目录/dmqs
,使用命令解压:
f-itdw-4c8g-100g-11:/dmqs # unzip cmak-3.0.0.0.zip1.2.2 部署java
上传openjdk-11+28_linux-x64_bin.tar.gz
介质到/dmqs/cmak-3.0.0.0
路径:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # tar -zxvf openjdk-11+28_linux-x64_bin.tar.gz重命名java路径名:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # mv jdk11 jdk1.3 配置文件准备
1.3.1 配置application.conf
文件
备份文件并修改:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # cp application.conf application.conf.bak
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # vi application.conf
调整afka-manager.zkhosts
参数项的配置信息:
kafka-manager.zkhosts="84.10.228.50:2181,84.10.228.55:2181,84.10.228.56:2181"1.3.2 加密集群配置jaas
文件
如果是加密集群需要准备jaas
文件,文件名为:kafka_server_jaas.conf
。
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # touch kafka_server_jaas.conf文件内容如下:
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
上面配置中KafkaClient
为和kafka通信配置;Client
为和zookeeper通信配置。
1.3.3 配置consumer.properties
首先备份:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # cp consumer.properties consumer.properties.bak
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # vi consumer.properties
配置文件调整为:
#security.protocol=PLAINTEXT
#key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
#value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeseriazer
bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=itdw123
ssl.keystore.password=itdw123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.key.password=itdw123
ssl.endpoint.identification.algorithm=
其中注释部分为源配置文件内容。
1.4 准备ca信任证书
对于已经配置为域名方式的Kafka集群需要配置域名信任证书。
创建InstallCert.java
,java程序文件:
/*
* Copyright 2006 Sun Microsystems, Inc. All Rights Reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* - Neither the name of Sun Microsystems nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
import java.io.*;
import java.net.URL;
import java.security.*;
import java.security.cert.*;
import javax.net.ssl.*;
public class InstallCert {
public static void main(String[] args) throws Exception {
String host;
int port;
char[] passphrase;
if ((args.length == 1) || (args.length == 2)) {
String[] c = args[0].split(":");
host = c[0];
port = (c.length == 1) ? 443 : Integer.parseInt(c[1]);
String p = (args.length == 1) ? "changeit" : args[1];
passphrase = p.toCharArray();
} else {
System.out.println("Usage: java InstallCert <host>[:port] [passphrase]");
return;
}
File file = new File("jssecacerts");
if (file.isFile() == false) {
char SEP = File.separatorChar;
File dir = new File(System.getProperty("java.home") + SEP
+ "lib" + SEP + "security");
file = new File(dir, "jssecacerts");
if (file.isFile() == false) {
file = new File(dir, "cacerts");
}
}
System.out.println("Loading KeyStore " + file + "...");
InputStream in = new FileInputStream(file);
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
ks.load(in, passphrase);
in.close();
SSLContext context = SSLContext.getInstance("TLS");
TrustManagerFactory tmf =
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ks);
X509TrustManager defaultTrustManager = (X509TrustManager)tmf.getTrustManagers()[0];
SavingTrustManager tm = new SavingTrustManager(defaultTrustManager);
context.init(null, new TrustManager[] {tm}, null);
SSLSocketFactory factory = context.getSocketFactory();
System.out.println("Opening connection to " + host + ":" + port + "...");
SSLSocket socket = (SSLSocket)factory.createSocket(host, port);
socket.setSoTimeout(10000);
try {
System.out.println("Starting SSL handshake...");
socket.startHandshake();
socket.close();
System.out.println();
System.out.println("No errors, certificate is already trusted");
} catch (SSLException e) {
System.out.println();
e.printStackTrace(System.out);
}
X509Certificate[] chain = tm.chain;
if (chain == null) {
System.out.println("Could not obtain server certificate chain");
return;
}
BufferedReader reader =
new BufferedReader(new InputStreamReader(System.in));
System.out.println();
System.out.println("Server sent " + chain.length + " certificate(s):");
System.out.println();
MessageDigest sha1 = MessageDigest.getInstance("SHA1");
MessageDigest md5 = MessageDigest.getInstance("MD5");
for (int i = 0; i < chain.length; i++) {
X509Certificate cert = chain[i];
System.out.println
(" " + (i + 1) + " Subject " + cert.getSubjectDN());
System.out.println(" Issuer " + cert.getIssuerDN());
sha1.update(cert.getEncoded());
System.out.println(" sha1 " + toHexString(sha1.digest()));
md5.update(cert.getEncoded());
System.out.println(" md5 " + toHexString(md5.digest()));
System.out.println();
}
System.out.println("Enter certificate to add to trusted keystore or 'q' to quit: [1]");
String line = reader.readLine().trim();
int k;
try {
k = (line.length() == 0) ? 0 : Integer.parseInt(line) - 1;
} catch (NumberFormatException e) {
System.out.println("KeyStore not changed");
return;
}
X509Certificate cert = chain[k];
String alias = host + "-" + (k + 1);
ks.setCertificateEntry(alias, cert);
OutputStream out = new FileOutputStream("jssecacerts");
ks.store(out, passphrase);
out.close();
System.out.println();
System.out.println(cert);
System.out.println();
System.out.println
("Added certificate to keystore 'jssecacerts' using alias '"
+ alias + "'");
}
private static final char[] HEXDIGITS = "0123456789abcdef".toCharArray();
private static String toHexString(byte[] bytes) {
StringBuilder sb = new StringBuilder(bytes.length * 3);
for (int b : bytes) {
b &= 0xff;
sb.append(HEXDIGITS[b >> 4]);
sb.append(HEXDIGITS[b & 15]);
sb.append(' ');
}
return sb.toString();
}
private static class SavingTrustManager implements X509TrustManager {
private final X509TrustManager tm;
private X509Certificate[] chain;
SavingTrustManager(X509TrustManager tm) {
this.tm = tm;
}
public X509Certificate[] getAcceptedIssuers() {
throw new UnsupportedOperationException();
}
public void checkClientTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
throw new UnsupportedOperationException();
}
public void checkServerTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
this.chain = chain;
tm.checkServerTrusted(chain, authType);
}
}
}
上传至目的目录,并编译:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # dmqs/cmak-3.0.0.0/jdk/bin/javac InstallCert.java编译后生成下面的文件:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # ll
-rw-r--r-- 1 dmqs dmqs 975 May 28 02:23 InstallCert$SavingTrustManager.class
-rw-r--r-- 1 dmqs dmqs 6126 May 28 02:23 InstallCert.class
-rw-r--r-- 1 dmqs dmqs 6884 May 28 02:21 InstallCert.java
添加域名(kafka集群配置为域名方式)到jssecacerts
文件中:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node1:9093这时在当前目录就生成了jssecacerts
文件。如果集群是多节点,需要将其他节点域名信息追加到这个文件中。执行命令即为:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node2:9093
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node3:9093
这样就生成了集群所有的节点域名的信任证书。
最后将jssecacerts
文件拷贝至jdk/lib/security
:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # cp jssecacerts jdk/lib/security完成所有配置的准备。
1.5 服务启动
完成配置文件准备后,使用下面的命令启动Kafka-manager
服务:
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/bin # ./cmak -java-home ../jdk -Djava.security.auth.login.config=../conf/kafka_server_jaas.conf -Dapplication.home=/dmqs/cmak-3.0.0.0 > dev/null 2>&1 &其中参数命令说明如下:
参数
-java-home
指定服务启动的java依赖环境目录;参数
-Djava.security.auth.login.config
指定和kafka和zookeeper交互的jaas文件路径;参数
-Dapplication.home
指定了应用的主目录;参数
-Dhttp.port=8888
指定了应用的监听端口,默认9000;参数
-Dconfig.file=../conf/application.conf
指定了应用的应用配置文件;
启动后应用目录下面生成logs
目录,作为日志存放目录。启动命令不指定端口的情况下,默认监听9000
端口。
1.6 自动化脚本
为了提高服务运维管理,对服务启停进行自动化管理。
#!/bin/bash -e
RETVAL=0
cmak="/dmqs/cmak-3.0.0.0/bin/cmak"
start() {
$cmak -java-home ../jdk -Djava.security.auth.login.config=../conf/kafka_server_jaas.conf -Dapplication.home=/dmqs/cmak-3.0.0.0 >/dev/null 2>&1 &
RETVAL=$?
[ $RETVAL -eq 0 ] && echo "Start Kafka Manager Success!" ||echo "Start Kafka Manager failed!"
return $RETVAL
}
stop() {
CMAKPID=$(ps -ef|grep cmak|grep -v grep| awk '{print $2}')
if [[ -a dmqs/cmak-3.0.0.0/RUNNING_PID ]]
then
rm dmqs/cmak-3.0.0.0/RUNNING_PID && echo -e "\n已删除文件:RUNNING_PID\n" && kill -9 $CMAKPID >/dev/null 2>&1 &
RETVAL=$?
else
kill -9 $CMAKPID >/dev/null 2>&1 &
RETVAL=$?
fi;
[ $? -eq 0 ] && echo "Stop Kafka Manager Success!" ||echo "Stop Kafka Manager failed!"
return $RETVAL
}
case "$1" in
start)
start
;;
stop)
stop
;;
restart)
sh $0 stop
sh $0 start
;;
*)
echo "Format error!"
echo $"Usage: $0 {start|stop|restart}"
exit 1
;;
esac
exit $RETVAL
对于启动命令,可以自定义修改。
第二部分 kafka-manager配置
2.1 创建新集群管理
创建新的管理集群,需要填入下面的信息:
Cluster Name
集群名称;
Cluster Zookeeper Hosts
配置kafka集群背后的zookeeper集群的信息。例如:192.168.1.1:2181;
Kafka Version
Kafka的版本信息;
Enable JMX Polling (Set JMX_PORT env variable before starting kafka server)
是否启用集群的监控组件。
Security Protocol
安全协议。目前支持:SSL、SASL_PLAINTEXT、SASL_SSL、PLAINTEXT
SASL Mechanism (only applies to SASL based security)
SASL的权限管理协议:DEFAULT、PLAIN、GSSAPI、SCRAM-SHA-256、SCRAM-SHA-512
SASL JAAS Config (only applies to SASL based security)
SASL的用户配置信息。例如:
org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";需要注意的是配置以分号结束,否则会报错。
第三部分 kafka-manager管理
Kafka Manager服务启动后,默认监听9000端口,所以服务URL地址为:http://102.168.1.1:9000
。目前组件支持的管理功能有:
管理多个集群
轻松检查集群状态(主题,使用者,偏移量,代理,副本分发,分区分发)
运行首选副本选择
生成带有选项的分区分配,以选择要使用的代理
运行分区的重新分配(基于生成的分配)
使用可选的主题配置创建主题(0.8.1.1与0.8.2+具有不同的配置)
删除主题(仅在0.8.2+上受支持,并记住在代理配置中设置delete.topic.enable = true)
现在,主题列表指示标记为删除的主题(仅在0.8.2+上受支持)
批量生成多个主题的分区分配,并可以选择要使用的代理
批量运行分区的多个主题的重新分配
将分区添加到现有主题
更新现有主题的配置
(可选)为代理级别和主题级别的度量启用JMX轮询。
(可选)过滤出在Zookeeper中没有id / owner /&offsets /目录的使用者。
对于具体的组件使用,可以参文献中的[3]。
参考文献及资料
1、kafka-manager项目地址,链接:https://github.com/yahoo/kafka-manager
2、kafka-manager项目下载地址,链接:https://blog.wolfogre.com/posts/kafka-manager-download/
3、Apache Kafka集群管理工具CMAK(Cluster Manager for Apache Kafka)从安装启动到配置使用,链接:http://www.luyixian.cn/news_show_324464.aspx




