暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink CookBook | 窗口触发机制

data之道 2020-01-19
1115

导语

    在上篇文章里,聊了怎么给数据流的消息分配时间戳以及水位发射机制,但也遗留了几个问题,窗口在接收到水位后,根据什么样的机制触发窗口计算?怎么判断一个消息是迟到消息,迟到消息怎么处理?怎么将一个消息分配到特定的窗口里的?本文尝试分析这些问题的解决方式,希望对您有所帮助。

什么是窗口
在流式计算里,窗口是个很常用的算子,它将无界数据流切分成有界数据流,然后再在有界数据流上进行转换操作,比如聚合。窗口提供了一种将数据分组到桶里的方法,但桶的大小是有限的,然后对这些桶里的数据进行计算。比如一个时间窗口,将流分为每5分钟一个窗口,并计算每个窗口收到了多少事件。
Flink内置了三种窗口实现:滚动窗口、滑动窗口、会话窗口,可参阅Flink CookBook—流式计算介绍详细介绍了它们的区别、使用场景等。因为滚动窗口的实现相对简单,本文大部分篇幅分析滚动窗口。
窗口使用步骤

窗口算子可以作用在keyed流或nonkeyed流,在keyed流上的窗口会并行化执行,但nonkeyed流上的窗口只会在一个线程里执行。创建窗口算子,需要指定两个窗口组件:

  • window assigner:窗口分配器,决定输入流的元素怎么进行窗口分配,方法给一个元素分配到一个或多个窗口

  • window function:窗口函数,窗口函数作用在WindowedStream或AllWindowedStream,对分配到一个窗口里的所有元素进行实际的函数计算

使用窗口的一般过程:
// 定义一个keyedStream
stream.keyBy(...)
//指定 windowassigner,window函数返回WindowedStream
.window(...)
// 可选的,指定触发器,覆盖默认触发器
.trigger(...)
// 可选的,指定evictor
.evictor(...)
//指定windowfunction
.reduce/aggregate/process(...)


// 定义nonkeyed stream
stream
// 指定windowassigner,window函数返回AllWindowedStream
.windowAll(...)
// 指定windowfunction
.reduce/aggregate/process(...)
也可以用timeWindow方法,window的简写方式,根据传入的参数、配置的时间策略,隐似调用window函数指定windowassinger。
Window算子流程

水位生成器和窗口分配器作用不一样,水位生成器只影响给流里的元素增加时间属性、生成水位、发射水位这个过程;窗口分配器是将一个元素分配到一个或多个窗口。

Flink实现了多种窗口分配器,比如TumblingEventTimeWindows(事件时间的滚动窗口)、TumblingProcessingTimeWindows(处理时间的滚动窗口)、SlidingEventTimeWindows(事件时间的滑动窗口)、SlidingProcessingTimeWindows(处理时间滑动窗口)、DynamicEventTimeSessionWindows(动态事件时间会话窗口)等,后面会有详细的介绍。

窗口算子在每接收到一个元素后,就会调用窗口分配器,获取到元素分配的时间窗口列表(一个元素可能分配多个时间窗口),时间窗口对象(TimeWindow(long start, long end))只有两个属性:start time、end time,表示这个元素所在时间窗口的上限和下限。Flink根据元素的时间窗口判断这个元素是否迟到,如果元素没有迟到,将其添加到窗口状态,然后触发器判断是否需要触发计算。整个流程如下:


Window Assigners(窗口分配器) 

    窗口分配器主要做的工作就是将一个元素分配到一个或多个窗口里,在第一个元素分配后,窗口才创建,换句话说,窗口里一个元素也没有,那这个窗口就不存在。不同的窗口分配器有不同的分配算法:滚动窗口的关键点在于一个元素只会属于一个窗口,而在滑动窗口中,一个元素可能属于一个或多个窗口;事件时间的特点在于计算时间窗口时,用元素的属性去计算(时间分配器给元素分配了时间属性),而处理时间用系统当前时间计算的。
  • TumblingEventTimeWindows
滚动事件窗口接收两个参数:windowSize窗口大小、offset窗口偏移量,比如想一个小时生成一个窗口,但窗口开始时间在每小时的15分钟,参数可以是这样的:(Time.hours(1),Time.minutes(15)),那么窗口的开始时间就是0:15:00,1:15:00,2:15:00....。根据元素的时间属性、窗口大小、窗口偏移量,获取元素所在窗口起始时间:start=timestamp-(timestamp-offset+windowSize)%windowSize,然后计算元素所在窗口结束时间:end=start+windowSize。同一个时间窗口里的元素,获取到的时间窗口范围是一样的。默认用EventTimeTrigger触发器。
  • TumblingProcessingTimeWindows
滚动处理时间窗口接收同样的参数,计算窗口的开始时间、结束时间算法都是一样,唯一不同的是,时间戳不是元素本身的时间属性,而是系统的当前时间,即根据当前处理时的时间给元素分配一个窗口,这就是为什么称之为处理时间的原因。默认用ProcessingTimeTrigger触发器。
  • SlidingEventTimeWindows
滑动事件时间窗口,接收三个参数,处理窗口大小、窗口偏移量,还有个窗口滑动步长(slide),一个元素可能属于之前的一个滑动窗口,所以返回的是一个窗口列表。默认用EventTimeTrigger触发器。

Trigger(窗口触发器)

就像前面说的那样,每个窗口分配器都有一个触发器,每当消息添加到窗口后,这个消息就会传递给窗口触发器,trigger定义什么情况下认为窗口已经准备好,可以触发计算,或者什么时候清除窗口。比如一旦水位到达窗口的结束时间,就触发窗口计算。Flink内置了多种不同触发器策略:
  • EventTimeTrigger

窗口在接收到一个元素时,会将元素的时间窗口和当前水位比较,一旦水位大于时间窗口的结束时间,就会触发计算。正常到达(时间是在水位之后)的元素不会触发计算,如果是在一定延迟时间(延迟时间默认是0,allowedLateness函数设置)内到达的元素,就会触发计算,被称为延迟触发,因为是由延迟事件触发的。

/** 1、是否延迟达到判断:
  每接收到一个元素时,判断是否在一定延迟时间内到达,
  如果不是,就直接跳过,不会进行第二步触发器判断
**/
if (!isWindowLate(window)) {
/** 2、触发器判断:
水位已经到达窗口的结束时间,就触发计算
**/
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
   // 水位还未到达窗口结束时间,正常到达的,不触发计算,将其加到事件时间计时器队列
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}

除了延迟触发,还有另一种触发计算的场景,称为主触发,是第一次触发窗口计算,就是在Flink内部接收到一个新的水位时,会触发比它小的所有时间窗口的计算。水位向前推进,触发窗口计算的逻辑实现:

所以如果使用事件时间窗口,一定要配置水位生成器,用于给元素分配时间属性并生成水位,水位生成分为periodic和punctuated两种,详细的可参考水位生成机制
  • ProcessingTimeTrigger

如果使用的是ProcessingTime类型的窗口分配器,那么就用不到水位,这种情况下怎么触发窗口计算的呢?

注册定时器,在窗口的结束时间定时触发。process time没有数据延迟这一说。

窗口函数

    窗口函数分为两种,一种是增量聚合函数(比如ReduceFunction或AggregateFunction),函数会对新接收到的元素立即聚合,并将更新后的结果存储到窗口状态,也就是窗口状态只保留结果值;另一种是全量窗口函数, 将新接收到的元素放到状态里,即在调用窗口函数前,在内部状态缓存窗口所有元素,当进行函数计算时,再对所有元素进行遍历。

显然增量聚会函数会减少状态存储空间。假设要统计每五分钟的访问pv,预测每五分钟的pv是上亿级别的,如个uid是32长度得哈希值,显然如果用全量函数,这个状态存储代价是不菲的。但是增量聚合的场景是受限制的,比如统计每五分钟访问量TOP100的IP。


allowedLateness

windowedStream和AllWindowedStream有个allowedLateness方法:允许窗口里的数据最多延迟多久到达,如果窗口结束时间+allowedLateness<=当前水位,意味着没有在一定的延迟时间内到达,会删除元素、进行侧输出,否则视为一个正常到达的元素,将其放到窗口状态里:

    水位标识当前系统处理的时间进度,延迟是相对水位来讲的,所以延迟到达的元素,会再次触发窗口计算,而之前触发的数据,会buffer起来,直到watermark超过end-of-window + allowedLateness的时间,窗口的数据及元数据信息才会被删除。应将延迟触发视为对先前计算结果的更新,应用程序里应该考虑这种计算结果的重复性。



相关阅读:


    流式计算介绍

    Flink核心知识介绍

    Flink环境准备

    Flink维表Join原理解析

    FlinkJDBC Table Source详解

    Flink水位生成机制

「在一起看」👇

文章转载自data之道,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论