暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink中使用checkpoint实现task级别的重启(36)

beenrun 2022-10-05
1598

Checkpoint概念

Checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。

官方介绍地址
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/ops/state/checkpoints/

task失败or Job失败

Job失败是指整个任务终止,
task级别失败是指task中有失败的task,但是整个任务没有失败。checkpoint是task级别的失败中恢复。

保留 Checkpoint

(1)默认情况下,恢复失败的作业,是不保存的。当程序取消,checkpoint就会消失。
(2)可以通过配置来保留checkpoint
(3)配置方式1

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。

(4)配置方式2

ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。

Flink中使用checkpoint实现task级别的重启

(1)开启checkpoint
(2)指定存储路径
(3)指定恢复策略

checkpoint-demo

为了测试重启,需要在程序中创建一个异常。
程序

    object OperatorStateDemo {
    def main(args: Array[String]): Unit = {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)


    //开启状态检查checkpoint,参数:快照的周期,快照的模式
    env.enableCheckpointing(1000)
    //存储
    env.getCheckpointConfig.setCheckpointStorage(new URI("file:///d:/checkpointdata/"))


    //配置重启策略,最多重启3次,每过1秒重启一次
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000))
    // ingest sensor stream
    val sensorData = env.socketTextStream("localhost", 9999)


    val keyedSensorData = sensorData


    val alerts: DataStream[String] = keyedSensorData
    .map( new OperatorStateMapFunction())


    // print result stream to standard out
    alerts.print()


    // execute application
    env.execute("Generate Temperature Alerts")
    }


    }


    class OperatorStateMapFunction extends MapFunction[String,String] with CheckpointedFunction{
    var listState:ListState[String] = _


    //map的处理逻辑
    override def map(value: String): String = {
    //在程序中抛出异常
    if(value.equals("y") && RandomUtils.nextInt(1,18)% 3 == 0){
    throw new Exception("出错了" + value)
    }


    //将本条数据插入到list中
    listState.add(value)
    val strings = listState.get()


    val sb:StringBuilder = new StringBuilder
    strings.forEach((x:String) => sb.append(x))
    sb.toString()
    }
    //系统做快照的时候,调用的方法,用户可以在持久化前,对状态数据进行处理,一般不需要处理
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
    //自动进行快照
    // println("快照存储触发,第几次checkpoint" + context.getCheckpointId)
    }


    //算子在启动的开始的时候,会调用,进行初始化的时候调用,获取状态管理器
    override def initializeState(context: FunctionInitializationContext): Unit = {
    //算子状态的存储器
    val operatorStateStore = context.getOperatorStateStore()
    //当用户重启的时候,会进行加载持久化中的数据
    listState = operatorStateStore.getListState(new ListStateDescriptor[String]("strings", classOf[String]))
    }
    }

    总结

    (1)checkpoint是为了解决task失败后,但是整个job还在运行,从失败的地方恢复任务,继续进行计算。
    (2)checkpoint保留需要自己设置
    (3)checkpoint的demo

    奇迹的出现往往就在再坚持一下的时候!

    感谢阅读。期待点赞、分享、关注。


    文章转载自beenrun,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论