暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink 流状态详解及代码实战

大数据技能圈 2024-07-10
26

 引言 

在 Flink 的流处理中,状态是处理逻辑的重要组成部分,它可以保存在内存或外部存储系统中,用于在处理过程中保存和查询数据。状态对于实现窗口操作、聚合计算等操作至关重要。

 一、流状态的概念 

Flink 支持以下几种类型的流状态:

  • 值状态(ValueState):保存单个值,每个 key 只保存一个值。

  • 列表状态(ListState):保存一个值的列表,可以保存相同 key 的多个值。

  • 减少状态(ReducingState):通过 reduce 函数聚合值。

  • 折叠状态(FoldingState):通过 fold 函数和初始值聚合值。

  • 映射状态(MapState):保存键值对的集合。

 二、状态的生命周期 

Flink 中的状态具有以下生命周期:

  • 初始化:在操作符的 open() 方法中初始化状态。

  • 更新:在处理元素时更新状态。

  • 访问:在处理元素时访问状态。

  • 清理:在操作符的 close() 方法中清理状态。

 三、状态后端 

Flink 提供了多种状态后端,用于存储和管理状态:

  • 内存状态后端:将状态存储在 TaskManager 的内存中。

  • 文件系统状态后端:将状态存储在文件系统中。

  • RocksDB 状态后端:使用 RocksDB 作为状态的存储,适用于大规模状态。

这或许是一个对你有用的开源项目data-warehouse-learning 项目是一套基于 MySQL + Kafka + Hadoop + Hive + Dolphinscheduler + Doris + Seatunnel + Paimon + Hudi + Iceberg + Flink + Dinky + DataRT + SuperSet 实现的实时离线数仓(数据湖)系统,以大家最熟悉的电商业务为切入点,详细讲述并实现了数据产生、同步、数据建模、数仓(数据湖)建设、数据服务、BI报表展示等数据全链路处理流程。

  • https://gitee.com/wzylzjtn/data-warehouse-learning
  • https://github.com/Mrkuhuo/data-warehouse-learning

项目演示:

 四、每种状态代码实现 
以下是每种状态类型的代码实现示例:

 4.1 值状态(ValueState) 

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    // ... 其他必要的导入


    public class ValueStateExample {
    public static void main(String[] args) throws Exception {
    // 设置执行环境和数据流
    // ...


    input.process(new KeyedProcessFunction<String, String, String>() {
    private ValueState<Integer> countState;


    @Override
    public void open(Configuration config) {
    countState = getRuntimeContext()
    .getState(new ValueStateDescriptor<>("count", Integer.class, 0));
    }


    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
    Integer count = countState.value() + 1;
    countState.update(count);
    out.collect("Count for " + value + ": " + count);
    }
    }).print();


    // 执行作业
    }
    }

     4.2 列表状态(ListState)

      import org.apache.flink.api.common.state.ListState;
      import org.apache.flink.api.common.state.ListStateDescriptor;
      import java.util.ArrayList;
      import java.util.List;
      // ... 其他必要的导入


      public class ListStateExample extends KeyedProcessFunction<String, String, String> {
      // ...


      private ListState<String> listState;


      @Override
      public void open(Configuration config) {
      listState = getRuntimeContext()
      .getListState(new ListStateDescriptor<>("list", String.class));
      }


      @Override
      public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
      List<String> values = new ArrayList<>();
      listState.get().forEach(values::add);
      values.add(value);
      listState.clear();
      values.forEach(listState::add);
      out.collect("List State: " + values);
      }
      }

       4.3 减少状态(ReducingState)

        import org.apache.flink.api.common.state.ReducingState;
        import org.apache.flink.api.common.state.ReducingStateDescriptor;
        // ... 其他必要的导入


        public class ReducingStateExample extends KeyedProcessFunction<String, Integer, String> {
        // ...


        private ReducingState<Integer, Integer> reducingState;


        @Override
        public void open(Configuration config) {
        reducingState = getRuntimeContext()
        .getReducingState(new ReducingStateDescriptor<>("reduce", (a, b) -> a + b, Integer.class));
        }


        @Override
        public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
        int sum = reducingState.get();
        reducingState.add(value);
        out.collect("Reduced Sum: " + sum);
        }
        }

         4.4 折叠状态(FoldingState)

          import org.apache.flink.api.common.state.FoldingState;
          import org.apache.flink.api.common.state.FoldingStateDescriptor;
          // ... 其他必要的导入


          public class FoldingStateExample extends KeyedProcessFunction<String, Integer, String> {
          // ...


          private FoldingState<Integer, Integer> foldingState;


          @Override
          public void open(Configuration config) {
          foldingState = getRuntimeContext()
          .getFoldingState(new FoldingStateDescriptor<>("fold", 0, (a, b) -> a + b));
          }


          @Override
          public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
          int total = foldingState.get();
          foldingState.add(value);
          out.collect("Folded Total: " + total);
          }
          }

           4.5 映射状态(MapState)

            import org.apache.flink.api.common.state.MapState;
            import org.apache.flink.api.common.state.MapStateDescriptor;
            import java.util.Map;
            // ... 其他必要的导入


            public class MapStateExample extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
            // ...


            private MapState<String, Integer> mapState;


            @Override
            public void open(Configuration config) {
            mapState = getRuntimeContext()
            .getMapState(new MapStateDescriptor<>("map", String.class, Integer.class));
            }


            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            mapState.put(value.f0, value.f1);
            out.collect("Map State: " + mapState.entries());
            }
            }

             五、状态的容错性 

            Flink 的状态是容错的。它通过定期对状态进行快照(checkpointing)来实现。在发生故障时,Flink 可以从最近的检查点恢复状态。

             六、性能调优

            • 选择合适的状态后端:对于大规模状态,使用 RocksDB 状态后端可以提高性能。

            • 减少状态访问的频率:频繁的状态访问可能会影响性能。

            • 调整检查点的间隔:合理的检查点间隔可以减少对性能的影响。

            项目文档地址

            添加作者进大数据交流群

            推荐阅读

            文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论