暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka Manager管理组件配置及使用

数据科学和工程 2020-06-01
2159


目录

  • 背景

  • 第一部分  kafka-manager安装

  • 第二部分  kafka-manager配置

  • 第三部分  kafka-manager管理

  • 第四部分  总结

  • 参考文献及资料

背景

在Kafka的监控系统中有很多优秀的开源监控系统。比如Kafka-manager,open-faclcon,zabbix等主流监控工具均可直接监控kafka。Kafka集群性能监控可以从消息网络传输,消息传输流量,请求次数等指标来衡量集群性能。这些指标数据可以通过访问kafka集群的JMX接口获取。Kafka-manager工具由Yahoo研发的Kafka管理和监控工具,并在github上开源。

对于非加密Kafka集群配置Kafka manager,目前互联网也有大量的资料。而对于加密集群(特别是云端集群还配置了域名方式),参考材料较为匮乏。本文针对云端加密Kafka集群配置Kafka Manager进行详细介绍,供大家参考。

第一部分 kafka-manager安装

1.1 版本选择

版本使用cmak-3.0.0.0
版本,依赖java11(使用openjdk-11+28_linux-x64_bin.tar.gz
)。使用已经编译好的介质包cmak-3.0.0.0.zip
。假设安装目录为/dmqs

1.2 介质部署

1.2.1 部署cmak

上传cmak-3.0.0.0.zip
至安装目录/dmqs
,使用命令解压:

f-itdw-4c8g-100g-11:/dmqs # unzip cmak-3.0.0.0.zip

1.2.2 部署java

上传openjdk-11+28_linux-x64_bin.tar.gz
介质到/dmqs/cmak-3.0.0.0
路径:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # tar -zxvf openjdk-11+28_linux-x64_bin.tar.gz

重命名java路径名:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # mv jdk11 jdk

1.3 配置文件准备

1.3.1 配置application.conf
文件

备份文件并修改:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # cp application.conf application.conf.bak
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # vi application.conf

调整afka-manager.zkhosts
参数项的配置信息:

kafka-manager.zkhosts="84.10.228.50:2181,84.10.228.55:2181,84.10.228.56:2181"

1.3.2 加密集群配置jaas
文件

如果是加密集群需要准备jaas
文件,文件名为:kafka_server_jaas.conf

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # touch kafka_server_jaas.conf

文件内容如下:

KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
   username="admin"
   password="admin-secret";
};

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret";
};

上面配置中KafkaClient
为和kafka通信配置;Client
为和zookeeper通信配置。

1.3.3 配置consumer.properties

首先备份:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # cp consumer.properties consumer.properties.bak
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/conf # vi consumer.properties

配置文件调整为:

#security.protocol=PLAINTEXT
#key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
#value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeseriazer

bootstrap.servers=kafka.itdw.node1:9093,kafka.itdw.node2:9093,kafka.itdw.node3:9093
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/usr/ca/trust/client.truststore.jks
ssl.truststore.password=itdw123
ssl.keystore.password=itdw123
ssl.keystore.location=/usr/ca/client/client.keystore.jks
ssl.key.password=itdw123
ssl.endpoint.identification.algorithm=

其中注释部分为源配置文件内容。

1.4 准备ca信任证书

对于已经配置为域名方式的Kafka集群需要配置域名信任证书。

创建InstallCert.java
,java程序文件:

/*
* Copyright 2006 Sun Microsystems, Inc. All Rights Reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
*   - Redistributions of source code must retain the above copyright
*     notice, this list of conditions and the following disclaimer.
*
*   - Redistributions in binary form must reproduce the above copyright
*     notice, this list of conditions and the following disclaimer in the
*     documentation and/or other materials provided with the distribution.
*
*   - Neither the name of Sun Microsystems nor the names of its
*     contributors may be used to endorse or promote products derived
*     from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

import java.io.*;
import java.net.URL;

import java.security.*;
import java.security.cert.*;

import javax.net.ssl.*;

public class InstallCert {

   public static void main(String[] args) throws Exception {
   String host;
   int port;
   char[] passphrase;
   if ((args.length == 1) || (args.length == 2)) {
       String[] c = args[0].split(":");
       host = c[0];
       port = (c.length == 1) ? 443 : Integer.parseInt(c[1]);
       String p = (args.length == 1) ? "changeit" : args[1];
       passphrase = p.toCharArray();
  } else {
       System.out.println("Usage: java InstallCert <host>[:port] [passphrase]");
       return;
  }

   File file = new File("jssecacerts");
   if (file.isFile() == false) {
       char SEP = File.separatorChar;
       File dir = new File(System.getProperty("java.home") + SEP
           + "lib" + SEP + "security");
       file = new File(dir, "jssecacerts");
       if (file.isFile() == false) {
       file = new File(dir, "cacerts");
      }
  }
   System.out.println("Loading KeyStore " + file + "...");
   InputStream in = new FileInputStream(file);
   KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
   ks.load(in, passphrase);
   in.close();

   SSLContext context = SSLContext.getInstance("TLS");
   TrustManagerFactory tmf =
       TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
   tmf.init(ks);
   X509TrustManager defaultTrustManager = (X509TrustManager)tmf.getTrustManagers()[0];
   SavingTrustManager tm = new SavingTrustManager(defaultTrustManager);
   context.init(null, new TrustManager[] {tm}, null);
   SSLSocketFactory factory = context.getSocketFactory();

   System.out.println("Opening connection to " + host + ":" + port + "...");
   SSLSocket socket = (SSLSocket)factory.createSocket(host, port);
   socket.setSoTimeout(10000);
   try {
       System.out.println("Starting SSL handshake...");
       socket.startHandshake();
       socket.close();
       System.out.println();
       System.out.println("No errors, certificate is already trusted");
  } catch (SSLException e) {
       System.out.println();
       e.printStackTrace(System.out);
  }

   X509Certificate[] chain = tm.chain;
   if (chain == null) {
       System.out.println("Could not obtain server certificate chain");
       return;
  }

   BufferedReader reader =
       new BufferedReader(new InputStreamReader(System.in));

   System.out.println();
   System.out.println("Server sent " + chain.length + " certificate(s):");
   System.out.println();
   MessageDigest sha1 = MessageDigest.getInstance("SHA1");
   MessageDigest md5 = MessageDigest.getInstance("MD5");
   for (int i = 0; i < chain.length; i++) {
       X509Certificate cert = chain[i];
       System.out.println
          (" " + (i + 1) + " Subject " + cert.getSubjectDN());
       System.out.println("   Issuer " + cert.getIssuerDN());
       sha1.update(cert.getEncoded());
       System.out.println("   sha1   " + toHexString(sha1.digest()));
       md5.update(cert.getEncoded());
       System.out.println("   md5     " + toHexString(md5.digest()));
       System.out.println();
  }

   System.out.println("Enter certificate to add to trusted keystore or 'q' to quit: [1]");
   String line = reader.readLine().trim();
   int k;
   try {
       k = (line.length() == 0) ? 0 : Integer.parseInt(line) - 1;
  } catch (NumberFormatException e) {
       System.out.println("KeyStore not changed");
       return;
  }

   X509Certificate cert = chain[k];
   String alias = host + "-" + (k + 1);
   ks.setCertificateEntry(alias, cert);

   OutputStream out = new FileOutputStream("jssecacerts");
   ks.store(out, passphrase);
   out.close();

   System.out.println();
   System.out.println(cert);
   System.out.println();
   System.out.println
      ("Added certificate to keystore 'jssecacerts' using alias '"
       + alias + "'");
  }

   private static final char[] HEXDIGITS = "0123456789abcdef".toCharArray();

   private static String toHexString(byte[] bytes) {
   StringBuilder sb = new StringBuilder(bytes.length * 3);
   for (int b : bytes) {
       b &= 0xff;
       sb.append(HEXDIGITS[b >> 4]);
       sb.append(HEXDIGITS[b & 15]);
       sb.append(' ');
  }
   return sb.toString();
  }

   private static class SavingTrustManager implements X509TrustManager {

   private final X509TrustManager tm;
   private X509Certificate[] chain;

   SavingTrustManager(X509TrustManager tm) {
       this.tm = tm;
  }

   public X509Certificate[] getAcceptedIssuers() {
       throw new UnsupportedOperationException();
  }

   public void checkClientTrusted(X509Certificate[] chain, String authType)
       throws CertificateException {
       throw new UnsupportedOperationException();
  }

   public void checkServerTrusted(X509Certificate[] chain, String authType)
       throws CertificateException {
       this.chain = chain;
       tm.checkServerTrusted(chain, authType);
  }
  }
}

上传至目的目录,并编译:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # dmqs/cmak-3.0.0.0/jdk/bin/javac InstallCert.java

编译后生成下面的文件:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # ll
-rw-r--r-- 1 dmqs dmqs    975 May 28 02:23 InstallCert$SavingTrustManager.class
-rw-r--r-- 1 dmqs dmqs   6126 May 28 02:23 InstallCert.class
-rw-r--r-- 1 dmqs dmqs   6884 May 28 02:21 InstallCert.java

添加域名(kafka集群配置为域名方式)到jssecacerts
文件中:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node1:9093

这时在当前目录就生成了jssecacerts
文件。如果集群是多节点,需要将其他节点域名信息追加到这个文件中。执行命令即为:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node2:9093
f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # dmqs/cmak-3.0.0.0/jdk/bin/java InstallCert kafka.itdw.node3:9093

这样就生成了集群所有的节点域名的信任证书。

最后将jssecacerts
文件拷贝至jdk/lib/security

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0 # cp jssecacerts jdk/lib/security

完成所有配置的准备。

1.5 服务启动

完成配置文件准备后,使用下面的命令启动Kafka-manager
服务:

f-itdw-4c8g-100g-11:/dmqs/cmak-3.0.0.0/bin # ./cmak -java-home ../jdk -Djava.security.auth.login.config=../conf/kafka_server_jaas.conf -Dapplication.home=/dmqs/cmak-3.0.0.0 > dev/null 2>&1 &

其中参数命令说明如下:

  • 参数-java-home
    指定服务启动的java依赖环境目录;

  • 参数-Djava.security.auth.login.config
    指定和kafka和zookeeper交互的jaas文件路径;

  • 参数-Dapplication.home
    指定了应用的主目录;

  • 参数-Dhttp.port=8888
    指定了应用的监听端口,默认9000;

  • 参数-Dconfig.file=../conf/application.conf
    指定了应用的应用配置文件;

启动后应用目录下面生成logs
目录,作为日志存放目录。启动命令不指定端口的情况下,默认监听9000
端口。

1.6 自动化脚本

为了提高服务运维管理,对服务启停进行自动化管理。

#!/bin/bash -e

RETVAL=0
cmak="/dmqs/cmak-3.0.0.0/bin/cmak"
start() {
       $cmak -java-home ../jdk -Djava.security.auth.login.config=../conf/kafka_server_jaas.conf -Dapplication.home=/dmqs/cmak-3.0.0.0 >/dev/null 2>&1 &
       RETVAL=$?
      [ $RETVAL -eq 0 ] && echo "Start Kafka Manager Success!" ||echo "Start Kafka Manager failed!"
      return $RETVAL
}

stop() {
       CMAKPID=$(ps -ef|grep cmak|grep -v grep| awk '{print $2}')
       if [[ -a dmqs/cmak-3.0.0.0/RUNNING_PID ]]
       then
        rm dmqs/cmak-3.0.0.0/RUNNING_PID && echo -e "\n已删除文件:RUNNING_PID\n" && kill -9 $CMAKPID >/dev/null 2>&1 &
        RETVAL=$?
       else
         kill -9 $CMAKPID >/dev/null 2>&1 &
         RETVAL=$?
       fi;
      [ $? -eq 0 ] && echo "Stop Kafka Manager Success!" ||echo "Stop Kafka Manager failed!"
      return $RETVAL
}
case "$1" in
 start)
       start
  ;;
 stop)
       stop
      ;;
 restart)
     
      sh $0 stop
      sh $0 start
      ;;
  *)
       echo "Format error!"
       echo $"Usage: $0 {start|stop|restart}"
       exit 1
      ;;
esac
exit $RETVAL

对于启动命令,可以自定义修改。

第二部分 kafka-manager配置

2.1 创建新集群管理

创建新的管理集群,需要填入下面的信息:

  • Cluster Name

    集群名称;

  • Cluster Zookeeper Hosts

    配置kafka集群背后的zookeeper集群的信息。例如:192.168.1.1:2181;

  • Kafka Version

    Kafka的版本信息;

  • Enable JMX Polling (Set JMX_PORT env variable before starting kafka server)

    是否启用集群的监控组件。

  • Security Protocol

    安全协议。目前支持:SSL、SASL_PLAINTEXT、SASL_SSL、PLAINTEXT

  • SASL Mechanism (only applies to SASL based security)

    SASL的权限管理协议:DEFAULT、PLAIN、GSSAPI、SCRAM-SHA-256、SCRAM-SHA-512

  • SASL JAAS Config (only applies to SASL based security)

    SASL的用户配置信息。例如:

    org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";

    需要注意的是配置以分号结束,否则会报错。

第三部分 kafka-manager管理

Kafka Manager服务启动后,默认监听9000端口,所以服务URL地址为:http://102.168.1.1:9000
。目前组件支持的管理功能有:

  • 管理多个集群

  • 轻松检查集群状态(主题,使用者,偏移量,代理,副本分发,分区分发)

  • 运行首选副本选择

  • 生成带有选项的分区分配,以选择要使用的代理

  • 运行分区的重新分配(基于生成的分配)

  • 使用可选的主题配置创建主题(0.8.1.1与0.8.2+具有不同的配置)

  • 删除主题(仅在0.8.2+上受支持,并记住在代理配置中设置delete.topic.enable = true)

  • 现在,主题列表指示标记为删除的主题(仅在0.8.2+上受支持)

  • 批量生成多个主题的分区分配,并可以选择要使用的代理

  • 批量运行分区的多个主题的重新分配

  • 将分区添加到现有主题

  • 更新现有主题的配置

  • (可选)为代理级别和主题级别的度量启用JMX轮询。

  • (可选)过滤出在Zookeeper中没有id / owner /&offsets /目录的使用者。

对于具体的组件使用,可以参文献中的[3]。

参考文献及资料

1、kafka-manager项目地址,链接:https://github.com/yahoo/kafka-manager

2、kafka-manager项目下载地址,链接:https://blog.wolfogre.com/posts/kafka-manager-download/

3、Apache Kafka集群管理工具CMAK(Cluster Manager for Apache Kafka)从安装启动到配置使用,链接:http://www.luyixian.cn/news_show_324464.aspx

文章转载自数据科学和工程,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论