暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink操练(三十四)之自定义键控状态(二)ListState

逗先生大数据 2021-11-22
731

0 简介

ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下:

  • ListState.add(value: T)

  • ListState.addAll(values: java.util.List[T])

  • ListState.get()返回Iterable[T]

  • ListState.update(values: java.util.List[T])

  ListState需要将某些值存到一个List中(Iterable),意味着缓存的数据不只是一个而是多个值。很多情况下都可以使用,例如计算的数值要包含全天的每一个记录,此时只有将每个记录的值存成一个列表才可以计算。

1.实例

1.1 实例一

首先需要先定义一个ListState,然后再重写KeyedProcessFunction中的open方法:

    private var itemState : ListState[ItemViewCount] = _

override def open(parameters: Configuration): Unit = {

//命名状态变量的名字和类型
val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount])
itemState = getRuntimeContext.getListState(itemStateDescription)
}

ListStateDescriptor提供了几种不同的定义方式:


两个参数分别是ListStateDescriptor的名字和typeClass


1.2 实例二

package qiuhua;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;

/**
* @program: bigdata_learn
* @description: 通过ListState求key 出现了 3 次,则需要计算平均值
* @author: Mr.逗
* @create: 2021-09-08 16:18
**/

public class CountAverageWithListState extends RichFlatMapFunction<Tuple2<Long,Long>, Tuple2<Long,Double>> {
/**
* ValueState : 里面只能存一条元素
* ListState :里面可以存很多数据
*/

private ListState<Tuple2<Long,Long>> elementsByKey;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//注册状态
ListStateDescriptor<Tuple2<Long, Long>> descriptor = new ListStateDescriptor<>
("list_state"//状态名字
, Types.TUPLE(Types.LONG, Types.LONG)//状态存储的数据类型
);
elementsByKey=getRuntimeContext().getListState(descriptor);
}

@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Double>> out) throws Exception {
Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();//拿到当前key的状态值
//如果状态值没有初始化,则初始化
if(currentState==null)
{
elementsByKey.addAll(Collections.emptyList());
}
//更新状态
elementsByKey.update((List<Tuple2<Long, Long>>) value);
//判断,如果当前key出现了3次,则需要计算平均值,并且输出
List<Tuple2<Long, Long>> allElements = Lists.newArrayList(currentState);
if (allElements.size()==3)
{
long count=0;
long sum=0;
for(Tuple2<Long,Long> ele:allElements)
{
count++;
sum+=ele.f1;
}
double avg=(double)sum/count;
out.collect(Tuple2.of(value.f0,avg));
//清除状态
elementsByKey.clear();
}
}
}

总结

Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件:

  • 直接基于keyedStream或者由keyedStream转换来的windowedStream

  • 必须继承RichFunction

实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能:

val fromTransactionDataStream = watermarkTransaction
.keyBy(_.code)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))

val transaction = fromTransactionDataStream
.apply(new StockTransactionApply)
.keyBy(_._3)
.flatMap(new TransactionStateFlatMapFunction)


文章转载自逗先生大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论