
关注我们, 一起成长!
首先区分一下两个概念,state一般指一个具体的task/operator的状态。而checkpoint则表示了一个Job,在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。我们在这里讨论的是state。

updateStateByKey
mapWithState
updateStateByKey和mapWithState的区别
updateStateByKey示例:
def updateFunction(currValues:Seq[Int],preValue:Option[Int]): Option[Int] = {val currValueSum = currValues.sum//上面的Int类型都可以用对象类型替换Some(currValueSum + preValue.getOrElse(0)) //当前值的和加上历史值}kafkaStream.map(r => (r._2,1)).updateStateByKey(updateFunction _)
这里的updateFunction方法就是需要我们自己去实现的状态跟新的逻辑,currValues
就是当前批次的所有值,preValue
是历史维护的状态,updateStateByKey
返回的是包含历史所有状态信息的DStream。
mapWithState示例:
val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())//自定义mappingFunction,累加单词出现的次数并更新状态val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {val sum = count.getOrElse(0) + state.getOption.getOrElse(0)val output = (word, sum)state.update(sum)output}//调用mapWithState进行管理流数据的状态kafkaStream.map(r => (r._2,1)).mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)).print()

Keyed State
Operator State
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过 update
方法更新状态值,通过value()
方法获取状态值。ListState:即key上的状态值为一个列表。可以通过 add
方法往列表中附加值;也可以通过get()
方法返回一个Iterable<T>
来遍历状态值。ReducingState:这种状态通过用户传入的reduceFunction,每次调用 add
方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与 add
方法中传入的元素类型不同(这种状态将会在Flink未来版本中被删除)。MapState:即状态值为一个map。用户通过 put
或putAll
方法添加元素。
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {/*** ValueState状态句柄. 第一个值为count,第二个值为sum。*/private transient ValueState<Tuple2<Long, Long>> sum;@Overridepublic void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {// 获取当前状态值Tuple2<Long, Long> currentSum = sum.value();// 更新currentSum.f0 += 1;currentSum.f1 += input.f1;// 更新状态值sum.update(currentSum);// 如果count >=2 清空状态值,重新计算if (currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));sum.clear();}}@Overridepublic void open(Configuration config) {ValueStateDescriptor<Tuple2<Long, Long>> descriptor =new ValueStateDescriptor<>("average", // 状态名称TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 状态类型Tuple2.of(0L, 0L)); // 状态默认值sum = getRuntimeContext().getState(descriptor);}}// ...env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(0).flatMap(new CountWindowAverage()).print();// the printed output will be (1,4) and (1,5)

期待与大佬技术交流、思想碰撞!点个关注,交个朋友↓

Spark必知必会 | Spark SQL自定义函数UDF、UDAF聚合函数以及开窗函数的使用

数仓开发到底在开发什么?

Flink重点难点 | Flink SQL高效Top-N方案与原理
文章转载自实时数仓Flink,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。





