暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink withIdleness 空闲检测的作用和注意事项

OfNull 2021-06-26
3269

背景

水印简单回顾

水印是Flink支持事件时间重要的组件,在与窗口使用中,是根据业务时间进行计算判断重要条件依据。

我们知道在网络中存在网络延迟,客户端发送数据,服务端接收的结果可能顺序不一致!如:客户端发送了1,2   服务端收到的可能是2,1。

服务端不能知道客户端什么时候发送完所有小于2的数据,所以需要有一套机制来判断客户端发送的进度。

这里就得提到水印(Watermark) ,服务端通过对业务时间判断增长阈值,来判断数据具体发送到哪一个时间段了,通过这个水印判断认为不再会有比它更小的数据进来。

多并行度的水印

常见在kafka中,一个topic有多个分区,flink一个并行度消费一个分区,那水位线是怎么推进的呢?flink中会取分区中最小的水印当做系统中的水印。

那么新的问题来了,如果kafka中数据倾斜或者某个分区就没有数据,那么某个分区水印就一直不更新,造成系统水印也就推进不了?这个怎么办呢?

flink其实也想到这个问题,提供了一个withIdleness

举个例子

以下代码版本都是flink1.11

实际withIdleness如何使用和注意事项可以看如下栗子🌰。

建立 topic

建立idleness_topic,设置为两个分区。

topicpartition
idleness_topic0
idleness_topic1

编写发送代码

public class IdlenessKafkaProducer {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.130.11.208:9092,10.130.10.243:9092,10.130.11.207:9092");
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        prop.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        prop.put(ProducerConfig.RETRIES_CONFIG, 0);
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);                 //默认的批量处理消息字节数
        prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);                     //延迟等待发送时间
        prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024 * 1024 * 50);  //producer可以用来缓存数据的内存大小。

  //发送客户端
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(prop);
  
        //控制台输入
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入: 姓名,业务时间,分区");
        while (scanner.hasNext()) {
            String rs = scanner.nextLine();
            if (StringUtils.isBlank(rs)) {
                continue;
            }
            if (rs.equalsIgnoreCase("exit")) {
                kafkaProducer.close();
                return;
            }
            String[] split = rs.split(",");
            Map<String, Object> map = new HashMap<>(2);
            map.put("name", split[0]);
            map.put("ts", Long.valueOf(split[1]));
            Integer partition = Integer.valueOf(split[2]);
            
   //实际发送数据
            ProducerRecord record = new ProducerRecord("idleness_topic", partition, null, JSON.toJSONString(map));
            Future send = kafkaProducer.send(record, (RecordMetadata data, Exception e) -> {
                System.out.println("数据发送成功!" + data.topic() + ":" + data.partition() + ":" + data.offset());
            });
            System.out.println("请输入下一条:");
        }

    }
}

简单说下代码:就是在控制台按照格式手动输入  姓名,业务时间,主题分区 如:

  1. 控制台输入:王思聪,1000,0
  2. 代码会解析字符串
  3. 将王思聪,1000封装到Map
  4. 将Map对象的json串发送到idleness_topic主题0分区


Flink 处理实时数据

我们简单定义下业务处理数据的规则 :需求就是将收到的数据信息按照用户名分组,然后每隔5秒统计一次各个用户组的总条数!代码如下:

public class IdlenessJob {
    public static void main(String[] args) throws Exception {
        //自己封装的带环境区分的参数工具类 
        ParameterTool parameterTool = ParameterToolEnvironmentUtils.createParameterTool(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置为业务时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  

        //设置kafka消费基本信息 
        String bootstrapServers = parameterTool.get(KafkaSinkUtil.KAFKA_BOOTSTRAP_SERVERS);
        String groupId = "idleness-group";
        Properties kafkaProp = new Properties();
        kafkaProp.setProperty(KafkaConstant.BOOTSTRAP_SERVERS_KEY, bootstrapServers);
        kafkaProp.setProperty(KafkaConstant.GROUP_ID_KEY, groupId);
        
        FlinkKafkaConsumer<Buy> kafkaConsumer = new FlinkKafkaConsumer("idleness_topic"new KafkaDeserializationSchema<Buy>() {
            @Override
            public boolean isEndOfStream(Buy nextElement) {
                return false;
            }

            @Override
            public Buy deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                //将接收到的数据对象 解析到 Buy 封装的对象中
                String json = new String(record.value(), "UTF-8"); 
                return JSONObject.parseObject(json, Buy.class);
            }

            @Override
            public TypeInformation getProducedType() {
                return TypeInformation.of(Buy.class);
            }
        }, kafkaProp);
        
        kafkaConsumer.setStartFromLatest(); //每次启动从卡夫卡最新数据拉取
        
        //抽取EventTime生成Watermark  可以处理迟到2秒数据
        FlinkKafkaConsumerBase<Buy> source = kafkaConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.<Buy>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((e, t) -> e.ts));

        DataStreamSource<Buy> dataStreamSource = env.addSource(source);

        //通过name分组后 5秒滚动一个窗口 
        SingleOutputStreamOperator<Tuple4<String, Long, Long, Long>> process = dataStreamSource.keyBy(e -> e.name).window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
                .process(new ProcessWindowFunction<Buy, Tuple4<String, Long, Long, Long>, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<Buy> elements, Collector<Tuple4<String, Long, Long, Long>> out) throws Exception {
                        long start = context.window().getStart();  //窗口开始时间 
                        long end = context.window().getEnd();      //窗口结束时间 
                        Long total = 0L;                           //总条数
                        String name = "";                          //用户名
                        for (Iterator<Buy> iterator = elements.iterator(); iterator.hasNext(); ) {
                            Buy next = iterator.next();
                            name = next.name;
                            total += 1;
                        }
                        out.collect(Tuple4.of(name, total, start, end));
                    }
                });

        //控制台输出
        process.print();

        env.execute("Test IdlenessJob");
    }

    static class Buy implements Serializable {
        private String name;  //用户名
        private Long ts;      //业务时间

     //get set 省略
    }
}


数据准备

从代码可以知道窗口时间范围5秒。提示:1000ms(毫秒)等于1s(秒) 所以5秒开窗有以下窗口。

窗口开始时间结束时间
105000
2500010000
31000015000
41500020000
n......


假设发送数据如下:

topic分区01
思聪500,1000,900,7000
一宁
7000

情况一:窗口正常输出

往分区0发送数据

  • 思聪,500,0
  • 思聪,1000,0
  • 思聪,900,0
  • 思聪,7000,0


往分区1发送数据

  • 一宁,7000,1


结果:7000-2000 > 5000 -1 会输出窗口计算结果!
因为有两秒迟到数据容错 所以水位线必须推进到 7秒才会触发窗口!这个时候会统计到这三条数据:

  • 思聪,500,0
  • 思聪,1000,0
  • 思聪,900,0

这三条数据在 0-5000 窗口打印

情况二:不触发窗口?

首先重启任务(程序)

开始往分区0发送数据

  • 思聪,500,0
  • 思聪,1000,0
  • 思聪,900,0
  • 思聪,7000,0

这里我不再分区1发送数据,结果如下:

这就是多并行度情况下,分区水印推进只会取最小的水印,现在分区一没有数据,所以水印不能达到5秒 这里我们可以加一条分区数据但是时间还是小于5秒

  • 一宁,4000,1

可以发现它也不会触发,因为分区1的水印现在推进到了4s也没到达7s ,如果之后一直没数据推进,那这个窗口就永远不会触发了。

解决方案:withIdleness

首先修改下代码,添加空闲分区检测  .withIdleness(Duration.ofSeconds(60))

    //抽取EventTime生成Watermark
        FlinkKafkaConsumerBase<Buy> source = kafkaConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.<Buy>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((e, t) -> e.ts)
                .withIdleness(Duration.ofSeconds(60))); //添加空闲分区检测 60s


往分区0发送数据

  • 思聪,500,0
  • 思聪,1000,0
  • 思聪,900,0
  • 思聪,7000,0

这里我也不再分区1发送数据,等待60秒后,可以看到窗口还是触发了。

如果此时我再往分区1里面发送数据小于5秒  0-5000 窗口内的数据会有什么影响?其实没有影响 因为这条数据已经过期了。当然这里可以配置侧输出处理这条过期数据。

  • 一宁,4000,1
image.png

注意:比如所有分区,全部都不增长水印,那么配置了空闲检测也是无效的!


文章转载自OfNull,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论