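This post answers two questions: how is the watermark updated when an operator runs with several parallel subtasks, and why is the watermark a negative number the first time you read it? Both answers are worked out below.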
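1. Watermark analysis with multiple parallel subtasks
(1) Receiving the 1st record
In the initial state the watermark has its default value, -9223372036854775808.
The map / watermark-assignment stage runs with a parallelism of 2, and records are handed to the two subtasks in round-robin order.
The 1st record processed is sensor_1,2000,3.0
Watermark value: -9223372036854775808
The left subtask is updated.
The minimum of -9223372036854775808 and -9223372036854775808 is -9223372036854775808, so the watermark stays at that value; the left subtask's value is then updated from -9223372036854775808 to 2000.
See the figure below.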
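(2) Receiving the 2nd record
sensor_2,1000,2.0
Watermark value: -9223372036854775808, unchanged
The right subtask is updated to 1000.
The minimum of 2000 and -9223372036854775808 is -9223372036854775808, so the watermark stays at that value; the right subtask's value is then updated from -9223372036854775808 to 1000.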
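(3) Receiving the 3rd record
Handled by the left subtask
sensor_2,5000,2.0
The minimum of 2000 and 1000 is 1000; the left subtask's value is then updated from 2000 to 5000.
Watermark value: 1000
The value actually printed is 1000-1=999, because the generator subtracts the allowed out-of-orderness (0 ms here) plus 1 ms from the largest timestamp.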
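(4) Receiving the 4th record
Handled by the right subtask
sensor_2,6000,2.0
The minimum of 5000 and 1000 is 1000; the right subtask's value is then updated from 1000 to 6000.
Watermark value: 1000
Printed value: 1000-1=999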
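The four steps above can be replayed in a few lines of plain Java. This is only a sketch of the bookkeeping, not Flink code. The class and method names are made up for illustration. It assumes strict round-robin distribution, and it assumes each record sees the watermark produced by the records before it. Real Flink emits watermarks periodically, so timing can differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: replays the walkthrough above.
public class MultiChannelWatermarkSketch {

    // Returns the watermark a downstream operator would see when each record arrives.
    static List<Long> observedWatermarks(long[] timestamps, int parallelism, long outOfOrdernessMillis) {
        // One max-timestamp per upstream subtask, initialised as in the
        // BoundedOutOfOrdernessWatermarks constructor shown later in this post.
        long[] maxTimestamp = new long[parallelism];
        Arrays.fill(maxTimestamp, Long.MIN_VALUE + outOfOrdernessMillis + 1);

        List<Long> observed = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            // Downstream watermark = minimum over the per-subtask watermarks.
            long min = Long.MAX_VALUE;
            for (long max : maxTimestamp) {
                min = Math.min(min, max - outOfOrdernessMillis - 1);
            }
            observed.add(min);

            // Assumption: records alternate between subtasks (round-robin).
            int subtask = i % parallelism;
            maxTimestamp[subtask] = Math.max(maxTimestamp[subtask], timestamps[i]);
        }
        return observed;
    }

    public static void main(String[] args) {
        long[] timestamps = {2000, 1000, 5000, 6000, 16000, 26000, 36000};
        // Prints [-9223372036854775808, -9223372036854775808, 999, 999, 4999, 5999, 15999]
        System.out.println(observedWatermarks(timestamps, 2, 0L));
    }
}
```

With the demo's seven input records this gives the same sequence as the demo output: the watermark only leaves Long.MIN_VALUE once both subtasks have seen a record, and from then on it follows the slower subtask.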
2. Initial watermark value
The source code shows that the initial watermark value is -9223372036854775808, i.e. Long.MIN_VALUE:
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
    checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
    checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

    this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

    // start so that our lowest watermark would be Long.MIN_VALUE.
    this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
public final class Long extends Number implements Comparable<Long> {
    /**
     * A constant holding the minimum value a {@code long} can
     * have, -2<sup>63</sup>.
     */
    @Native public static final long MIN_VALUE = 0x8000000000000000L;
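To see why that constructor leads to a first watermark of exactly Long.MIN_VALUE, the arithmetic can be checked in isolation. The sketch below is not Flink code and the class and method names are hypothetical. The emit formula `maxTimestamp - outOfOrdernessMillis - 1` is not quoted in this post. It is assumed here from the same Flink class, and it matches the 1000-1=999 step in the walkthrough.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: checks the initial-value arithmetic.
public class InitialWatermarkSketch {

    // Same expression as in the constructor quoted above.
    static long initialMaxTimestamp(Duration maxOutOfOrderness) {
        return Long.MIN_VALUE + maxOutOfOrderness.toMillis() + 1;
    }

    // Assumed emit formula: largest timestamp minus the allowed delay minus 1 ms.
    static long watermarkFor(long maxTimestamp, Duration maxOutOfOrderness) {
        return maxTimestamp - maxOutOfOrderness.toMillis() - 1;
    }

    public static void main(String[] args) {
        Duration noDelay = Duration.ofMillis(0);
        Duration threeSeconds = Duration.ofSeconds(3);

        // Before any record arrives, the +1 and -1 cancel out, whatever the delay is.
        System.out.println(watermarkFor(initialMaxTimestamp(noDelay), noDelay));           // -9223372036854775808
        System.out.println(watermarkFor(initialMaxTimestamp(threeSeconds), threeSeconds)); // -9223372036854775808

        // The hex literal in java.lang.Long is the same number.
        System.out.println(0x8000000000000000L == Long.MIN_VALUE); // true

        // After a record with timestamp 1000 and no allowed delay:
        System.out.println(watermarkFor(1000L, noDelay)); // 999
    }
}
```

The `+ 1` in the constructor is there so that the later subtraction does not underflow below Long.MIN_VALUE.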

3. Demo: watermarks with multiple parallel subtasks
package run.been.flinkdemo.watermark

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector
import run.been.flinkdemo.util.SensorReading

import java.time.Duration

object WaterMarkParallelismDemo {

  def test02(env: StreamExecutionEnvironment) = {
    /**
     * Define the watermark strategy.
     * The strategy is given a SerializableTimestampAssigner, whose job is to extract
     * the event-time field from each input record. The timestamp must be a Long.
     */
    val strategy = WatermarkStrategy
      .forBoundedOutOfOrderness[SensorReading](Duration.ofMillis(0)) // allowed out-of-orderness: 0 ms
      .withTimestampAssigner(new SerializableTimestampAssigner[SensorReading] {
        override def extractTimestamp(t: SensorReading, l: Long): Long = t.timestamp // event-time field
      })

    val inputStream = env.socketTextStream("localhost", 9999)
      .map { text =>
        val arr: Array[String] = text.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
      .assignTimestampsAndWatermarks(strategy)
      .setParallelism(2) // parallelism of 2 here
    println("inputStream parallelism is " + inputStream.parallelism)
    inputStream.print()

    // To observe how the watermark is generated, print the relevant times for each record
    val sink = inputStream.process(new ProcessFunction[SensorReading, SensorReading] {
      override def processElement(value: SensorReading,
                                  ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                                  out: Collector[SensorReading]): Unit = {
        // event timestamp carried by the record
        println("Event timestamp of this record: " + value.timestamp)
        println("Record: " + value)
        // current processing time
        val processTime = ctx.timerService().currentProcessingTime()
        println("Current processing time = " + processTime)
        // current watermark
        val watermarkTime = ctx.timerService().currentWatermark()
        println("Current watermark = " + watermarkTime)
      }
      // parallelism of 1 here
    }).setParallelism(1)
    println("sink parallelism is " + sink.parallelism)
    sink.print()
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    println("Env Parallelism is " + env.getParallelism)
    test02(env)
    env.execute("window job")
  }
}

/**
 * Input data
 * sensor_1,2000,3.0
 * sensor_2,1000,2.0
 * sensor_2,5000,2.0
 * sensor_2,6000,2.0
 * sensor_2,16000,2.0
 * sensor_1,26000,2.0
 * sensor_1,36000,2.0
 */

/**
 * Output (lines starting with "Note:" are annotations, not program output)
 *
 * Env Parallelism is 1
 * inputStream parallelism is 2
 * sink parallelism is 1
 * Event timestamp of this record: 2000
 * Record: SensorReading(sensor_1,2000,3.0)
 * SensorReading(sensor_1,2000,3.0)
 * Current processing time = 1658923852116
 * Current watermark = -9223372036854775808
 * Event timestamp of this record: 1000
 * SensorReading(sensor_2,1000,2.0)
 * Record: SensorReading(sensor_2,1000,2.0)
 * Current processing time = 1658924211426
 * Current watermark = -9223372036854775808
 * Note: the initial watermark is a negative number. With a parallelism of 2, the first
 * 2 records are needed to initialise both subtasks. The watermark is the minimum across
 * the 2 subtasks, so it is still negative here.
 * Event timestamp of this record: 5000
 * SensorReading(sensor_2,5000,2.0)
 * Record: SensorReading(sensor_2,5000,2.0)
 * Current processing time = 1658924226955
 * Current watermark = 999
 * Note: for the 3rd record the watermark is 1000-1=999; 1000 is the minimum of 1000 and 2000.
 * Event timestamp of this record: 6000
 * SensorReading(sensor_2,6000,2.0)
 * Record: SensorReading(sensor_2,6000,2.0)
 * Current processing time = 1658924254119
 * Current watermark = 999
 * Note: for the 4th record the watermark is 999; 1000 is the minimum of 1000 and 5000.
 * Event timestamp of this record: 16000
 * SensorReading(sensor_2,16000,2.0)
 * Record: SensorReading(sensor_2,16000,2.0)
 * Current processing time = 1658924267754
 * Current watermark = 4999
 * Note: for the 5th record the watermark is 5000-1; 5000 is the minimum of 5000 and 6000.
 * Event timestamp of this record: 26000
 * SensorReading(sensor_1,26000,2.0)
 * Record: SensorReading(sensor_1,26000,2.0)
 * Current processing time = 1658924292727
 * Current watermark = 5999
 * Note: for the 6th record, 6000-1=5999; 6000 is the minimum of 16000 and 6000.
 * Event timestamp of this record: 36000
 * SensorReading(sensor_1,36000,2.0)
 * Record: SensorReading(sensor_1,36000,2.0)
 * Current processing time = 1658924328410
 * Current watermark = 15999
 * Note: for the 7th record, 16000-1=15999; 16000 is the minimum of 16000 and 26000.
 */
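4. Summary
(1) The watermark is updated by taking the minimum watermark across the parallel subtasks.
(2) The initial watermark value, as found in the source code, is -9223372036854775808 (Long.MIN_VALUE).
5. Code repository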
https://github.com/johncodeit/flinkdemo.git




