0%

Flink(暂时)

是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算

lambda架构
Lambda架构

  • storm
    • 低延迟 毫秒级
    • 消息保障能力弱,消息传输可能重复但不会丢失
    • 吞吐量低
  • spark streaming
    • 以固定时间间隔(几秒钟)处理一段段的批处理作业(微批)
    • 高延迟(秒级)
    • 能够保证消息传输不会丢失也不会重复
    • 高吞吐
  • flink
    • 支持原生流处理,即数据可以一条一条的进行处理
    • 低延迟 毫秒级
    • 能够保证消息传输不会丢失也不会重复
    • 高吞吐

为什么是flink?1. 低延迟(毫秒级) 2. 高吞吐(每秒千万级) 3. 数据准确性(exactly-once) 4. 易用性(SQL/Table API/DataStream API)

\ spark streaming flink
流处理 数据需要打包成batch,这就会导致有延迟(秒级别),相当于一个伪实时 将数据全部当成流处理
数据模型 RDD,DStream实际上也是一组组小批数据RDD的集合 数据流,以及事件序列
运行时架构 批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个 标准的流执行模式,一个事件在一个节点处理完后才可以直接发往下一个节点进行处理

流处理和批处理的区别?比如同样是max(温度),spark streaming返回的是当前这一批有界数据的最大温度,而flink会返回从程序开始一直到此刻最大的温度 => 批是一种有界数据的概念,而流数据的话无界,最大最小值的话会考虑从头开始到现在的整个数据。同样,flink可以通过window api开窗,来做批处理

SQL/Table API(dynamic tables)
DataStream API(streams, windows)
ProcessFunction(events, state, time)

DataFlow 模型

  • 数据从上一个 Operation 节点直接 Push 到下一个 Operation 节点。
  • 各节点可以分布在不同的 Task 线程中运行,数据在 Operation 之间传递。
  • 同样具有 Shuffle 过程,但是数据不像 MapReduce 模型,Reduce 从 Map 端拉取数据,而是由上游把数据推给下游。
  • 实现框架有 Apache Storm 和 Apache Flink。

watermark && window && allowedLatest

  • watermark:由eventTime - 允许数据乱序的时间M秒得到,只增不减,即只有当当前数据对应的watermark大于之前的,才会更新watermark
  • window:每N秒一段时间间隔
  • allowedLatest:设置窗口销毁延迟时间,及到时间了,但是还允许一定时间内的数据迟到

形象化理解为 水位线上涨,淹没了一个窗口,就会触发窗口的计算
多个并行度的话,算子会取最小的那个水位线来生效

keyby(int… fields) // 0 代表第一个元素
keyby(String… fields) // 声明为public的字段名或类的get方法,主要是方便了datastream嵌套复合类型比如tuple或者pojo类的时候
keyby(new KeySelector<T,K>()) // 覆写getKey方法,自定义返回key分组

除了env可以设置并行度,每一个算子也都可以单独设置并行度,包括 print().setParallelism()。默认并行度是cpu核数。一般来说,一个流的并行度,可以认为是其所有算子的并行度里最大的那个并行度。

一个流所需要的slot数量不一定就是流中所有算子的并行度的加和。事实上,flink 允许先后操作的算子放入到同一个slot里面(子任务共享slot),这样可以减少数据的shuffle;在共享slot的情况下,可能会出现一个slot实现了source、transformation到sink的所有操作,即保留了整个流的pipeline过程,这个在 flink-yarn.xml 里有注释说是允许的。

flink运行时组件:

  • jobmanager
    • 拿到客户端提交的jar包,这个jar包包括:作业图(jobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库等等;把jobgraph转换成一个物理层面的数据流图–执行图(executionGraph),包含了所有可以并发执行的任务;向rm申请slot资源,并分发到taskmanager上运行
    • 还负责维护类似检查点(checkpoint)这样的操作
  • taskmanager
    • taskmanager数量 * 每个taskmanager下面对应的slot数量 = 整个集群最大的能够运行任务数量
    • 在内存中划分出一部分,称为slot,用来运行task。宏观来理解的话,可以把taskmanager想成是一个jvm进程,每一个slot是运行在上面的线程,只是对内存进行隔离,每个slot有自己的内存资源
  • resourcemanager
    • 主要是管理slot资源,每个taskmanager下面的slot都会在rm里进行注册
  • dispatcher
    • 提供rest接口,提交app的时候会自动启动,把app交给jobmanager;
    • webUI界面。并不是必需的

flink执行图可以分为四层,或者四个过程,或者四层模型

  • streamgraph:用户通过stream api编写的代码生成的最初的图,包含source、transformation、sink的一个拓扑结构
  • jobgraph:客户端在提交作业到jobmanager之前,会自动根据streamgraph进行合并优化,将符合条件的多个任务合并在一起作为一个任务,减少数据传输
  • executiongraph:jobmanager根据jobgraph来生成executiongraph,将任务拆成可并行化的过程,交给taskmanager;调度层最核心的数据结构
  • 物理执行图:taskmanager上部署task后形成的图,并不是一个具体的数据结构

数据分发策略
分发器继承图

ChannelSelector 是一个接口,为输入的数据提供一个逻辑的channel
StreamPartitioner 是所有分区器的基类

  • GlobalPartitioner:将所有的数据都发送到下游 0 号分区中
  • ShufflePartitioner:将数据随机分发到一个分区
  • RebalancePartitioner:将数据循环分发到下游
  • RescalePartitioner:基于上下游并行度分发,下游2个上游四个,那么上游的2个会分发到同一个下游分区
  • BroadcastPartitioner:广播,将数据分发到下游的所有分区
  • ForwardPartitioner:上下游分区是一一对应的
  • KeyGroupStreamPartitioner:keyby操作,相同key分发到同一个下游分区
  • CustomPartitionerWrapper:自定义分区策略

数据传输策略

  • forward strategy:一个task的输出只发送给一个task作为输入,如果两个task在同一个jvm里,可以避免网络传输
  • key based strategy:数据按key分组,相同的key数据传输到同一个task处理
  • broadcast strategy:广播
  • random strategy:数据从一个task随机传输给下一个算子的task

数据传输形式

  • one-to-one(forwarding)
    • map、filter、flatmap等算子都是one-to-one的关系,即可以直接在同一个slot上执行计算
    • 类似于spark的窄依赖
  • redistributing
    • stream的分区发生改变。每一个算子的子任务根据所选择的transformation然后发送数据到不同的分区
    • 比如keyby是基于hashcode重分区,而broadcast和rebalance会随机重新分区
    • 类似于spark的宽依赖,shuffle过程

什么样的任务可以被合并在一起?相同并行度的one-to-one操作,满足这两个条件,flink会把相邻的算子合并在一起,放在同一个slot里面进行计算,减少网络传输。通过在算子后面调用 .filter(..).disableChain() 可以断开合并的任务链,.filter(..).startNewChain() 开始合并新的任务链。

针对某一个算子过程,可能会很复杂或者有特殊需求,需要单独放在一个slot里运行?在算子后面调用 slotSharingGroup("key"),表示从当前算子开始,之后的所有操作都会放在一个slot里面,通过 key 来区分多个slot共享组。默认slot共享组的key是”default”。

DataStream API

  • source
    • env.addSource()
  • transformation
    • map
    • flatmap
    • filter
    • keyby
      • 同一个key肯定在同一个分区,但是同一个分区不一定只有一个key,2个key的数据经过hash可能被分到同一个分区里
    • 滚动聚合算子(rollingAggregation)
      • 针对KeyedStream每一个支流做聚合
      • 以min和minby为例,如果是min()只会返回指定字段的最小值,如果是minby则会返回指定字段的最小值对应的那一整个对象
      • 聚合算子(sum、max、min、maxBy、minBy)底层实现是调用 keyedStream.aggregate() 方法,只是创建的ComparableAggregator的AggregationType不一样,分别是SUM, MAX, MIN, MAXBY, MINBY。ComparableAggregator类继承了AggregationFunction,而AggregationFunction则实现了ReduceFunction接口,所以ComparableAggregator类实现了reduce方法,首先是通过Comparator来比较两个对象,然后会判断byAggregate是否为真,即是否是minby或maxby操作,如果是的话,再判断isfirst是否为真,即当出现多个同样值的时候,是返回第一个还是返回最后一个
    • reduce
      • 自定义 reduce() 方法需要继承 ReduceFunction 类
    • split和select
      • split 将 DataStream 会转换成 SplitStream,select 从一个splitStream里通过tag来获取一个或多个DataStream
      • 被遗弃,使用 sideoutput 替代
    • connect和map
      • connect 将两个DataStream合并为一个ConnectedStreams。此时数据只是放在了一个流里,数据本身和形式并不发生任何的变化
      • map、flatmap、keyby等算子实现的function,会单独作用于每一个datastream
    • union
  • sink
    • kafka
      • 初始化FlinkKafkaProducer的时候有三个构造函数,不加kafkaProducersPoolSize、加kafkaProducersPoolSize和一个带着自定义分区的函数。一般用前两个就行,如果说业务数据需要根据某种条件将数据写入到N多个topic中,可以用第三个,实现KeyedSerializationSchema类getTargetTopic()方法,参考地址
      • FlinkKafkaConsumer 消费过程:
        父类 FlinkKafkaConsumerBase
        1.initializeState():从最后一个成功的checkpoint中获取各个partition的offset到restoredState中。
        2.open():从restoredState中获取这个subTask所消费的topic的partition的起始offset,保存到subscribedPartitionsToStartOffsets中;如果这是一个第一次向topic消费的job的subTask,那么Flink根据job的并行度以及这个subTask的index均匀的分配partition给这个subTask消费。此时,partition的起始offset就由我们在上文中介绍的配置来决定。
        3.run(): 如果subscribedPartitionsToStartOffsets不为空,创建KafkaFetcher,执行其runFetchLoop()。
    • redis

窗口和水位线的参考网址1
窗口和水位线的参考网址2

Window类型

  • 时间窗口
    • 滚动时间窗口 Tumbling Window
      • 参数只有一个 window size,没有重叠;区间范围是左闭右开
    • 滑动时间窗口 Sliding Window
      • 参数有两个 window size 和 slide step,可以有重叠
    • 会话窗口
      • 设置一个timeout时间,如果一段时间没有接收到新的数据,就会生成一个新的窗口
  • 计数窗口
    • 滚动计数窗口
    • 滑动计数窗口

Window API

keyby()之后调用.window()方法,或者dataStream.windowAll()。一般是前者。也可以直接调用 .timeWindow(),传一个参数就是滚动事件窗口,传两个参数就是滑动时间窗口。如果要用到offset,那就只能用window()。

.countWindow() 底层调用的是 GlobalWindows 方法,全局窗口是把所有数据都放在一个窗口里,需要设置 trigger 触发器和 evictor 移除器,来保证窗口是什么时候触发什么情况下移除数据

org.apache.flink.streaming.api.windowing.assigners 包下面有各个窗口分配器的类,比如 TumblingEventTimeWindows、TumblingProcessingTimeWindows 等,在 window() 方法中,需要指定窗口分配器,比如 window(TumblingProcessingTimeWindows.of(windowSize, offset)),windowSize 就是窗口大小,offset是指和整点的偏移量,比如8点05到9点05,那就是偏移5分钟,Time.minutes(5)。offset的主要作用是时区

窗口的意义:把无限的数据流进行切分,得到有限的数据集进行处理
窗口函数 WindowedStream 函数

  • 增量聚合函数
    • 每条数据到来都会进行计算,保持一个简单的状态
    • ReduceFunction, AggregateFunction
  • 全量窗口函数
    -先把窗口所有数据收集起来,等到计算的时候再遍历所有的数据
    • ProcessWindowFunction

窗口相关的其他可选API

  • trigger(): 触发器,定义window什么时候关闭,触发计算并输出结果
  • evictor(): 移除器,定义移除某些数据的逻辑
  • allowedLateness(): 允许一定时间内迟到的数据也划分在上一个窗口里进行计算,这个时间是以watermark为准,不是eventTime
  • sideOutPutLateData(): 将迟到的数据放入侧输出流
  • getSideOutPut(): 获取侧输出流,在所有计算完之后,dataStream.getSideOutPut(tag).print() 输出

Window API总览

时间语义

  • eventTime: 事件创建的时间
  • ingestionTime: 数据进入Flink的时间
  • processTime: 执行操作算子的本地系统时间,与机器相关

使用 eventTime

  1. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  2. 然后和watermark一起搭配使用,来处理乱序数据

waterMark 的意义:解决乱序问题
waterMark 的传递:上游向下游传递是通过广播传递给它分区的所有下游,而下游会保存所有上游的watermark然后取最小的那个来计算。
watermark的传递

watermark 的引用

1
2
3
4
5
.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks) // 隔一段时间,周期性生成waterMark,这个周期时间在env.setStreamTimeCharacteristic() 默认值是200毫秒,也可以自定义设置
// eg:
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(maxOutOfOrderness)) 去重写extractTimestamp()方法,提取eventTime,在BoundedOutOfOrdernessTimestampExtractor类下getCurrentWatermark()方法里,会先通过eventTime-maxOutOfOrderness得到一个时间戳,会和已有的watermark比较,取最大值。maxOutOfOrderness指的是一个窗口延迟时间,maxOutOfOrderness设置的太大,窗口计算的结果就太慢,太小的话计算的结果准确性就下降了

.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks) // 每来一条数据都会生成一个watermark

窗口起始时间的确定
以 TumblingEventTimeWindows 为例,有一个 assignWindows() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
// offset 默认是 0
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
// offset默认是0,+windowSize再取余,相当于没加,所以相当于 timestamp - timestam%windowSize
// 相当于取了一个windowSize的整数倍
}

flink 有状态的数据流

状态
比如聚合计算的一些结果,需要保存下来,这个就算是任务的状态。可以认为是一个本地变量,flink会进行状态管理,包括状态一致性、故障处理以及高效存储和访问

  • 算子状态 Operatior State
    • 作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态,比如最小值,同一个task下的所有数据都可以访问到,但是不同的task访问不到,因为不在同一个内存下
    • 算子状态的数据结构
      • 列表状态 list state: 将状态表示为一组数据的列表
      • 联合列表状态 union list state
      • 广播状态 broadcast state
  • 键控状态 keyed State
    • 同一个分区下,可能有不同的key,针对这些key,会保存每一个key自己的一个状态实例
    • 键控状态的数据结构
      • 值状态
      • 列表状态
      • 映射状态
      • 聚合状态
  • 状态后端 State Backends
    • 主要负责本地的状态管理,以及将检查点状态写入远程存储
    • 主要有3种
      • MemoryStateBackend
        • 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在taskmanager的jvm堆上,而将checkpoint存储在jobmanager中
        • 有点:开发测试方便
        • 缺点:但只能保存数据量小的状态;状态数据有可能丢失
      • FsStateBackend
        • 将checkpoint存到远程的持久化文件系统,而对于本地状态,跟MemoryStateBackend一样,也会存在taskmanager的jvm堆上
        • 特点:同时拥有内存级的本地访问速度,和更好的容错保证。但是如果系统特别庞大,taskmanager堆上的内存(默认是5M)都不足以存储下,就有第三种状态后端
        1
        2
        // 状态后端
        // env.setStateBackend(new FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots));
      • RocksDBStateBackend
        • 类似于key-value数据存储,将所有状态序列化后,存入本地的RocksDB中存储。相当于落入磁盘,不会丢数,但是会影响速度
        • 需要再引入 flink-statebackend-rocksdb 依赖
        • 优点:可以存储超大量的状态信息;状态信息不会丢失
        • 缺点:状态访问速度有所下降

从运行时上下文拿到状态 => 需要在 richFunction 里面去拿到上下文

比如温度监控,同一个传感器如果这一次的温度和上一次的温度相差十度,就预警。需要把上一条数据的温度不断更新在状态里,然后进行比较。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
dataStream.keyBy().flatmap(new TempAlert(温度阈值))

class TempAlert extends RichFlatMapFunction{
private int threshold = 0;
private ValueState lastTempValueState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));
public TempAlert(int i) {
this.threshold = i;
}

@Override
public void flatMap(Object value, Collector out) throws Exception {
// 先获取上一条数据的温度值
double lastTemp = (double) lastTempValueState.value();
if(Math.abs(value.getTemp() - lastTemp) >= threshold){
out.collect("预警信息");
}

// 更新温度值
lastTempValueState.update(value.getTemp());
}
}

ProcessFunction
本身也继承了 AbstractRichFunction, 即实现了 RichFunction,即加强版的富函数,可以拿到各种上下文、变量、状态,还可以实现分流输出的效果

context.timeService().registerEventTimeTimer(long time) // 注册一个定时器,然后重写 onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)方法; 可以注册多个定时器,只要参数time不一样,就是不一样的定时器,但是实现的话都是 onTimer() 方法,通过判断timestamp(onTimer()方法被激活时的时间戳)的不一样,来执行不同的操作。类似于在同一个闹钟app设置定时,根据时间来区分,激活方法都是一样的,只是去判断当前timestamp是哪一个,来执行不同的操作

比如连续N秒钟,温度一直上升,就发送预警信息。如果用滚动窗口或者滑动窗口的话,有一定问题:第一个窗口前1秒是下降,后面的N-1秒是上升;第二个窗口,前面N-1秒是上升,最后1秒是下降。这样两个窗口都不会报警,但是实际上是应该报警的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
dataStream.keyBy().process(new TempWarning(连续时长))

class TempAlert2 extends KeyedProcessFunction {
// 保存上一个温度值进行比较
private ValueState lastTempValueState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));

// 保存上一个注册定时器的时间戳,用于删除
private ValueState timerTsValueState = getRuntimeContext().getState(new ValueStateDescriptor("timerTs", Long.class));


private int timeSpan = 0;
public TempAlert2(int i) {
this.timeSpan = i;
}

@Override
public void processElement(Object value, Context ctx, Collector out) throws Exception {
// 取出状态
double lastTemp = (double) lastTempValueState.value();
long timerTs = (long) timerTsValueState.value();

// 更新上一次的温度值
lastTempValueState.update(lastTemp);

// 温度上升并且没有注册过定时器,那就注册一个以当前时间戳开始的一个定时器
// timerTs 默认值是0,所以等于0的时候说明是第一次进行判断
if(value.getTemp() > lastTemp && timerTs == 0){
// 按照当前处理时间 + timeSpan 作为时间戳来注册定时器,也可以按照eventTime来设置时间戳
long ts = ctx.timerService().currentProcessingTime() + this.timeSpan*1000;
ctx.timerService().registerProcessingTimeTimer(ts);
timerTsValueState.update(ts);

}else if(value.getTemp() < lastTemp){
// 如果温度下降,那么需要删除定时器
ctx.timerService().deleteProcessingTimeTimer(timerTs);
timerTsValueState.getClass();
}
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
out.collect("传感器 " + ctx.getCurrentKey() + " 的温度在 " + this.timeSpan/1000 + "秒内连续上升");
// 清空这一次的定时器,至于温度状态是否清空,如果清空了,就相当于再重新走一遍流程
timerTsValueState.clear();
}
}

实现分流输出的时候,通过 ctx.output(OutputTag<X> outputTag, X value); 方法来实现

容错机制

一致性检查点 checkpoints: 在某个时间点对所有任务的状态进行一次快照(一个任务的状态可能很快就可以被更新,但是所有任务都做完,所有状态合并出来的快照的时间可能就会比较慢);这个时间点应该是所有任务都恰好处理完一个相同的输入数据的时候,比如针对偏移量5做快照,进行了keyby分区,那么应该保存所有分区在执行完偏移量5的数据之后的那个状态。当恢复的时候是恢复最近一次成功保存的检查点,然后会重新提交偏移量,这个就提供了 exactly-once 的一致性保证

类似于 jvm 的 safepoint

检查点的实现算法

  • 一种简单的想法
    • 暂停应用,保存状态到检查点,再重新恢复应用
  • flink 的改进和实现
    • 基于 Chandy-Lamport 算法的分布式快照
    • 将检查点的保存和数据处理分离开,不暂停整个应用,哪一个分区做完了就先做一个合照,等所有分区都做了合照之后再拼起来就可以了

检查点屏障 checkpoint barrier: 类似于watermark,在处理数据的时候会打上一个barrier,就可以把一条流上的数据按照不同的检查点分开。当前 barrier 前面到来的数据导致的状态更改,都会包含在当前 barrier 所属的检查点中;当前 barrier 后面到来的数据导致的状态更改,都会包含在后面的检查点

jobmanager 会发送一个命令,告诉source,然后source会在数据里插入一个barrier,当task执行到对应的数据时,就知道这里需要做一次检查点保存。

barrier 对齐: 类似于watermark会从上游广播到所有的下游,而下游会分区来保存所有的watermark,然后取最小值来计算。barrier也会从上游广播到所有的下游,对于下游来说,如果其中一个流source1的barrier先到,意味着这个流的数据已经计算完了,而其他流的barrier还没到,此时source1的数据如果继续到的话,会先缓存起来,要等其他流的barrier到,计算完了之后保存好状态,再从缓存的数据开始陆续计算。

如果barrier不对齐,其中快的流的数据不断计算,状态就会不断更新,慢的流快照保存,再次恢复上一次成功检查点快照的时候,快的流那边就会重复消费数据,就变成了 At Least Once

前面说的都是 flink 内部的 Exactly-OnceAt Least Once,如果每1分钟快照一次,处理数据之后提交给mysql,在chk-100成功快照一次之后,过了30秒,程序down了,恢复到chk-100时的状态,那么就会有30秒的数据会被重复提交到mysql,也就是说还需要一个端对端精确一次实现

1
2
3
4
5
6
7
8
9
10
// checkpoint 默认时间间隔是500L
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
// checkpoint 超时时间,快的流等慢的流,超过这个时间就作废
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 最大同时checkpoint个数 默认是1
env.getCheckpointConfig().setMaxConcurrentCheckpoints(5);
// 两个checkpoint执行的最小间隔,如果配了这一个,上面的最大同时执行个数就不会生效
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
// 能够允许checkpoint失败的次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);

env.setStateBackend(new FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots)); 在设置状态后端这里,FsStateBackend 还有第二个参数,是否异步快照,即如果为true,当快的流计算完了,会把自己的状态先缓存到文件里,然后继续执行下面的计算,当慢的流计算完了之后再去合并快照

重启策略

1
2
// 重启3次,中间要间隔10秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));

保存点 SavePoint
类似于checkpoint的实现,savepoint是自定义设置的保存功能,需要写出来触发创建操作,会同时保存一些额外的元数据上下文的信息
作用:有计划的手动备份;暂停和重启应用;版本迁移等等
注意:要恢复savepoint的话需要保证计算流的拓扑结构是一样的,也就是算子应该是不变的,最好在算子后面添加 .uid(String),这样在恢复的时候可以更有针对性

状态一致性
每个算子任务都有自己的计算状态,一条数据不应该丢失,也不应该重复计算(重复计算指的是不能叠加计算)

状态一致性分类

  • at-most-once: 任务故障时,什么也不做,丢数就丢数。这样的话会没有快照的开销,速度会变快,准确性会下降。比如直播视频,因为网络问题,丢帧也是可以接受的,udp协议
  • at-least-once: 数据不会丢,数据可能被处理多次,即计算结果可能会进行叠加计算
  • exactly-onde: 数据不会丢,只会处理一次

端到端的 exactly-once

  • 内部保证:checkpoint
  • source:可重设数据的读取位置
  • sink:从故障恢复时,数据不会重复写入到外部系统
    • 幂等写入
    • 事务写入

幂等写入 Idempotent Writes
思路:e的导数还是e。
含义:一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行的操作就不起作用了。
实现:类似于hashmap,数据修改是针对于同一个key的,修改再多次也只相当于是一次。比如redis、mysql提交的时候,按照key来写入数据,那重复写入的话也不会影响到数据的变化,相当于是一次更新
缺点:1->5->10->1->5->10 在第一个10的位置发生故障,导致数据重复发送,产生数据跳变

事务写入 Transactional Writes
思路:事务对应这checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果放入到sink端中
实现:1. 预写日志 2. 两阶段提交

预写日志 Write-Ahead-Log WAL
实现:把结果数据先当成状态保存,然后在收到checkpoint完成的通知后,一次性批量写入sink端。DataStream API提供了一个模板类 GenericWriteAheadSink 来实现
缺点:由于checkpoint不能设置的太小(间隔太小的话,整个流就不断在做快照了,都没时间处理数据了),所以这一批数据到sink端会需要一定的时间,延迟性会比较高;另外从日志中批量写入到sink端时,如果写到一半sink端故障了,恢复的时候针对另一半没写入的日志数据如何处理也是一个问题

两阶段提交 Two-Phase-Commit 2PC
实现:对于每一个checkpoint,sink端会启动一个事务,将所有计算得到的数据都放入到事务里,然后写入到外部系统,但是并不提交,只是预提交(此时如果checkpoint挂了可以回滚事务)。当收到checkpoint完成的通知后,再提交事务,数据就会真正写入。Flink提供了 TwoPhaseCommitSinkFunction 抽象类,eg: kafkaProducer
要求:对外部 sink 端的要求会比较高————需要支持事务;支持预写入;可以回滚;提交事务必须是幂等操作

也许状态很多,需要等所有状态都合并成快照之后才能提交事务。而不是看到下一个barrier的时候就提交。看到新的barrier会继续新的计算,放在新的事务里,当checkpoint完成之后,才会提交上一个事务

sink\source 不可重置 可重置
任意 At-most-once At-least-once(故障恢复时会出现暂时的不一致,数据跳变)
幂等 At-most-once Exactly-once
预写日志 At-most-once At-least-once
两阶段提交 At-most-once Exactly-once

Flink + Kafka 端到端状态一致性的保证

  • 内部:利用checkpoint机制,把状态存盘,发生故障的时候可以恢复
  • source:kafka Consumer 作为source,可以把偏移量保存下来,故障恢复时可以重置偏移量,重新消费数据
  • sink:FlinkKafkaProducer 底层继承了 TwoPhaseCommitSinkFunction 类

综上:

  1. jobmanager在source数据流插入barrier
  2. task看到barrier,就开始保存自己的状态,把数据写入到sink的事务里
  3. sink看到barrier,创建新的事务,当上一个barrier完成的时候,提交上一个事务

Table API 和 Flink SQL

需要引入 flink-table-planner 依赖,会自动引入 bridge 依赖。可以引入 flink-table-planner-blink 版本,比flink-table-planner要更完善一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 创建table环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 从外部链接创建一张表
①tableEnv.connect().createTemporaryTable("myTable1");
// 从已有的数据流生成一张表
②Table dataTable = tableEnv.fromDataStream(dataStream);

// 基于 Table API 查询算子得到一张表 2种形式
①Table resultTable1 = tableEnv.from("myTable1").select().filter()
②Table resultTable1 = dataTable.select("id, temp").filter("id = \"sensor_1\"");

// 基于SQL来查询得到一张表
②tableEnv.createTemporaryView("myTable2", dataTable);
②Table resultTable2 = tableEnv.sqlQuery("select * from myTable2"); // 表名和创建的那个view的名字需要是一样的

①Table resultTable2 = tableEnv.sqlQuery("select * from myTable1"); // 因为前面指定了表名,所以这里直接写就可以了

// 输出最后的结果
DataStream<Row> rowDataStream = tableEnv.toAppendStream(resultTable, Types.ROW(Types.INT, Types.LONG));
rowDataStream.print();