
Flink in Practice: Connecting to a Kerberos-Enabled Kafka Cluster

大数据开发运维架构 (Big Data Development and Operations Architecture), 2020-02-29

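Once Kafka has Kerberos authentication enabled, how do you produce or consume its data with Flink? It comes down to adding the authentication-related configuration, such as jaas.conf and the keytab, to the producer and consumer code. Let's go straight to the code: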

Version information:

    Flink 1.9.0

    Kafka 0.10.0

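A word of warning: if the dependency versions do not match, you will get an error like the one below, so make sure the versions correspond:

    java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread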
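A NoSuchMethodError like this usually means classes from two different versions of a library ended up on the classpath, for example Flink Kafka connector jars from different Flink releases. One way to check is to ask the JVM which jar each suspect class was actually loaded from. The sketch below uses only JDK reflection. ClassOrigin is a hypothetical helper, not part of Flink. The class names in main are taken from the error above and from the connector packages, and it must be run with the job's own classpath.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic: report which jar (or directory) a class was loaded from. */
public class ClassOrigin {

    public static String locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers
            Class<?> clazz = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // Classes from the bootstrap loader (e.g. java.lang.*) have no code source
                return "JDK/bootstrap";
            }
            URL location = source.getLocation();
            return location.toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        } catch (LinkageError e) {
            // Found, but a class it depends on is missing or incompatible
            return "failed to load: " + e;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread",
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010",
            "org.apache.kafka.clients.consumer.KafkaConsumer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If two of these classes report jars from different releases, align the versions in pom.xml.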

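1. Connecting to a Kerberos-secured cluster is actually simple. You need the following three files:

    1). krb5.conf, the Kerberos server configuration file, which tells the program which KDC to log in to for authentication;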

        [libdefaults]
          udp_preference_limit = 1
          renew_lifetime = 3650d
          forwardable = true
          default_realm = CHINAUNICOM
          ticket_lifetime = 3650d
          dns_lookup_realm = false
          dns_lookup_kdc = false
          default_ccache_name = /tmp/krb5cc_%{uid}
          #default_tgs_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
          #default_tkt_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
        [domain_realm]
          .CHINAUNICOM = CHINAUNICOM
        [logging]
          default = FILE:/var/log/krb5kdc.log
          admin_server = FILE:/var/log/kadmind.log
          kdc = FILE:/var/log/krb5kdc.log
        [realms]
          CHINAUNICOM = {
            admin_server = master98.hadoop.ljs
            kdc = master98.hadoop.ljs
          }

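    2). Authentication also needs a login method to be specified. This is done in a jaas.conf file, and one is usually present in the cluster's conf directory;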

        KafkaClient {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            keyTab="D:\\kafkaSSL\\kafka.service.keytab"
            storeKey=true
            useTicketCache=false
            principal="kafka/salver32.hadoop.unicom@CHINAUNICOM"
            serviceName=kafka;
        };

    3). Finally, the user's login ticket and keytab file. I will not paste the ticket and keytab here.
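Before starting the job, it can help to confirm that both files are found and that the JAAS file really contains a KafkaClient section. Otherwise, a typo there may only show up later as a login failure inside the Kafka client. The sketch below uses only JDK APIs (javax.security.auth.login.Configuration). The class and method names are hypothetical, and the paths in main are the article's Windows paths. sun.security.krb5.debug is the JDK's Kerberos debug switch, which prints the KDC exchange.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

/** Hypothetical helper: point the JVM at krb5.conf and the JAAS file, then verify the JAAS file. */
public class KerberosClientSetup {

    /** Returns the login module entries of the KafkaClient section, or throws if anything is missing. */
    public static AppConfigurationEntry[] configure(Path krb5Conf, Path jaasConf, boolean debug) {
        if (!Files.isRegularFile(krb5Conf)) {
            throw new IllegalArgumentException("krb5.conf not found: " + krb5Conf);
        }
        if (!Files.isRegularFile(jaasConf)) {
            throw new IllegalArgumentException("JAAS file not found: " + jaasConf);
        }
        System.setProperty("java.security.krb5.conf", krb5Conf.toAbsolutePath().toString());
        System.setProperty("java.security.auth.login.config", jaasConf.toAbsolutePath().toString());
        if (debug) {
            // Must be set before the first Kerberos login to take effect
            System.setProperty("sun.security.krb5.debug", "true");
        }
        // The default JAAS Configuration reads the file named by java.security.auth.login.config
        Configuration config = Configuration.getConfiguration();
        config.refresh(); // re-read in case an earlier call cached a different file
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry("KafkaClient");
        if (entries == null || entries.length == 0) {
            throw new IllegalStateException("No KafkaClient section in " + jaasConf);
        }
        return entries;
    }

    public static void main(String[] args) {
        AppConfigurationEntry[] entries = configure(
                Paths.get("D:\\kafkaSSL\\krb5.conf"),
                Paths.get("D:\\kafkaSSL\\kafka_client_jaas.conf"),
                true);
        for (AppConfigurationEntry entry : entries) {
            System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
        }
    }
}
```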

2. To spare you dependency errors, here are the pom.xml dependencies. Some may be redundant, so remove what you don't need:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-fs</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>${httpclient.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.9.0</version>
        <scope>compile</scope>
    </dependency>

4. Flink receives messages from a socket and sends them to Kafka:
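Flink's socketTextStream connects to the given host and port as a client, so some process must already be listening on localhost:9000 before the producer job starts. On Linux, `nc -lk 9000` is the usual choice. On Windows, where nc is often missing, a small stand-in can do the job. The hypothetical SocketLineServer below forwards lines typed on stdin to the first client that connects. Unlike `nc -k`, it serves only one connection.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for "nc -lk 9000" that serves a single client. */
public class SocketLineServer {

    /**
     * Waits for one client (e.g. Flink's socket source), then forwards every line read
     * from source, terminated by '\n' to match the job's delimiter. Returns the line count.
     */
    public static int serveOnce(ServerSocket server, BufferedReader source) throws IOException {
        int sent = 0;
        try (Socket client = server.accept();
             Writer out = new BufferedWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = source.readLine()) != null) {
                out.write(line);
                out.write('\n');
                out.flush(); // push each line immediately instead of waiting for the buffer to fill
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000; // port used by the producer job
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on port " + port + ", type lines to send");
            int sent = serveOnce(server, stdin);
            System.out.println("Sent " + sent + " lines");
        }
    }
}
```

Start it first, then start the Flink job. Each line typed afterwards becomes one record.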

5. Code example: Flink sends the data received from the socket to Kafka:

    package com.hadoop.ljs.flink.streaming;

    import com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.kafka.clients.producer.ProducerConfig;

    import java.util.Properties;

    /**
     * @author: Created By lujisen
     * @company ChinaUnicom Software JiNan
     * @date: 2020-02-29 09:31
     * @version: v1.0
     * @description: com.hadoop.ljs.flink.streaming
     */
    public class FlinkKafkaKerberosProducer {
        public static final String topic = "topic1";
        public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
        public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
        public static final String bootstrapServers = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";
        public static final String hostname = "localhost";
        public static final int port = 9000;

        public static void main(String[] args) throws Exception {
            // Set the krb5.conf and JAAS paths (Windows paths here); they can also be passed as JVM options:
            // -Djava.security.krb5.conf=... -Djava.security.auth.login.config=...
            System.setProperty("java.security.krb5.conf", krb5Conf);
            System.setProperty("java.security.auth.login.config", kafkaJaasConf);
            /* Get the Flink streaming execution environment */
            final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
            /* Receive data from the socket, one record per "\n"-terminated line */
            DataStream<String> dataSource = senv.socketTextStream(hostname, port, "\n");
            /* Apply your own transformations here as needed; the result is then sent to Kafka */
            dataSource.addSink(new FlinkKafkaProducer010<String>(topic, new CustomKeyedSerializationSchema(), getProducerProperties()));
            /* Start the job */
            senv.execute("FlinkKafkaProducer");
        }

        public static Properties getProducerProperties() {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("acks", "1");
            props.put("retries", 3);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Kerberos: SASL over an unencrypted connection, using the GSSAPI mechanism
            props.put("security.protocol", "SASL_PLAINTEXT");
            // Primary of the broker's service principal (kafka/host@REALM)
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("sasl.mechanism", "GSSAPI");
            return props;
        }
    }
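The producer depends on CustomKeyedSerializationSchema from the author's com.hadoop.ljs.flink.utils package, which the article does not show. The version below is a plausible minimal one, assumed here rather than taken from the source. It sends each line as a UTF-8 value without a key and returns null from getTargetTopic, which makes FlinkKafkaProducer010 write to the topic passed to its constructor. In a real project the class would implement Flink's KeyedSerializationSchema<String> (package org.apache.flink.streaming.util.serialization). That clause is left as a comment so the sketch compiles without Flink on the classpath.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical reconstruction; the author's class is not shown in the article.
// With Flink on the classpath, add
//   import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
// and declare the class as: implements KeyedSerializationSchema<String>
public class CustomKeyedSerializationSchema implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Records are sent without a key. */
    public byte[] serializeKey(String element) {
        return null;
    }

    /** The line received from the socket becomes the UTF-8 encoded record value. */
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    /** null means "use the default topic given to the producer's constructor". */
    public String getTargetTopic(String element) {
        return null;
    }
}
```

If no key or per-record topic is needed, passing `new SimpleStringSchema()` to FlinkKafkaProducer010 is a simpler alternative.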

6. Code example: Flink connects to Kafka and consumes messages:

    package com.hadoop.ljs.flink.streaming;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    import java.util.Properties;

    /**
     * @author: Created By lujisen
     * @company ChinaUnicom Software JiNan
     * @date: 2020-02-29 09:31
     * @version: v1.0
     * @description: com.hadoop.ljs.flink.streaming
     */
    public class FlinkKafkaKerberosConsumer {
        public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
        public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
        public static final String topic = "topic1";
        public static final String consumerGroup = "test_topic1";
        public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

        public static void main(String[] args) throws Exception {
            // Set the krb5.conf and JAAS paths (Windows paths here); they can also be passed as -D JVM options
            System.setProperty("java.security.krb5.conf", krb5Conf);
            System.setProperty("java.security.auth.login.config", kafkaJaasConf);

            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            FlinkKafkaConsumer010<String> consumer010 =
                    new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), getConsumerProperties());
            // Start from the earliest offset of every partition, ignoring committed group offsets
            consumer010.setStartFromEarliest();
            // Use the Kafka consumer as the source and print every record
            DataStream<String> dataStream = env.addSource(consumer010);
            dataStream.print();
            // main declares "throws Exception", so job failures surface instead of being swallowed
            env.execute();
        }

        private static Properties getConsumerProperties() {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServer);
            props.put("group.id", consumerGroup);
            props.put("auto.offset.reset", "earliest");
            // Kerberos: SASL over an unencrypted connection, using the GSSAPI mechanism
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("sasl.mechanism", "GSSAPI");
            return props;
        }
    }
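The producer's and the consumer's property methods both repeat the same three Kerberos settings. A small helper could keep them in one place so the two jobs cannot drift apart. The sketch below is hypothetical and not part of the original code. It copies the base properties instead of modifying them, and the service name must match the primary of the broker principal ("kafka" in kafka/host@REALM).

```java
import java.util.Properties;

/** Hypothetical helper that adds the SASL/GSSAPI (Kerberos) client settings to a property set. */
public final class KafkaKerberosProps {

    private KafkaKerberosProps() {
    }

    /** Returns a copy of base with the Kerberos client settings added; base is left unchanged. */
    public static Properties withKerberos(Properties base, String serviceName) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", serviceName);
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }

    public static void main(String[] args) {
        Properties consumer = new Properties();
        consumer.put("bootstrap.servers", "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667");
        consumer.put("group.id", "test_topic1");
        Properties secured = withKerberos(consumer, "kafka");
        secured.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```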

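This article is reposted from 大数据开发运维架构. If you believe it infringes your rights, send an email to contact@modb.pro with supporting evidence; once verified, 墨天轮 (modb.pro) will remove the content immediately.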
