流式框架Flink(二)
Apache Flink是一个用于对无边界和有边界数据流进行有状态计算的框架。
我们来谈谈 Flink 的编程模型与部署。
第一部分:Flink 编程模型
Flink 分层架构
Flink提供不同级别的抽象来开发流/批处理应用程序。
Stateful Stream Processing
- 位于最底层, 只提供有状态流,是 core API 的底层实现。
- 通过 Process Function 嵌入到 DataStream API中。
- 利用低阶,构建一些新的组件(比如:利用其定时做一定情况下的匹配和缓存)。
- 灵活性高,但开发比较复杂。
Core APIs(DataStream API/DataSet API)
在实践中,大多数应用程序不需要上述低级抽象,而是针对Core API 编程,如 DataStream API (有界/无界流)和 DataSet API(有界数据集)。这些流畅的API提供了用于数据处理的通用构建块,例如各种形式的用户指定的转换,连接,聚合,窗口,状态等。在这些 API 中处理的数据类型在相应的编程语言中表示为类。
- DataStream:流处理
- DataSet:批处理
Table & SQL
- SQL 构建在 Table 之上,都需要构建 Table 环境。
- 不同的类型的 Table 构建不同的 Table 环境。
- Table 可以与 DataStream 或者 DataSet 进行相互转换。
- Streaming SQL 不同于存储的 SQL, 最终会转化为流式执行计划。
Flink 构建的流程
- 构建计算环境(决定采用哪种计算执行方式)
- 创建Source(可以多个数据源)
- 对数据进行不同方式的转换(提供了丰富的算子)
- 对结果的数据进行Sink(可以输出到多个地方)
Flink程序的基本构建块是流和转换。(请注意,Flink的 DataSet API 中使用的 DataSet 也是内部流)从概念上讲,流是(可能永无止境的)数据记录流,而转换是将一个或多个流作为一个或多个流的操作。输入,并产生一个或多个输出流。
执行时,Flink程序映射到流数据流,由流和转换运算符组成。每个数据流都以一个或多个源开头,并以一个或多个接收器结束。数据流类似于任意有向无环图 (DAG)。尽管通过迭代结构允许特殊形式的循环 ,但为了简单起见,我们将在大多数情况下对其进行掩饰。
通常,程序中的转换与数据流中的运算符之间存在一对一的对应关系。但是,有时一个转换可能包含多个转换运算符。
并行数据流
Flink中的程序本质上是并行和分布式的。在执行期间,流具有一个或多个流分区,并且每个运算符具有一个或多个运算符子任务。运算符子任务彼此独立,并且可以在不同的线程中执行,并且可能在不同的机器或容器上执行。
运算符子任务的数量是该特定运算符的并行度。流的并行性始终是其生成运算符的并行性。同一程序的不同运算符可能具有不同的并行级别。
流可以以一对一(或转发)模式或以重新分发模式在两个运营商之间传输数据:
- 一对一流(例如,在上图中的Source和map()运算符之间)保留元素的分区和排序。这意味着map()运算符的subtask [1] 将看到与Source运算符的subtask [1]生成的顺序相同的元素。
- 重新分配流(在上面的map()和keyBy / window之间,以及 keyBy / window和Sink之间)重新分配流。每个运算符子任务将数据发送到不同的目标子任务,具体取决于所选的转换。实例是 keyBy() (其通过散列密钥重新分区),广播() ,或重新平衡() (其重新分区随机地)。在重新分配交换中,元素之间的排序仅保留在每对发送和接收子任务中(例如,map()的子任务[1] 和子任务[2]keyBy / window)。因此,在此示例中,保留了每个密钥内的排序,但并行性确实引入了关于不同密钥的聚合结果到达接收器的顺序的非确定性。
window
什么是 window
聚合事件(例如,计数,总和)在流上的工作方式与批处理方式不同。例如,不可能计算流中的所有元素,因为流通常是无限的(无界)。相反,流上的聚合(计数,总和等)由window限定,例如*“在最后5分钟内计数”* 或*“最后100个元素的总和”*。
Windows可以是时间驱动的(例如:每30秒)或数据驱动(例如:每100个元素)。人们通常区分不同类型的窗口,例如翻滚窗口(没有重叠), 滑动窗口(具有重叠)和会话窗口(由不活动间隙打断)。
Window 类型
- Count Window
- Time Window
- 自定义window
Window 聚合日常会遇到的问题(数据过热,延迟数据丢弃, 反压等问题)
Time
当在流程序中引用时间(例如定义窗口)时,可以参考不同的时间概念:
- Event Time:事件时间是创建事件的时间。它通常由事件中的时间戳描述,例如由生产传感器或生产服务附加。Flink通过时间戳分配器访问事件时间戳。
- Ingestion Time:是事件在源操作员处输入Flink数据流的时间。
- Processing Time:是执行基于时间的操作的每个操作员的本地时间。
例如:一条日志进入 Flink 的时间为2017-11-12 10:00:00,123, 到达 window 的系统时间为 2017-11-12 10:00:01,234. 日志的内容如下:
2017-11-02 18:37:15,624 INFO
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider -
Failing over to rm2
State
虽然数据流中的许多操作只是一次查看一个单独的事件(例如事件解析器),但某些操作会记住多个事件(例如窗口操作符)的信息。这些操作称为有状态。
状态操作的状态保持在可以被认为是嵌入式键/值存储的状态中。状态被分区并严格地与有状态运营商读取的流一起分发。因此,只有在keyBy()函数之后才能在键控流上访问键/值状态,并且限制为与当前事件的键相关联的值。对齐流和状态的密钥可确保所有状态更新都是本地操作,从而保证一致性而无需事务开销。此对齐还允许Flink重新分配状态并透明地调整流分区。
checkpoint
Flink 使用流重放和检查点的组合实现容错。检查点与每个输入流中的特定点以及每个操作符的对应状态相关。通过恢复运算符的状态并从检查点重放事件,可以从检查点恢复流数据流,同时保持一致性*(恰好一次处理语义)*。
检查点间隔是在执行期间用恢复时间(需要重放的事件的数量)来折衷容错开销的手段。
轻量级容错机制(全局异步,局部同步)
-
保证exactly-once 语义
-
用于内部失败的恢复
-
基本原理
1、通过往 source 注入 barrier
2、barrier 作为 checkpoint 的标志
流处理过程中的状态历史版本
- 具有可以replay的功能
- 外部恢复(应用重启和升级)
- 两种方式触发
1、Cancel with savepoint
2、手动主动触发
Batch on Streaming
Flink 执行批处理程序作为流程序的特殊情况,其中流是有界的(有限数量的元素)。甲数据集在内部视为数据流。因此,上述概念以相同的方式应用于批处理程序,并且它们适用于流程序,除了少数例外:
- 批处理程序的容错不使用检查点。通过完全重放流来进行恢复。这是可能的,因为输入有限。这会使成本更多地用于恢复,但使常规处理更便宜,因为它避免了检查点。
- DataSet API 中的有状态操作使用简化的内存/核外数据结构,而不是键/值索引。
- DataSet API 引入了特殊的同步(超级步骤)迭代,这些迭代只能在有界流上进行。
第二部分:Flink 运行时
运行时架构
Client
JobManager
TaskManger
角色间的通信(Akka)
数据的传输(Netty)
Flink运行时包含两种类型的进程:
-
该 JobManagers(也称为Masters)协调分布式执行。他们安排任务,协调检查点,协调故障恢复等。
总是至少有一个Job Manager。高可用性设置将具有多个JobManagers,其中一个始终是领导者,其他人处于待机状态。
-
该 TaskManagers(也叫workers)执行任务(或者更具体地说,子任务)的数据流,以及缓冲器和交换数据流。
必须始终至少有一个 TaskManager。
JobManagers和TaskManagers可以通过多种方式启动:作为独立集群直接在计算机上,在容器中,或由 YARN 或Mesos 等资源框架管理。TaskManagers 连接到 JobManagers,宣布自己可用,并被分配工作。
该客户端是不运行时和程序执行的一部分,而是被用来准备和发送的数据流的 JobManager。之后,客户端可以断开连接或保持连接以接收进度报告。客户端既可以作为触发执行的 Java / Scala 程序的一部分运行,也可以在命令行进程中运行./bin/flink run ...
。
Task Slots and Resources
每个worker(TaskManager)都是一个JVM进程,可以在不同的线程中执行一个或多个子任务。为了控制worker接受的任务数量,worker有所谓的任务槽(至少一个)。
每个任务槽代表TaskManager的固定资源子集。例如,具有三个插槽的TaskManager将其1/3的托管内存专用于每个插槽。切换资源意味着子任务不会与来自其他作业的子任务竞争托管内存,而是具有一定量的保留托管内存。请注意,这里没有CPU隔离; 当前插槽只分离任务的托管内存。
通过调整任务槽的数量,用户可以定义子任务如何相互隔离。每个TaskManager有一个插槽意味着每个任务组在一个单独的JVM中运行(例如,可以在一个单独的容器中启动)。拥有多个插槽意味着更多子任务共享同一个JVM。同一JVM中的任务共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每任务开销。
默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自同一个作业。结果是一个槽可以保存作业的整个管道。允许此插槽共享有两个主要好处:
- Flink集群需要与作业中使用的最高并行度一样多的任务槽。无需计算程序总共包含多少任务(具有不同的并行性)。
- 更容易获得更好的资源利用率。没有插槽共享,非密集 源/ map()子任务将阻止与资源密集型窗口子任务一样多的资源。通过插槽共享,将示例中的基本并行性从2增加到6可以充分利用时隙资源,同时确保繁重的子任务在TaskManagers之间公平分配。
CoLocationGroup
- 保证所有的i-th 的sub-tasks 在同一个slots
- 主要用于迭代流
SlotSharingGroup
- 保证同一个group的i-th 的sub-tasks 共享同一个slots
- 算子的默认 group 为 default
- 怎么确定一个算子的 SlotSharingGroup(根据input的group 和自身是否设置group共同确定)
- 适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载
Slots & parallelism
一个应用需要多少Slots
- 不设置SlotSharingGroup(应用的最大并行度)
- 设置SlotSharingGroup (所有SlotSharingGroup中的最大并行度之和)
Operator Chains and Task
对于分布式执行,Flink 链接 operator 和子任务一起放入任务。每个任务由一个线程执行。将 operators 链接到任务是一项有用的优化:它可以减少线程到线程切换和缓冲的开销,并在降低延迟的同时提高整体吞吐量。可以配置链接行为。
下图中的示例数据流由五个子任务执行,因此具有五个并行线程。
OperatorChain
OperatorChain的优点
- 减少线程切换
- 减少序列化与反序列化
- 减少延迟并且提高吞吐能力
OperatorChain 组成条件
- 没有禁用Chain
- 上下游算子并行度一致
- 下游算子的出度为1
- 上下游算子在同一个 slot group
- 上下游算子之间没有数据 shuffle
第三部分:Flink on yarn原理,部署与生产
Flink 部署方式
- Local
- Standalone
- Yarn
- Mesos
- Docker
- Kubernetes
- AWS
Flink On Yarn
- ResourceManager
- NodeManager
- AppMaster( jobmanger 运行在其上)
- Container(taskamanager 运行在其上)
- YarnSession
选择on-Yarn 的理由
- 提高机器的利用率
- Hadoop 开源活跃,且成熟
Flink job启动方式
提交 job(submitJob) 方式, 由开发者提交应用时候触发。
恢复 job 方式(recoverJob),由 Flink 系统内部触发,在 JobManager crash 情况下触发。
理解Job 的启动过程
- Graph 不同阶段的转换
- Scheduler
- Blob 服务
- HA 保证
Graph
- StreamGraph
- JobGraph
- ExecuteGraph
- 物理执行图
HA服务
- 选举leader
- 持久化checkpoint 元数据
- 持久化最近成功的checkpoint
- 持久化提交的JobGraph(SubmittedJobGraph)
- 持久化BlobStore(用于保存应用的Jar)
Flink On Yarn 部署
- Zookeeper (HA)
- HDFS(checkpoint)
- YARN(资源调度)
- Flink
Flink-conf 配置
- Akka 方面配置
- Checkpoint 方面配置
- HA配置
- 内存配置
- MetricReporter
- Yarn方面的配置
YarnSession 启动命令
- -n (container) taskmanager的数量, 怎么确定?
- -s (slot) 每个taskmanager的slot 数量,默认一个slot 对应一个vcore
- -jm jobmanager的内存大小, 怎么设置
- -tm 每个taskmanager的内存大小, 怎么设置
- -qu Yarn的队列名字
- -nm Yarn的应用名字
- -d 后台执行
./yarn-session.sh –n 2 -s 10 –jm 2048 –tm 10240 –qu root.default –nm test -d |
应用启动命令
- -j 应用的jar包
- -a 应用的运行参数
- -c 应用的主类
- -p 应用的并行度
- -yid yarnSession对应的appId
- -nm 应用名字, 在flink-ui 显示
- -d 后台执行
./flink run –j test.jar –a “test” –p 20 –yid appId –nm flink-test -d |
Flink On Yarn 部署总结
- 一个 YarnSession 一个应用,方便管理, 减少没必要应用之间的干扰。
- Flink 提交平台化,支持 HDFS 的 Jar 包提交。
- Flink On Yarn 的日志滚动, 以及改善 Flink UI 查看日志。