暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

flink状态调优

大数据启示录 2022-09-08
164

 

checkpoint优化

 

一、设置最小时间间隔

当flink应用开启Checkpoint功能,并配置Checkpoint时间间隔,应用中就会根据指定的时间间隔周期性地对应用进行Checkpoint操作。默认情况下Checkpoint操作都是同步进行,也就是说,当前面触发的Checkpoint动作没有完全结束时,之后的Checkpoint操作将不会被触发。在这种情况下,如果Checkpoint过程持续的时间超过了配置的时间间隔,就会出现排队的情况。如果有非常多的Checkpoint操作在排队,就会占用额外的系统资源用于Checkpoint,此时用于任务计算的资源将会减少,进而影响到整个应用的性能和正常执行。

在这种情况下,如果大状态数据确实需要很长的时间来进行Checkpoint,那么只能对Checkpoint的时间间隔进行优化,可以通过Checkpoint之间的最小间隔参数进行配置,让Checkpoint之间根据Checkpoint执行速度进行调整,前面的Checkpoint没有完全结束,后面的Checkpoint操作也不会触发。

    streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

    通过最小时间间隔参数配置,可以降低Checkpoint对系统的性能影响,但需要注意的事,对于非常大的状态数据,最小时间间隔只能减轻Checkpoint之间的堆积情况。如果不能有效快速地完成Checkpoint,将会导致系统Checkpoint频次越来越低,当系统出现问题时,没有及时对状态数据有效地持久化,可能会导致系统丢失数据。因此,对于非常大的状态数据而言,应该对Checkpoint过程进行优化和调整,例如采用增量Checkpoint的方法等。

    用户也可以通过配置CheckpointConfig中setMaxConcurrentCheckpoints()方法设定并行执行的checkpoint数量,这种方法也能有效降低checkpoint堆积的问题,但会提高资源占用。同时,如果开始了并行checkpoint操作,当用户以手动方式触发savepoint的时候,checkpoint操作也将继续执行,这将影响到savepoint过程中对状态数据的持久化

    二、预估状态容量

    除了对已经运行的任务进行checkpoint优化,对整个任务需要的状态数据量进行预估也非常重要,这样才能选择合适的checkpoint策略。对任务状态数据存储的规划依赖于如下基本规则:

    1.正常情况下应该尽可能留有足够的资源来应对频繁的反压。

    2.需要尽可能提供给额外的资源,以便在任务出现异常中断的情况下处理积压的数据。这些资源的预估都取决于任务停止过程中数据的积压量,以及对任务恢复时间的要求。

    3.系统中出现临时性的反压没有太大的问题,但是如果系统中频繁出现临时性的反压,例如下游外部系统临时性变慢导致数据输出速率下降,这种情况就需要考虑给予算子一定的资源

    4.部分算子导致下游的算子的负载非常高,下游的算子完全是取决于上游算子的输出,因此对类似于窗口算子的估计也将会影响到整个任务的执行,应该尽可能给这些算子留有足够的资源以应对上游算子产生的影响。

    三、异步Snapshot

    默认情况下,应用中的checkpoint操作都是同步执行的,在条件允许的情况下应该尽可能地使用异步的snapshot,这样讲大幅度提升checkpoint的性能,尤其是在非常复杂的流式应用中,如多数据源关联、co-functions操作或windows操作等,都会有较好的性能改善。

    在使用异步快照需要确认应用遵循以下两点要求:

    1.首先必须是flink托管状态,即使用flink内部提供的托管状态所对应的数据结构,例如常用的有ValueState、ListState、ReducingState等类型状态。

    2.StateBackend必须支持异步快照,在flink1.2的版本之前,只有RocksDB完整地支持异步的Snapshot操作,从flink1.3版本以后可以在heap-based StateBackend中支持异步快照功能

    四.压缩状态数据

    flink中提供了针对checkpoint和savepoint的数据进行压缩的方法,目前flink仅支持通过用snappy压缩算法对状态数据进行压缩,在未来的版本中flink将支持其他压缩算法。在压缩过程中,flink的压缩算法支持key-group层面压缩,也就是不同的key-group分别被压缩成不同的部分,因此解压缩过程可以并发执行,这对大规模数据的压缩和解压缩带来非常高的性能提升和较强的可扩展性。flink中使用的压缩算法在ExecutionConfig中进行指定,通过将setUseSnapshotCompression方法中的值设定为true即可。

    五.观察checkpoint延迟时间

    checkpoint延迟启动时间并不会直接暴露在客户端中,而是需要通过以下公式计算得出。如果改时间过长,则表明算子在进行barrier对齐,等待上游的算子将数据写入到当前算子中,说明系统正处于一个反压状态下。checkpoint延迟时间可以通过整个端到端的计算时间减去异步持续的时间和同步持续的时间得出。

    RocksDB相关的优化

     

     开启State访问性能监控

    Flink1.13中引入了 State 访问的性能监控,即 latency tracking state。此功能不局限于 State Backend 的类型,自定义实现的 State Backend 也可以复用次功能。

    当然,State 访问的性能监控会产生一定的性能影响,所以,默认每 100 次做一次取样(sample),对不同的 State Backend 性能损失影响不同:
    对于 RocksDB State Backend,性能损失大概在 1%左右
    对于 Heap State Backend,性能损失最多可达 10%

    开启监控的方式,可以在指令中加入

      state.backend.latency-track.keyed-state-enabled: true  #启用访问状态的性能监控
      state.backend.latency-track.sample-interval: 100 #采样间隔
      state.backend.latency-track.history-size: 128 #保留的采样数据个数,越大越精确
      state.backend.latency-track.state-name-as-viriable: true  #将状态名作为变量

      正常情况下,开启第一个参数即可。 

      开启增量检查点和本地恢复

      开启增量检查点

        state.backend.incremental: true    #默认false,改为true
        或代码中指定
        new EmbeddedRocksDBStateBackend(true)

        增量检查点,表示在 checkpoint 时,只备份和上个检查点相比,发生变化的检查点。在状态比较大的情况下,是否开启增量检查点,对性能的影响会非常大。比如,在程序运行很长时间之后,总的状态量达到1G,每次变化的状态只有100M甚至更低,那么在不开启增量备份的情况下,每次备份都要全量备份,也就是1G的状态量;如果开启了增量备份,每次只需要备份100M甚至更低;两者相比,增量备份检查点,可以大大节省备份的时间。

        在项目的实际使用过程中,曾经经理过一次大的性能问题,在没有开启增量备份检查点的情况下,每次备份需要消耗几十秒的时间,这对于实时计算来说,简直是个灾难;在使用 RocksDBStateBackend,并开启增量备份检查点之后,每次备份只需要几秒甚至几十毫秒就可以完成,大大节省了状态备份的时间。

        开启本地恢复

          state.backend.local-recovery: true

          调整预定义选项

          Flink 为 RocksDB 提供了一些预定义的选项集合,比如 DEFAULT、SPINNING_DISK_OPTIMIZED、SPINING_DISK_OPTIMIZED_HIGH_MEM 或 FLASH_SSD_OPTIMIZED。

          DEFAULT:啥都不配;
          SPINNING_DISK_OPTIMIZED:基于磁盘的优化;
          SPINING_DISK_OPTIMIZED_HIGH_MEM:基于磁盘和内存的优化;
          FLASH_SSD_OPTIMIZED:基于固态硬盘的优化;

          一般使用 SPINING_DISK_OPTIMIZED_HIGH_MEM 即可,如果条件充足,可以指定 FLASH_SSD_OPTIMIZED

          代码指定:

            EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
            embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
            env.setStateBackend(embeddedRocksDBStateBackend);

            提交参数指定:

              -Dstate.backend.rocksdb.predefined-options: SPINING_DISK_OPTIMIZED_HIGH_MEM #机械硬盘+内存

              其他高阶配置

              增大block缓存

              整个 RocksDB 共享一个 block cache(对应上图的Read Only Block Cache,最近读取的数据会放到 block cache 中),读取数据时内存的 cache 大小,直接影响数据读取效率;读取数据时,优先从内存读取,读取不到时,再从磁盘加载,所以,内存 cache 越大,缓存命中率越高。默认大小Wie 8MB,建议设置到 64~256MB,根据自身资源而定。

                state.backend.rocksdb.block.cache-size: 64m #默认 8m

                增大 write buffer 和 level 阈值大小

                RocksDB 中,每个 State 使用一个 Column Family,每个 Column Family 使用单独的 write buffer,默认是64MB,建议调大。

                调整 write buffer 时,通常要适当增加 L1 层的大小阈值 max-size-level-base,默认是 256 MB。该值太小,会导致能存档的 SST 文件过少,层级变多造成查找困难,需要更多层索引,才能命中需要的文件;值太大,造成文件过大,合并困难。建议设置为 target_file_size_base(默认64MB)的倍数,且不能太小,建议 5~10 倍,即 320~640MB。

                  state.backend.rocksdb.writebuffer.size: 128m
                  state.backend.rocksdb.compaction.level.max-size-level-base: 320m

                  增大 write buffer 数量

                  每个 Column Family 对应的 write buffer 最大数量,实际上是内存中“ReadOnly MemTable(只读内存表)”的最大数量,默认值是2。对于机械磁盘来说,如果内存足够大,可以调大到 5 左右,人多力量大!

                    state.backend.rocksdb.writebuffer.count: 5

                     增大后台线程数和 write buffer 合并数

                    1)增大后台线程数

                    用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值,人多力量大!

                      state.backend.rocksdb.thread.num: 4

                      2)增大 write buffer 最小合并数

                      将数据从 write buffer 中 flush 到磁盘时,需要合并的 write buffer 最小数量,默认值为 1,可以调到 3,减小合并次数。

                        state.backend.rocksdb.writebuffer.number-to-merge: 3

                        开启分区索引功能

                        Flink 1.13 中对 RocksDB 增加了分区索引功能,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的partitioned Index 做了多级索引。也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存较小的场景中,性能提升10倍左右。如果在内存管控下 RocksDB 性能不如预期的话,这个也能作为一个性能优化点。

                          state.backend.rocksdb.memory.partitioned-index-filters:true #默认 false

                          参数设定案例

                            sudo -u hdfs $FLINK_HOME/bin/flink run-application -t yarn-application \
                            -Djobmanager.memory.process.size=1024m \
                            -Dtaskmanager.memory.process.size=1024m \
                            -Dtaskmanager.numberOfTaskSlots=1 \
                            -Dparallelism.default=12 \
                            -Dstate.backend.incremental=true \
                            -Dstate.backend.local-recovery=true \
                            -Dstate.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM \
                            -Dstate.backend.rocksdb.block.cache-size=64m \
                            -Dstate.backend.rocksdb.writebuffer.size=128m \
                            -Dstate.backend.rocksdb.compaction.level.max-size-level-base=320m \
                            -Dstate.backend.rocksdb.writebuffer.count=5 \
                            -Dstate.backend.rocksdb.thread.num=4 \
                            -Dstate.backend.rocksdb.writebuffer.number-to-merge=3 \
                            -Dstate.backend.rocksdb.memory.partitioned-index-filters=true \
                            -Dstate.backend.latency-track.keyed-state-enabled=true \
                            -Dyarn.application.name="TestDemo" \
                            /tmp/****-jar-with-dependencies.jar

                            参考链接:

                              https://blog.csdn.net/u013516966/article/details/122660050

                               


                              文章转载自大数据启示录,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                              评论