Flink Tips -- A Brief Look at the State Processor API (1): Reading State

大数据小知识 2021-07-07

What is the State Processor API

Official documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

With the State Processor API you can read, write, and modify savepoints and checkpoints, and the state data can also be analyzed and processed with SQL-style queries.

  • Add the Maven dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
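
All of the read examples below operate on an ExistingSavepoint handle. A minimal sketch of obtaining one (the path is a placeholder, and the state backend should match the one used by the job that produced the snapshot):

// Load an existing savepoint or checkpoint into a batch (DataSet) environment.
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(
    bEnv,
    "file:///path/to/savepoint-or-checkpoint",   // placeholder path
    new MemoryStateBackend());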


Reading State

1. Reading Operator State

// To read Operator State, you only need to specify the operator's uid, the state name, and the type information.

// For example, FlinkKafkaConsumerBase stores the partition offsets as:
// private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
// private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

DataSet<Integer> listState = savepoint.readListState(
    "my-uid",
    "list-state",
    Types.INT);

DataSet<Integer> unionState = savepoint.readUnionState(
    "my-uid",
    "union-state",
    Types.INT);

DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState(
    "my-uid",
    "broadcast-state",
    Types.INT,
    Types.INT);

// When the state descriptor (StateDescriptor) was created with a custom TypeSerializer
DataSet<Integer> listStateWithSerializer = savepoint.readListState(
    "uid",
    "list-state",
    Types.INT,
    new MyCustomIntSerializer());
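
For the Kafka offsets mentioned in the comment above, the read targets the source operator's uid and the union list state name. The sketch below is illustrative only: "kafka-source-uid" is a placeholder for whatever uid the job assigned to its Kafka source, and it assumes the default Tuple2 type information is compatible with the serializer the connector used when the snapshot was taken.

// Hypothetical: read the Kafka consumer's partition offsets stored as union state.
// "topic-partition-offset-states" is the OFFSETS_STATE_NAME constant shown above.
DataSet<Tuple2<KafkaTopicPartition, Long>> kafkaOffsets = savepoint.readUnionState(
    "kafka-source-uid",
    "topic-partition-offset-states",
    TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));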

2. Reading Keyed State

Note: do not mix reading state produced by Java and Scala jobs; doing so may cause key serialization errors.

To read keyed state, call readKeyedState with the operator uid and a KeyedStateReaderFunction<KeyType, OutputType> that fetches the corresponding state.

KeyedStateReaderFunction lets users read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState (a MapState sketch follows the sample output below).

  • The job that writes the custom keyed state (checkpointed)
 public static class StatefulFunctionWithTime extends KeyedProcessFunction<String, Tuple2<String, Integer>, Void> {

        ValueState<Integer> state;

        ListState<Long> updateTimes;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
            state = getRuntimeContext().getState(stateDescriptor);

            ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
            updateTimes = getRuntimeContext().getListState(updateDescriptor);
        }

        @Override
        public void processElement(Tuple2<String,Integer> value, Context ctx, Collector<Void> out) throws Exception {
            state.update(value.f1 + 1);
            updateTimes.add(System.currentTimeMillis());
        }
    }

  • Read the state by defining an output type and a corresponding KeyedStateReaderFunction

   public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
        // Point at the checkpoint path
        ExistingSavepoint savepoint =
                Savepoint.load(bEnv, "file:///D:\\IDEAspaces\\bigdata_study\\bigdata_flink\\data\\checkpoint\\3f019c9823b8dcb398b231576c583316\\chk-1", new MemoryStateBackend());
        // Read the keyed state
        DataSet<KeyedStates> keyedState = savepoint.readKeyedState("window_uid", new ReaderFunction());
        keyedState.print();

    }
    // Wrapper class used as the output type of the DataSet
    public static class KeyedStates {
        public String key;
        public long value;
        public List<Long> times;

        @Override
        public String toString() {
            return "KeyedStates{" +
                    "key='" + key + '\'' +
                    ", value=" + value +
                    ", times=" + times +
                    '}';
        }
    }
    // The state descriptors here must match, one to one, how the job declared its state
    public static class ReaderFunction extends KeyedStateReaderFunction<String, KeyedStates> {

        ValueState<Integer> state;

        ListState<Long> updateTimes;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
            state = getRuntimeContext().getState(stateDescriptor);

            ListStateDescriptor<Long> updateDescriptor = new ListStateDescriptor<>("times", Types.LONG);
            updateTimes = getRuntimeContext().getListState(updateDescriptor);
        }

        @Override
        public void readKey(
                String key,
                Context ctx,
                Collector<KeyedStates> out) throws Exception {

            KeyedStates data = new KeyedStates();
            data.key    = key;
            data.value  = state.value();
            data.times  = StreamSupport
                    .stream(updateTimes.get().spliterator(), false)
                    .collect(Collectors.toList());

            out.collect(data);
        }
    }

  • The keyed state data read from the checkpoint
KeyedStates{key='a', value=2, times=[1625486225091, 1625486238091, 1625486239091, 1625486239991]}
KeyedStates{key='d', value=2, times=[1625486233790, 1625486238591, 1625486239491, 1625486240391]}
KeyedStates{key='c', value=2, times=[1625486229989, 1625486238191, 1625486239191, 1625486240091]}
KeyedStates{key='2', value=2, times=[1625486245292]}
KeyedStates{key='b', value=2, times=[1625486227788, 1625486238191, 1625486239191, 1625486240091]}
KeyedStates{key='e', value=2, times=[1625486242192]}
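
The same ReaderFunction pattern extends to the other keyed state types mentioned above. As a purely hypothetical sketch (the state name "counts", the key/value types, and the output class below are illustrative assumptions, not part of the job above), a MapState is registered in open() and iterated in readKey():

    // Hypothetical output record: one row per map entry under each key.
    public static class MapStateEntry {
        public String key;
        public String mapKey;
        public Long mapValue;
    }

    // Hypothetical reader: the descriptor name and types are assumptions for illustration.
    public static class MapStateReader extends KeyedStateReaderFunction<String, MapStateEntry> {

        MapState<String, Long> counts;

        @Override
        public void open(Configuration parameters) {
            MapStateDescriptor<String, Long> countsDescriptor =
                    new MapStateDescriptor<>("counts", Types.STRING, Types.LONG);
            counts = getRuntimeContext().getMapState(countsDescriptor);
        }

        @Override
        public void readKey(String key, Context ctx, Collector<MapStateEntry> out) throws Exception {
            // Emit one record per map entry stored under the current key.
            for (Map.Entry<String, Long> entry : counts.entries()) {
                MapStateEntry record = new MapStateEntry();
                record.key = key;
                record.mapKey = entry.getKey();
                record.mapValue = entry.getValue();
                out.collect(record);
            }
        }
    }

It would then be read with savepoint.readKeyedState("some-uid", new MapStateReader()), where "some-uid" is again a placeholder for the target operator's uid.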

3. Reading Window State

  • The window job
// Job pipeline (excerpt)
.....
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                .aggregate(new ClickCounter())
                .uid("click-window")
                .print();
static class Click {
        public String userId;
        public LocalDateTime time;
        public Click(String f0, LocalDateTime localDateTime) {
            this.userId = f0;
            this.time = localDateTime;
        }
    }
// A simple count aggregation
    static class ClickCounter implements AggregateFunction<Click, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }
        @Override
        public Integer add(Click value, Integer accumulator) {
            return 1 + accumulator;
        }
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }

  • Reading the Window State
public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(batchEnv,
                "file:///D:\\IDEAspaces\\bigdata_study\\bigdata_flink\\data\\checkpoint\\e2e118e23e6c3f775c9d7f477d2b9272\\chk-1",
                new MemoryStateBackend()
        );
        // Read the Window State
        DataSet<ClickState> clickStateDataSet = savepoint
                .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                // Pass the uid, the job's aggregate function, the reader function, the key type, the accumulator type, and the reader's output type
                .aggregate("click-window", new ClickCounter(), new ClickReader(), Types.STRING, Types.INT, TypeInformation.of(ClickState.class));
        // Display the Window State data
        clickStateDataSet.flatMap(new FlatMapFunction<ClickState, Object>() {
            @Override
            public void flatMap(ClickState value, Collector<Object> out) throws Exception {
                out.collect(value.window.getStart()+"--"+value.window.getEnd()+":"+value.userId+":"+value.count);
            }
        }).print();
        // Sample data in the current window:
        //1625628000000--1625628300000::1
        //1625628000000--1625628300000:a:5
        //1625628000000--1625628300000:b:2
    }

    static class ClickReader extends WindowReaderFunction<Integer, ClickState, String, TimeWindow> {
        @Override
        public void readWindow(String key, Context<TimeWindow> context, Iterable<Integer> elements, Collector<ClickState> out) throws Exception {
            ClickState state = new ClickState();
            state.userId = key;
            state.count = elements.iterator().next();
            state.window = context.window();
            state.triggerTimers = context.registeredEventTimeTimers();

            out.collect(state);
        }
    }
    
    public static class ClickState {
        public String userId;
        public int count;
        public TimeWindow window;
        public Set<Long> triggerTimers;
    }


