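When Kafka has Kerberos authentication enabled, how do you produce or consume data with Flink? Essentially, you add the authentication-related configuration (the jaas.conf file, the keytab and so on) to the producer and consumer code. Let's go straight to the code: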
Versions:
Flink 1.9.0
Kafka 0.10.0
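A reminder: if the dependency versions do not match, you will get errors like the one below, so make sure the versions correspond: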
java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread
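When an error like this appears, it helps to know which jar each class was actually loaded from. The following is a minimal JDK-only sketch (the class and method names are hypothetical, not from the article). It prints the code-source location of a class, which makes a stale or mismatched kafka-clients or connector jar on the classpath easy to spot:

```java
import java.security.CodeSource;

/** Hypothetical helper: report which jar (or directory) a class was loaded from. */
public class WhichJar {

    /** Location the class was loaded from, or "bootstrap" for JDK core classes. */
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            // Classes from the bootstrap loader (java.lang.String etc.) have no code source.
            return "bootstrap";
        }
        return src.getLocation().toString();
    }

    /** Looks a class up by name without initializing it; reports missing classes instead of throwing. */
    static String locationOf(String className) {
        try {
            return locationOf(Class.forName(className, false, WhichJar.class.getClassLoader()));
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010",
            "org.apache.kafka.clients.consumer.KafkaConsumer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

Run it with the same classpath as the Flink job; a Kafka class resolving to an unexpected kafka-clients version points at the conflict.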
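1. Connecting to a Kerberos-secured cluster is actually simple. You need the following three files:
1) krb5.conf, the Kerberos server configuration file, which tells the program which KDC to authenticate against: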
[libdefaults]
  udp_preference_limit = 1
  renew_lifetime = 3650d
  forwardable = true
  default_realm = CHINAUNICOM
  ticket_lifetime = 3650d
  dns_lookup_realm = false
  dns_lookup_kdc = false
  default_ccache_name = /tmp/krb5cc_%{uid}
  #default_tgs_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5
  #default_tkt_enctypes = aes des3-cbc-sha1 rc4 des-cbc-md5

[domain_realm]
  .CHINAUNICOM = CHINAUNICOM

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log

[realms]
  CHINAUNICOM = {
    admin_server = master98.hadoop.ljs
    kdc = master98.hadoop.ljs
  }
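2) The login method must also be specified. This is done with a JAAS file (called jaas.conf here; the code below loads it as kafka_client_jaas.conf), and the cluster's conf directory usually contains one: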
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="D:\\kafkaSSL\\kafka.service.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka/salver32.hadoop.unicom@CHINAUNICOM"
  serviceName=kafka;
};
3) The user's login credentials, i.e. the Kerberos ticket and the keytab file. These are not reproduced here.
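Before starting a job it can save time to verify these files up front. The sketch below is hypothetical (the class name, method names and the simplified parser are not from the article). It checks that krb5.conf, the JAAS file and the keytab exist, and it reads default_realm plus that realm's kdc entries from krb5.conf, i.e. the KDC the client will contact. The parser is deliberately tiny and only handles the layout of the krb5.conf shown above; it is not how the JDK itself reads the file. The paths are the ones used in the article's code.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import javax.security.auth.kerberos.KeyTab;

/** Hypothetical pre-flight check for the three Kerberos client files. */
public class KerberosPreflight {

    /** True if a keytab file exists at the path (does not validate the keys inside). */
    static boolean keytabExists(String path) {
        return KeyTab.getInstance(new File(path)).exists();
    }

    /** Throws with a readable message if krb5.conf, the JAAS file or the keytab is missing. */
    static void requireFiles(String krb5Conf, String jaasConf, String keytab) {
        for (String path : new String[] {krb5Conf, jaasConf}) {
            if (!new File(path).isFile()) {
                throw new IllegalStateException("Missing file: " + path);
            }
        }
        if (!keytabExists(keytab)) {
            throw new IllegalStateException("Missing keytab: " + keytab);
        }
    }

    /** Value of default_realm in [libdefaults], or null if absent. */
    static String defaultRealm(String conf) {
        String section = "";
        for (String raw : conf.split("\\R")) {
            String line = clean(raw);
            if (isSection(line)) {
                section = line;
            } else if (section.equals("[libdefaults]") && key(line).equals("default_realm")) {
                return value(line);
            }
        }
        return null;
    }

    /** kdc entries inside the "REALM = { ... }" block of [realms]. */
    static List<String> kdcs(String conf, String realm) {
        List<String> result = new ArrayList<>();
        String section = "";
        boolean inRealm = false;
        for (String raw : conf.split("\\R")) {
            String line = clean(raw);
            if (isSection(line)) {
                section = line;
                inRealm = false;
            } else if (section.equals("[realms]")) {
                if (line.endsWith("{")) {
                    inRealm = key(line).equals(realm);   // entering a realm block
                } else if (line.equals("}")) {
                    inRealm = false;                     // leaving it
                } else if (inRealm && key(line).equals("kdc")) {
                    result.add(value(line));
                }
            }
        }
        return result;
    }

    private static String clean(String raw) {
        int hash = raw.indexOf('#');                     // drop "#" comments
        return (hash >= 0 ? raw.substring(0, hash) : raw).trim();
    }

    private static boolean isSection(String line) {
        return line.startsWith("[") && line.endsWith("]");
    }

    private static String key(String line) {
        int eq = line.indexOf('=');
        return eq >= 0 ? line.substring(0, eq).trim() : line;
    }

    private static String value(String line) {
        return line.substring(line.indexOf('=') + 1).trim();
    }

    public static void main(String[] args) throws IOException {
        String krb5 = "D:\\kafkaSSL\\krb5.conf";
        requireFiles(krb5, "D:\\kafkaSSL\\kafka_client_jaas.conf", "D:\\kafkaSSL\\kafka.service.keytab");
        String conf = new String(Files.readAllBytes(new File(krb5).toPath()), StandardCharsets.UTF_8);
        String realm = defaultRealm(conf);
        System.out.println("default_realm=" + realm + " kdc=" + kdcs(conf, realm));
    }
}
```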
2. To avoid dependency errors, here are the pom.xml dependencies. Some may be redundant; remove them as needed:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>${httpclient.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.9.0</version>
    <scope>compile</scope>
</dependency>
3. Flink receives messages from a socket and sends them to Kafka. Producer code example:
package com.hadoop.ljs.flink.streaming;

import com.hadoop.ljs.flink.utils.CustomKeyedSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosProducer {
    public static final String topic = "topic1";
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String bootstrapServers = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";
    public static final String hostname = "localhost";
    public static final int port = 9000;

    public static void main(String[] args) throws Exception {
        // Point the JVM at krb5.conf and the JAAS file (set here for a local run on Windows;
        // they can also be passed as -D JVM options). This only affects the current JVM
        // and must happen before any Kafka client is created.
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);

        /* Get the Flink streaming execution environment */
        final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from the socket, one record per line */
        DataStream<String> dataSource = senv.socketTextStream(hostname, port, "\n");
        /* Apply your own transformations here as needed, then send the result to Kafka.
           CustomKeyedSerializationSchema is the author's own serialization schema (not shown). */
        dataSource.addSink(new FlinkKafkaProducer010<String>(topic, new CustomKeyedSerializationSchema(), getProducerProperties()));
        /* Start the job */
        senv.execute("FlinkKafkaProducer");
    }

    public static Properties getProducerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");
        props.put("retries", "3");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Kerberos (GSSAPI) authentication over SASL_PLAINTEXT
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
4. Flink connects to Kafka and consumes messages. Consumer code example:
package com.hadoop.ljs.flink.streaming;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-02-29 09:31
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.streaming
 */
public class FlinkKafkaKerberosConsumer {
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic1";
    public static final String consumerGroup = "test_topic1";
    public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

    public static void main(String[] args) throws Exception {
        // Set krb5.conf and JAAS here for a local run on Windows; they can also be passed as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        FlinkKafkaConsumer010<String> consumer010 =
                new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), getConsumerProperties());
        // Start from the earliest offset; this ignores committed group offsets,
        // so auto.offset.reset does not decide the start position
        consumer010.setStartFromEarliest();
        // Use Kafka as the source
        DataStream<String> dataStream = env.addSource(consumer010);
        dataStream.print();
        env.execute("FlinkKafkaConsumer");
    }

    private static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        // Kerberos (GSSAPI) authentication over SASL_PLAINTEXT
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
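Both programs point the JVM at a JAAS file via System.setProperty and add the same three SASL properties. As an alternative, the JDK's javax.security.auth.login.Configuration API lets the KafkaClient entry be installed in code instead of read from a file. The sketch below is hypothetical: the class and method names are illustrative, and it assumes the Kafka client looks up its KafkaClient entry through Configuration.getConfiguration(), which is how a file named by java.security.auth.login.config is normally consumed. Like System.setProperty, Configuration.setConfiguration only affects the JVM it runs in, so it covers a local run from the IDE, not separate TaskManager processes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

/** Hypothetical helper: the Kerberos pieces shared by the producer and the consumer, set up in code. */
public final class KafkaKerberosClient {

    private KafkaKerberosClient() {}

    /** In-code equivalent of the KafkaClient section of the JAAS file. */
    static Configuration kafkaClientJaas(String keytab, String principal) {
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", keytab);            // literal path; JAAS-file quoting rules do not apply
        options.put("storeKey", "true");
        options.put("useTicketCache", "false");
        options.put("principal", principal);
        options.put("serviceName", "kafka");      // mirrors serviceName=kafka in the file
        AppConfigurationEntry entry = new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED, options);
        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                // Only the "KafkaClient" login context is defined; other names return null.
                return "KafkaClient".equals(name) ? new AppConfigurationEntry[] {entry} : null;
            }
        };
    }

    /** Adds the SASL/Kerberos settings that both programs put into their Properties. */
    static Properties withKerberos(Properties props) {
        props.setProperty("security.protocol", "SASL_PLAINTEXT"); // SASL authentication, no TLS
        props.setProperty("sasl.mechanism", "GSSAPI");            // GSSAPI = Kerberos
        props.setProperty("sasl.kerberos.service.name", "kafka"); // primary of the brokers' principal
        return props;
    }

    public static void main(String[] args) {
        // Assumption: this runs before the first Kafka client in this JVM is created.
        System.setProperty("java.security.krb5.conf", "D:\\kafkaSSL\\krb5.conf");
        Configuration.setConfiguration(kafkaClientJaas(
                "D:\\kafkaSSL\\kafka.service.keytab",
                "kafka/salver32.hadoop.unicom@CHINAUNICOM"));

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers",
                "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667");
        consumerProps.setProperty("group.id", "test_topic1");
        withKerberos(consumerProps).list(System.out);
    }
}
```

With this route the java.security.auth.login.config property would be left unset, while krb5.conf is still needed so the JVM can find the KDC.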
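This article was reprinted from 大数据开发运维架构. To report suspected infringement, email contact@modb.pro with supporting evidence; once verified, 墨天轮 (modb.pro) will remove the content immediately.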




