Savepoint:Flink让时光倒流

现在互联网产品对数据的实时性要求极其强烈,比如,某电商产品的推荐系统,当一个用户点击页面就会在秒级内给出相应的推荐页面。进而,实时流处理技术讨论变得越加频繁《各大主流流处理框架大比拼》和《实时流处理框架选型:就应该这样“拉出来遛遛”》,比如,延迟性、吞吐量、watermark…

接下来,进入主题:Flink实时流处理中的“reprocess data”。

相信很多同行经常遇到以下几种case:

  • 开发新feature或者bug修复,程序新版本上线;
  • 不同版本产品的A/B test;
  • 评估和实现在新处理框架下的应用迁移,或者迁移到不同的集群

以上所有情况都可以使用Flink的savepoint功能实现。

Savepoint是什么

简而言之,Flink的savepoint是一个全局的、一致性的快照(snapshot)。其包含两方面:

  • 数据源所有数据的位置;
  • 并行操作的状态

“全局一致”是指所有的输入源数据在指定的位置,所有的并行操作的状态都被完全checkpoint了。注意理解这句话,可以多读几遍回味一下。

如果你的应用在过去某个时间点做了savepoint,那你随时可以从前面的savepoint更新发布应用。这时,新的应用会从savepoint中的操作状态进行初始化,并从savepoint的数据源位置开始重新处理所有数据。

Flink的savepoint是完全不依赖的,所以你每个应用可以有N个savepoint,你可以回退到多个位置重新开始你的应用(可以是不同的应用,如下图所示)。这个功能在流处理应用是相当强大的。

image

有读者会觉得上图似曾相识,其实你可能想到了Flink的checkpoint,这时是不是有点糊涂了,那savepoint和checkpoint到底啥关系呢?详细答案会在后续某篇文章揭晓,这里先简单说下:checkpoint是Flink实现容错的,savepoint仅仅只是checkpoint的一个扩展。如果checkpoint开启,那Flink会周期性的创建所有操作状态的checkpoint。savepoint和checkpoint最大的不同是,checkpoint会按时间间隔自动创建,而savepoint需要手动触发。

为了让 “reprocess data”得到更精确的结果,那我们不得不提event-time和processing-time或者ingestion-time的区别,这也是在各个流处理技术里常提到的时间语义。不过这里先不展开,后续也会有文章专门讲到。为了让 “reprocess data”得到更精确的结果需要使用event-time,因为依赖processing-time或者ingestion-time的应用会根据当前的wall-clock时间来处理。

如何实现savepoint

实际上,使用savepoint的前提有以下几点:

  • 开始checkpoint;
  • 可重复使用的数据源(e.g., Apache Kafka,Amazon Kinesis,或者文件系统);
  • 所有保存的状态需继承Flink的管理状态接口;
  • 合适的state backend配置

做到了这几点,那你可以通过CLI命令行实现savepoint并重新从savepoint开始应用:

  1. 创建savepoint

首先,获取Flink所有正在运行的job list:

1
2
3
user$ flink list
------------Running/Restarting Jobs------------
12.04.2016 16:20:33 : job_id : 12345678 (RUNNING)

接着,使用刚才获取到的job ID创建savepoint:

1
user$ flink savepoint job_id

这时你可以选择取消正在运行的job(可选操作):

1
user$ flink cancel job_id
  1. 从savepoint开启job
1
user$ flink run -d -s hdfs://savepoints/1 directory/your-updated-application.jar
如果更新应用,该咋办?

修改的应用从一个savepoint开始需要考虑以下两种情况:

  • 用户自定义逻辑的改变,比如,MapFunction;
  • 应用的拓扑的改变,比如,增加或者移除操作

如果你的情况属于上面描述的第一类,那不需要做其他额外处理。但是,第二种情况,Flink要求修改前后的操作要能匹配上,这样才好使用保存的操作状态。这时你需要手动在原始和更新的应用中分配操作ID,因为没有操作ID是不可能改变应用的拓扑,所以最好要尽可能的分配操作ID,如下:

1
2
3
4
5
6
7
8
9
10
11
DataStream stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid(“source-id”)
.shuffle()
// The stateful mapper with ID
.map(new StatefulMapper())
.uid(“mapper-id”)
// Stateless sink (no specific ID required)
stream.print()
总结

Savepoint是Flink与其它流处理技术的独特之处,要好好的利用起来。

不过Flink的savepoint使用也有诸多限制,后续有机会再讲到,但相对于Spark Streaming的checkpoint来说还是高级了不少。

PS:虽然Spark项目的star数比Flink多一个数量级,但Flink在某些feature上的开发和布局比Spark更快,感觉Flink开发者在最近代表着实时流处理和离线大数据技术的方向,看好Flink

参考:
[1] http://data-artisans.com/turning-back-time-savepoints

Enjoy!


侠天,专注于大数据、机器学习和数学相关的内容,并有个人公众号:bigdata_ny分享相关技术文章。

若发现以上文章有任何不妥,请联系我。

image