By大数据研习社
概要:1.在Flink SQL流式计算中,Kafka SQL Connector是企业中的标配。
2.Flink SQL在Kafka流式计算中应用会越来越广泛。
1 Flink Table与Kafka集成案例
1.1 需求
需求:Flink Table从kafka消费点击日志(JSON),转化为CSV格式之后输出到Kafka。1.2 添加Maven依赖
FlinkTable集成Kafka需引入如下依赖: <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version>1.3 代码实现
Flink Table API实现Kafka生产者与消费者的完整代码如下所示。package com.bigdata.chap02;import org.apache.flink.table.api.*;import static org.apache.flink.table.api.Expressions.$;public class FlinkTableAPIKafka2Kafka { public static final String input_topic = "clicklog_input"; public static final String out_topic = "clicklog_output"; public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings TableEnvironment tEnv = TableEnvironment.create(settings); final Schema schema = Schema.newBuilder() .column("user", DataTypes.STRING()) .column("url", DataTypes.STRING()) .column("cTime", DataTypes.STRING()) tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka") .option("topic",input_topic) .option("properties.bootstrap.servers","hadoop1:9092") .option("properties.group.id","clicklog")//每次都从最早的offsets开始 .option("scan.startup.mode","latest-offset") tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("kafka") .option("topic",out_topic) .option("properties.bootstrap.servers","hadoop1:9092") //5、输出(包括执行,不需要单独在调用tEnv.execute("job")) .select($("user"), $("url"),$("cTime")) .executeInsert("sinkTable");1.4 打开Kafka数据生产与消费
bin/kafka-topics.sh --zookeeper localhost:2181 --listbin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_input --replication-factor 3 --partitions 3bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic clicklog_output --replication-factor 3 --partitions 3bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicklog_outputbin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}如果clicklog_input topic的生产者输入数据之后,在clicklog_output topic端能消费到数据,则说明Flink Table打通了Kafka端到端的数据流程。2 Kafka SQL Connector高级特性
2.1 key和value格式
Kafka消息的key和value均可指定format。tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka") .option("topic",input_topic) .option("properties.bootstrap.servers","hadoop1:9092") .option("properties.group.id","clicklog")//每次都从最早的offsets开始 .option("scan.startup.mode","latest-offset")tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("kafka") .option("key.format","json") .option("value.format","json") .option("topic",input_topic) .option("properties.bootstrap.servers","hadoop1:9092") .option("properties.group.id","clicklog")//每次都从最早的offsets开始 .option("scan.startup.mode","latest-offset")注意:format("json")和option("value.format",2.2 Topic和Partition发现
可以通过topic或者topic-pattern来配置主题。
注意:要允许在作业开始运行后发现动态创建的topic,请为 scan.topic-partition-discovery.interval 设置一个非负值。2.3 读取位置
作为source,是可以通过scan.startup.mode选项指定从哪个位置开始消费,可选的值如下。2.4 Sink分区
当kafka作为sink时,可以通过sink.partitioner指定partitioner。支持的选项值如下。2.5 致性保证
默认情况下,如果启用checkpoint,Kafka sink使用at-least-once一致性语意。在启用checkpoint的前题下,可通过sink.delivery-guarantee来调整一致性语意:一旦启用了事物来保证exactly-once语意,一定要注意下游消费者要配置isolation.level为read_committed(默认是read_uncommitted)。
长按识别左侧二维码
关注领福利
领10本经典大数据书