
jobmanager定期发起checkpoint,向source task发送触发标记(设置的时间)
—当sourcetask收到这个标记后会在数据流中安装挡板,同时自己会进行checkpoint,会将挡板向下游传递(防止分组前后的checkpoint点不一样)
—下游task收到挡板后进行checkpoint
—当所有的task都处理同一次checkpoint后,一次checkpoint就完成了
—删除旧了的checkpoint,只保留最新一次
3、代码实现org.apache.flink flink-statebackend-rocksdb_2.111.11.2
开启checkpoint
—任务失败后重新启动需要指定从哪一个checkpoint的路径中恢复任务 hdfs:///master:9000/flink/checkpoint/183606dcf5bcfde823b5a495ca435a04/chk-17
—命令行加一个-s恢复一样 flink run -c com.shujia.state.Demo4Checkpoint -s hdfs://master:9000/flink/checkpoint/8ae8e8bb237063d1c90295b84ae32a17/chk-31
flink-1.0.jar
object Demo04checkpoint {
def main(args: Array[String]): Unit = {
//创建flink环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(10000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
val config: CheckpointConfig = env.getCheckpointConfig
//任务失败后自动保留最新的checkpoint文件
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置状态后端,保存状态的位置
//val stateBackend: StateBackend = new FsStateBackend("hdfs://master:9000/flink/checkpoint", true)
//增量快照
val stateBackend: StateBackend = new RocksDBStateBackend("hdfs://master:9000/flink/checkpoint", true)
env.setStateBackend(stateBackend)
//获取数据
val lineDS: DataStream[String] = env.socketTextStream("master", 8888)
//将单词切分
val wordsDS = lineDS.flatMap(_.split(","))
//将数据存储kv
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
//将数据分组
kvDS.keyBy(_._1)
.sum(1)
.print()
env.execute()
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)