暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink-多并行度下Watermark(26)

beenrun 2022-07-28
974

本文解决的问题:多并行度下watermark是怎么更新的?在第一次获取watermark的时候,为什么是一个负数?在本文你会找到想要的答案


1.watermark多并行度分析

(1)获取第1条记录
初始状态watermark默认值为-9223372036854775808
设置map为2个并行度,进行轮询的方式执行并行度
第1条记录处理sensor_1,2000,3.0
watermark值为:-9223372036854775808
更新左边
-9223372036854775808和-9223372036854775808中取最小所以-9223372036854775808,并且更新-9223372036854775808为2000
见下图

(2)获取第2条记录
sensor_2,1000,2.0
watermark值为-9223372036854775808,没有改变
更新右边watermark:1000
2000和-9223372036854775808中取最小所以-9223372036854775808,并且更新-9223372036854775808为1000

(3)获取第3条记录
左边执行
sensor_2,5000,2.0
2000和1000中取最小所以1000,并且更新2000为5000
watermark值为:1000
1000-1=999

(4)获取第4条记录
右边执行
sensor_2,6000,2.0
watermark为:1000
1000-1=999
5000和1000中取最小所以1000,并且更新1000为6000


2.初始化水印值

通过源码看到初始化水印值就是:-9223372036854775808

    
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
    checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
    checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");


    this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();


    // start so that our lowest watermark would be Long.MIN_VALUE.
    this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
      public final class Long extends Number implements Comparable<Long> {
      /**
      * A constant holding the minimum value a {@code long} can
      * have, -2<sup>63</sup>.
      */
      @Native public static final long MIN_VALUE = 0x8000000000000000L;


      3.watermark多并行度demo

        package run.been.flinkdemo.watermark


        import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
        import org.apache.flink.streaming.api.functions.ProcessFunction
        import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
        import org.apache.flink.util.Collector
        import run.been.flinkdemo.util.SensorReading


        import java.time.Duration


        object WaterMarkParallelismDemo {


        def test02(env: StreamExecutionEnvironment) = {
        /**
        * 定义水印生成策略
        * 这里实现的水印生成策略的时候,实现接口SerializableTimestampAssigner,目的是从输入的数据中抽取时间字段,
        * 时间类型要是Long型的
        * 其实就是一个接口的实现
        */
        val strategy = WatermarkStrategy.forBoundedOutOfOrderness[SensorReading](Duration.ofMillis(0)) //延迟0秒
        .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
        override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp //指定事件时间字段
        })




        val inputStream = env.socketTextStream("localhost", 9999)
        .map { text =>
        val arr: Array[String] = text.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
        }.assignTimestampsAndWatermarks(strategy).setParallelism(2)
        //这里设置为2个并行度
        println("inputStream parallelism is " + inputStream.parallelism)


        inputStream.print()
        //为了查看watermark的生成,下面代码获取对应的时间
        val sink = inputStream.process(new ProcessFunction[SensorReading,SensorReading] {
        override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
        //数据时间戳
        println("本次时间收到的时间:"+value.timestamp)
        println("本次数据"+ value)
        //获取处理时间
        val processTime = ctx.timerService().currentProcessingTime()
        println("此刻处理时间 = " + processTime)
        //获取水印
        val watermarkTime = ctx.timerService().currentWatermark()
        println("此刻水印时间 = " + watermarkTime)
        }
        //这里有1个并行度
        }).setParallelism(1)
        println("sink parallelism is " + sink.parallelism)
        sink.print()


        }


        def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        println("Env Parallelism is "+env.getParallelism)
        test02(env)
        env.execute("window job")
        }
        }




        /**
        * 输入数据
        * sensor_1,2000,3.0
        sensor_2,1000,2.0
        sensor_2,5000,2.0
        sensor_2,6000,2.0
        sensor_2,16000,2.0
        sensor_1,26000,2.0
        sensor_1,36000,2.0
        */


        /**
        *
        * Env Parallelism is 1
        inputStream parallelism is 2
        sink parallelism is 1


        本次时间收到的时间:2000
        本次数据SensorReading(sensor_1,2000,3.0)
        SensorReading(sensor_1,2000,3.0)
        此刻处理时间 = 1658923852116
        此刻水印时间 = -9223372036854775808


        本次时间收到的时间:1000
        SensorReading(sensor_2,1000,2.0)
        本次数据SensorReading(sensor_2,1000,2.0)
        此刻处理时间 = 1658924211426
        此刻水印时间 = -9223372036854775808
        通过这里发现初始水印会有个负的时间,并行度是2的时候,会有2条记录进行初始化
        并行度是2所以获取的中最小值,所以还是负数


        本次时间收到的时间:5000
        SensorReading(sensor_2,5000,2.0)
        本次数据SensorReading(sensor_2,5000,2.0)
        此刻处理时间 = 1658924226955
        此刻水印时间 = 999
        第三条记录输入会有水印为1000-1=999
        1000和2000中最小值所以取1000


        本次时间收到的时间:6000
        SensorReading(sensor_2,6000,2.0)
        本次数据SensorReading(sensor_2,6000,2.0)
        此刻处理时间 = 1658924254119
        此刻水印时间 = 999
        第4条记录水印是999
        1000和5000中最小的 1000
        本次时间收到的时间:16000
        SensorReading(sensor_2,16000,2.0)
        本次数据SensorReading(sensor_2,16000,2.0)
        此刻处理时间 = 1658924267754
        此刻水印时间 = 4999
        第5条记录水印是5000-1
        前一次的最小值是5000和6000中最小的5000


        本次时间收到的时间:26000
        SensorReading(sensor_1,26000,2.0)
        本次数据SensorReading(sensor_1,26000,2.0)
        此刻处理时间 = 1658924292727
        此刻水印时间 = 5999
        第6条记录6000-1=5999


        本次时间收到的时间:36000
        SensorReading(sensor_1,36000,2.0)
        本次数据SensorReading(sensor_1,36000,2.0)
        此刻处理时间 = 1658924328410
        此刻水印时间 = 15999
        第6条记录6000-1=15999
        */


        3.总结

        (1)wartermark的更新过程获取并行度中最小的watermark
        (2)wartermark的初始值通过源码找到-9223372036854775808


        4.所有代码地址

        https://github.com/johncodeit/flinkdemo.git

        奇迹的出现往往就在再坚持一下的时候!

        感谢阅读。

        期待点赞、分享、关注!

        Rome was not built in a day.罗马并非一日建成的。坚持必成。


        文章转载自beenrun,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论