Checkpoint概念
Checkpoint 使 Flink 的状态具有良好的容错性,通过 checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
官方介绍地址
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/ops/state/checkpoints/
task失败or Job失败
Job失败是指整个任务终止,
task级别失败是指task中有失败的task,但是整个任务没有失败。checkpoint是task级别的失败中恢复。
保留 Checkpoint
(1)默认情况下,恢复失败的作业,是不保存的。当程序取消,checkpoint就会消失。
(2)可以通过配置来保留checkpoint
(3)配置方式1
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
(4)配置方式2
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
Flink中使用checkpoint实现task级别的重启
(1)开启checkpoint
(2)指定存储路径
(3)指定恢复策略
checkpoint-demo
为了测试重启,需要在程序中创建一个异常。
程序
object OperatorStateDemo {def main(args: Array[String]): Unit = {// set up the streaming execution environmentval env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//开启状态检查checkpoint,参数:快照的周期,快照的模式env.enableCheckpointing(1000)//存储env.getCheckpointConfig.setCheckpointStorage(new URI("file:///d:/checkpointdata/"))//配置重启策略,最多重启3次,每过1秒重启一次env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000))// ingest sensor streamval sensorData = env.socketTextStream("localhost", 9999)val keyedSensorData = sensorDataval alerts: DataStream[String] = keyedSensorData.map( new OperatorStateMapFunction())// print result stream to standard outalerts.print()// execute applicationenv.execute("Generate Temperature Alerts")}}class OperatorStateMapFunction extends MapFunction[String,String] with CheckpointedFunction{var listState:ListState[String] = _//map的处理逻辑override def map(value: String): String = {//在程序中抛出异常if(value.equals("y") && RandomUtils.nextInt(1,18)% 3 == 0){throw new Exception("出错了" + value)}//将本条数据插入到list中listState.add(value)val strings = listState.get()val sb:StringBuilder = new StringBuilderstrings.forEach((x:String) => sb.append(x))sb.toString()}//系统做快照的时候,调用的方法,用户可以在持久化前,对状态数据进行处理,一般不需要处理override def snapshotState(context: FunctionSnapshotContext): Unit = {//自动进行快照// println("快照存储触发,第几次checkpoint" + context.getCheckpointId)}//算子在启动的开始的时候,会调用,进行初始化的时候调用,获取状态管理器override def initializeState(context: FunctionInitializationContext): Unit = {//算子状态的存储器val operatorStateStore = context.getOperatorStateStore()//当用户重启的时候,会进行加载持久化中的数据listState = operatorStateStore.getListState(new ListStateDescriptor[String]("strings", classOf[String]))}}
总结
(1)checkpoint是为了解决task失败后,但是整个job还在运行,从失败的地方恢复任务,继续进行计算。
(2)checkpoint保留需要自己设置
(3)checkpoint的demo
奇迹的出现往往就在再坚持一下的时候!
感谢阅读。期待点赞、分享、关注。
文章转载自beenrun,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




