客户端不是运行时和程序执行的一部分,但它用于准备并发送 dataflow 给Master,然后,客户端断开连接或者维持连接以等待接收计算结果,客户端可以以两种方式运行:要么作为 Java/Scala 程序的一部分被程序触发执行,要么以命令行./bin/flink run 的方式执行。
当一个程序被提交后,系统创建一个 Client来进行预处理,将程序转变成一个并行数据流形式,交给Jobmanagerf和 Taskmanager执行。
JobManager 负责协调 Flink系统,调度task,协调检查点,协调失败时恢复等。
每个Flink程序都包含以下的若干流程:
DataStream → KeyedStream:输入必须是 Tuple 类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
KeyedStream → DataStream:一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。
KeyedStream → DataStream:分组数据流上的滚动聚合操作。min 和 minBy 的区别是 min 返回的是一个最小值,而 minBy 返回的是其字段中包含最小值的元素(同样原理适用于 max 和 maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在 2.3.10 之前的算子都是可以直接作用在 Stream 上的,因为他们不是聚合类型的操作,但是到 2.3.10 后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实, reduce、fold、aggregation 这些聚合算子都是和 Window
配合使用的,只有配合Window,才能得到想要的结果。
流式计算分为有状态和无状态两种情况,所谓状态就是计算过程中的中间值。
对于无状态计算,会独立观察每个独立事件,并根据最后一个事件输出结果。什么意思?大白话举例:对于一个流式系统,接受到一系列的数字,当数字大于N则输出,这时候在此之前的数字的值、和等情况,压根不关心,只和最后这个大于N的数字相关,这就是无状态计算。
什么是有状态计算?想求过去一分钟内所有数字的和或者平均数等,这种需要保存中间结果的情况是有状态的计算。
当分布式计系统中引入状态计算时,就无可避免一致性的问题。为什么了?因为若是计算过程中出现故障,中间数据咋办了?若是不保存,那就只能重新从头计算了,不然怎么保证计算结果的正确性。这就是要求系统具有容错性了。
**Spark Streaming:**将源头数据流分成了微批,吞吐大,但数据延迟增加了。
**流计算技术:**必须对job状态进行管理,确保能从任何情況引起的 Job failurer中恢复,而且确保 exactly once可靠性,这样就会带来性能的开销,增加数据延迟和吞吐量的降低。
Flink:核心分布式数据流和状态快照(即分布式快照,是轻量的)
-
当job由于网络、集群或任何原因失败时,可以快速从这些分布式快照中恢复。
谈到容错性,就没法避免一致性这个概念。所谓一致性就是:成功处理故障并恢复之后得到的结果与没有发生任何故障是得到的结果相比,前者的正确性。换句大白话,就是故障的发生是否影响得到的结果。在流处理过程,一致性分为3个级别[1]:
-
at-most-once:至多一次。故障发生之后,计算结果可能丢失,就是无法保证结果的正确性;
-
at-least-once:至少一次。计算结果可能大于正确值,但绝不会小于正确值,就是计算程序发生故障后可能多算,但是绝不可能少算;
-
exactly-once:精确一次。系统保证发生故障后得到的计算结果的值和正确值一致;
Flink的容错机制保证了exactly-once,也可以选择at-least-once。Flink的容错机制是通过对数据流不停的做快照(snapshot)实现的。针对FLink的容错机制需要注意的是:要完全保证exactly-once,Flink的数据源系统需要有“重放”功能,具体将会在下面进行介绍。
Flink做快照的过程是基于“轻量级异步快照”的算法,其核心思想就是在计算过程中保存中间状态和在数据流中对应的位置,至于如何实现的会后续的博客中会详细说明。这些保存的信息(快照)就相当于是系统的检查点(checkpoint)(类似于window系统发生死机等问题时恢复系统到某个时间点的恢复点),做snapshot也是做一个checkpoint。在系统故障恢复时,系统会从最新的一个checkpoint开始重新计算,对应的数据源也会在对应的位置“重放“。这里的“重放”可能会导致数据的二次输出,这点的处理也在后续的博客中说明。
在Flink做分布式快照过程中核心的是Barriers元素的使用。
如果用河水举例的话,Storm就是一滴滴的对数据进行处理,SparkStreaming就是一批一批的放水,上批水放完了,才放下批水。而Flink是在水中定期插入barrier,水仍然在流动,只是增加了些barrier。如果源头是多个数据源,那么都同步的增加相同的barrier,同时在job处理的过程中,为了保证job失败的时候可以从错误中恢复,Flink还对barrier进行对齐(align)操作。
这些Barriers是在数据接入到Flink之初就注入到数据流中,并随着数据流向每个算子(operator),这里所说的算子不是指类似map()等具体意义上个的,指在JobGraph中优化后的“顶点”),这里需要说明的有两点:
-
算子对Barriers是免疫的,即Barriers是不参与计算的;
-
Barriers和数据的相对位置是保持不变的,而且Barriers之间是线性递增的;
如下图所示,Barriers将将数据流分成了一个个数据集。值得提醒的是,当barriers流经算子时,会触发与checkpoint相关的行为,保存的barriers的位置和状态(中间计算结果)。
可以打个比方,在河上有个大坝(相当于算子),接上级通知(Flink中的JobManager)要统计水流量等信息,所以有人在上游定期(source
task)放一根木头(barrier)到河中,当第一木头遇到大坝时,大坝就记下通过大坝木头的位置、水流量等相关情况,即做checkpoint(实际生活中不太可能),当第二木头遇到大坝时记下第一个木头和第二根木头之间的水流量等情况,不需要重开始计算。这里先不管故障了,不然就不好解释相同的水的“重放”问题了。
当一个算子有多个数据源时,又如何做checkpoint?
如下图,从左往右一共4副图。对于Operator接受多个数据流的情况,需要对数据流做排列对齐。
当算子收到其中一个数据源的barriers,而未收到另一个数据源的barriers时(如左1图),会将先到barriers的数据源中的数据先缓冲起来,暂停处理,等待另一个barriers(如左2图),当收到两个barriers(如左3图)即接收到全部数据源的barrier时,会做checkpoint,保存barriers位置和状态,发射缓冲中的数据,释放一个对应的barriers。这里需要注意是,当缓存中数据没有被发射完时,是不会处理后续数据的,这样是为了保证数据的有序性。
),所以会导致一个问题:当一个checkpoint所需时间远大于两次checkpoint之间的时间间隔时,就很有可能会导致后续的checkpoint会失败,若是这样情况比较严重时会导致任务失败,这样Flink系统的容错性的优势就等不到保证了,所以需要合理设计checkpoint间隔时间。
如下图所示,在一次snapshot中,算子会在接受到其数据源的所有barriers的以后snapshot它们的状态,然后在发射barriers到输出流中,直到最后所有的sink算子都完成snapshot才算完成一次snapshot。
其中,在准备发射的barriers形成之前,state 形式是可以改变的,之后就不可以了。state的存贮方式是可以配置的,如HDFS,默认是在JobManager的内存中。
这里要注意,Snapshot并不仅仅是对流做了一个状态CheckPoint,它也包含了一个Operator内部持有的状态,保证流处理系统失败时能够正确地恢复数据流处理,状态包括两种:
-
系统状态:Operator进行计算需要有缓存,所以缓冲区的状态是与Operator相关联的。eg:窗口操作,系统会手机和聚合记录数据并放到缓冲区中,直到该缓冲区中的数据被处理完成。、
上述描述中,需要等待算子接收到所有barriers后,开始做snapshot,存储对应的状态后,再进行下一次snapshot,其状态的存储是同步的,这样可能会造成因snapshot引起较大延时。可以让算子在存储快照时继续处理数据,让快照存储异步在后台运行。为此,算子必须能生成一个 state 对象,保证后续状态的修改不会改变这个
copy-on-write(写时复制)类型的数据结构,即异步状态快照。对异步状态快照,其可以让算子接受到barriers后开始在后台异步拷贝其状态,而不必等待所有的barriers的到来。一旦后台的拷贝完成,将会通知JobManager。只有当所有的sink接收到这个barriers,和所有的有状态的算子都确认完成状态的备份时,一次snapshot才算完成。如何实现的,这点后续博客将仔细分析。
数据打入kafka,然后flink消费kafka的数据,处理后直接提供给线上存储。
在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示:
**Event Time(事件时间):**是事件发生或者被创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
**Processing Time(处理时间):**是每一个执行基于时间操作的算子(operator)的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。
按时间顺序,通常事件时间早于采集时间,采集时间早于处理时间。
对于业务来说,要统计 1min 内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。
streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
-
**CountWindow:**按照指定的数据条数生成一个 Window,与时间无关。(事件窗口?)
将数据依据固定的窗口长度对数据进行切片。
**特点:**时间对齐,窗口长度固定,没有重叠。
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口,窗口的创建如下图所示:
**适用场景:**适合做 BI 统计等(做每个时间段的聚合计算)。
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
**特点:**时间对齐,窗口长度固定,有重叠。
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示:
**适用场景:**对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。
由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session
间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。
CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。
默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 5 个元素。
(每当有两个key一样,计算window内5个key,接着再往下计算,发现有和之前计算key一样的且能够构成对的再次计算)
TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。
Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中。
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。下面代码中的 sliding_size 设置为了 2s,也就是说,窗口每 2s 就计算一次,每一次计算的 window 范围是 5s 内的所有元素。
如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所示:
我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指 Flink接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的。
那么此时出现一个问题,一旦出现乱序,如果只根据 eventTime 决定 window 的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发 window 去进行计算了,这个特别的机制,就是 Watermark。Watermark 是一种衡量 Event Time
进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的
Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。
当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 Watermark
是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
注意,窗口是左闭右开的,形式为:[window_start_time,window_end_time)。Window 的设定无关数据本身,而是系统定义好了的,也就是说,Window 会一直按照指定的时间间隔进行划分,不论这个 Window 中有没有数据,EventTime 在这个 Window 期间的数据会进入这个 Window。
Window 会不断产生,属于这个 Window 范围的数据会被不断加入到 Window 中,所有未被触发的 Window 都会等待触发,只要 Window 还没触发,属于这个 Window范围的数据就会一直被加入到 Window 中,直到 Window 被触发才会停止数据的追加,而当 Window 触发之后才接受到的属于被触发 Window 的数据会被丢弃。
Window 会在以下的条件满足时被触发执行:
结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。
相邻两次数据的 EventTime 的时间差超过指定的时间间隔就会触发执行。如果加入 Watermark,那么当触发执行时,所有满足时间间隔而还没有触发的 Window 会同时触发执行。
Flink 是一个真正意义上的流计算引擎,在满足低延迟和低容错开销的基础之上,完美的解决了exactly-once 的目标,真是由于Flink 具有诸多优点,越来越多的企业开始使用Flink作为流处理框架,逐步替换掉了原本的 Storm 和 Spark 技术框架。