暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink源码分析-生成水印(Watermark)

IT那活儿 2021-01-27
2086
点击上方蓝字关注我们

背景

之前项目一直用的Flink-1.72版本,大多数用的流api进行开发的需求,现在扫描漏洞的时候必须升级到Flink-1.12.0或Flink-1.11.3,所以直接升级到Flink-1.12.0,发现之前用的api(assignTimestampsAndWatermarks)被设置为废弃了。

先来看看项目之前用的:

后来查资料发现Flink在1.11版本中为了实现水印的通用以及方便,对水印进行了重构。

新的水印生成接口

新版本的Flink在类classDataStream<T>中提供了一个新的构造水印assignTimestampsAndWatermarks方法,新的接口需要传入一个WatermarkStrategy对象。

WatermarkStrategy接口继承了接口TimestampAssignerSupplier<T>以及接口WatermarkGeneratorSupplier<T>,上面两个接口都是支持函数式编程的。

先看一下interfaceTimestampAssignerSupplier<T>这个接口提供的方法。

是创建一个TimestampAssigner<T>类型的方法。那这个TimestampAssigner<T>的在水印生成过程中起到什么作用了。先看下这个接口的定义

有一个longextractTimestamp方法,作用是从Flink消费的记录中抽取时间,既可以理解为我们如果要通过业务时间进行统计时,需要通过该方法对来提取记录的业务时间。所以用到业务时间的话,一定要根据自己的业务场景对该方法进行具体的实现。否则Flink会提供一个默认的实现RecordTimestampAssigner<>()

而默认实现的内容也十分简单,一起看一下,必须是记录中已经注册了时间属性。

接下来interfaceWatermarkGeneratorSupplier<T>这个接口。

是返回一个WatermarkGenerator<T>类型的方法,继续看下interfaceWatermarkGenerator<T>做了哪些操作

提供了两个水印发送的方式,接下来对这两个方式进行说明:

onEvent每条记录进来都会调用一次这个方法,入参有3个,第一个是记录,第二个是记录携带的时间,如果注册了时间就会有,第三个参数时水印发射器WatermarkOutputoutput,可以通过这个参数对水印进行发射,用户可以根据自己的业务场景来编写自己的水印生成以及发射逻辑。该方法的重点是每条记录都会调用.

onPeriodicEmit: 该方法是Flink提供的一个定时器方法,每隔一段时间会调用此方法,入参是WatermarkOutputoutput,用户可以通过这个方法每隔一段时间发送一次水印,当记录数过多时,每条记录都发送一次水印明显不合适,也影响性能,此时可以通过这个方法进行水印的定时发送,而onEvent只记录当前水印而选择不发射出去。该方法的参数配置为env.getConfig().setAutoWatermarkInterval(300L),入参是毫秒数,表示隔多少毫秒向下游算子发送一次水印。

而WatermarkStrategy中也提供了一些常用的WatermarkGenerator<T>供用户使用,比如

BoundedOutOfOrdernessWatermarks<T>类中就是一个在onEvent中记录水印,通过onPeriodicEmit方法定时向下游发送水印的实现,构造参数maxOutOfOrderness是提供给记录乱序的,运行最大延迟间隔。MaxTimestamp是当前的水印记录。BoundedOutOfOrdernessWatermarks<T>的大致实现如下

使用方法也十分的简单,提供的是一个静态方法,只需直接调用即可

WatermarkStrategy.<Map<String,Object>>forBoundedOutOfOrderness(Duration.ofSeconds(1))

使用水印

最后结合项目的需求将原来的使用水印的地方改成如下了

类图及FLINK水印算子简要流程

先上类图,方便理解

接着简单介绍下流程

首先TimestampsAndWatermarksOperator算子会在open方法中初始化用户定义的水印逻辑及方式,并且如果需要定时发送水印会,注册一个定时器触发水印定时发送。

当元素到达算子后会调用processElement(StreamRecord<T>element)

方法很简单,如果元素已经被注册了时间,就直接获取时间,或者设置为LONG.MIN_VALUE,然后根据用户定义的timestampAssigner.extractTimestamp从记录中抽取时间属性,然后再将时间写入元素中,最后调用用户定义的watermarkGenerator.onEvent方法,根据用户的逻辑选择刷新水印以及是否发射水印。

上面初始化中提到了,如果需要定时发送水印,则会注册一个定时器,而定时器的方法如下

通过onProcessingTime来触发定时器的内容,而内容也十分简单,先调用用户定义的watermarkGenerator.onPeriodicEmit方法发送水印,然后获取当前时间,最后注册当前时间加水印定时发送间隔的定时触发器,等待下次触发该方法。

参考资料

https://zhuanlan.zhihu.com/p/158951593

https://blog.csdn.net/zhaoyuqiang/article/details/107453466

END

文章转载自IT那活儿,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论