Flink

概述

流处理计算演进

Apache Storm项目是流处理的先锋,提供了低延迟的流处理,但是它为实时性付出了一些代价:很难实现高吞吐,并且其正确性没有达到通常所需的水平,即它并不能保证 EXACTLY-ONCE 语意,即便是其能够达到的正确性级别,也需要较大的开销来保证。

低延迟和高吞吐的流处理系统中维持良好的容错性是非常困难的,但是为了得到有保障的准确状态,人们想到了一种替代方法:将连续时间中的流数据分割成一系列微小的批量作业。如果分割得足够小(即所谓的微批处理作业),计算就几乎可以实现真正的流处理。因为存在延迟,所以不可能做到完全实时,但是每个简单的应用程序都可以实现仅有几秒甚至几亚秒的延迟。这就是在 Spark 批处理引擎上运行的 Spark Streaming 所使用的方法。 Spark Streaming虽然吞吐量高,但是由于是微批处理,所以具有较高的延迟

更重要的是,使用微批处理方法,可以实现 exactly-once 语义,从而保障状态的一致性。如果一个微批处理失败了,它可以重新运行,这比连续的流处理方法更容易。Storm Trident 是对 Storm 的延伸,它的底层流处理引擎就是基于微批处理方法来进行计算的,从而实现了 exactly-once 语义,但是在延迟性方面付出了很大的代价。

对于 Storm Trident 以及 Spark Streaming 等微批处理策略,只能根据批量作业时间的倍数进行分割,无法根据实际情况分割事件数据,并且,对于一些对延迟比较敏感的作业,往往需要开发者在写业务代码时花费大量精力来提升性能。这些灵活性和表现力方面的缺陷,使得这些微批处理策略开发速度变慢,运维成本变高。

于是,Flink 出现了,这一技术框架可以避免上述弊端,并且拥有所需的诸多功能,还能按照连续事件高效地处理数据,Flink 的部分特性如下图所示:

Flink的部分特性

有状态流计算架构

Flink简介

Flink 起源于 Stratosphere 项目,Stratosphere 是在 2010~2014 年由 3 所地处柏林的大学和欧洲的一些其他的大学共同进行的研究项目,2014 年 4 月 Stratosphere 的代码被复制并捐赠给了 Apache 软件基金会,参加这个孵化项目的初始成员是Stratosphere 系统的核心开发人员,2014 年 12 月,Flink 一跃成为 Apache 软件基金会的顶级项目。

Flink 主页在其顶部展示了该项目的理念:“Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架”。
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

目前,国内外的一些知名互联网大厂都已经在生产汇总大规模的使用Flink,如Alibaba(启动的Blink项目,扩展、优化和完善了Flink)、Uber、Netflix、爱立信 等。

Flink特性

  • 高吞吐、低延迟、高性能
  • 支持带事件时间的窗口(window)操作:time、count、session、data-driven
  • 支持有状态计算的exactly once语义
  • 支持具有反压功能的持续流模型
  • 支持基于轻量级分布式快照(snapshot)实现的容错
  • 同时支持batch on streaming处理和streaming处理
  • Flink在JVM内部实现了自己的内存管理
  • 支持迭代计算
  • 支持程序自动优化:避免特定情况下shuffle、排序等昂贵操作,中间结果有必要时缓存

反压机制

反压功能:通常是由于某段时间内源头数据量的暴涨,导致流任务处理数据的速度远远小于数据产生的速度。

导致问题:这种情况会导致流任务的内存越积越大,可能导致资源耗尽甚至系统崩溃。

当数据高峰期时,由于数据量太大,流式处理能力跟不上,此时采取反压机制,先缓存一部分数据量,降低流式处理的数据量,避免造成过高的负载而导致集群出问题。

不同流计算引擎,处理方式不同:

  • Storm: 通过监控process bolt中接收队列负载情况来处理反压,即当超过高水位值,就将反压信息写到Zookeeper,由zookeeper的watch通知worker进入反压状态,最后spout 停止发送tuple,从源头上实现反压。
  • Spark Streaming:设置属性spark.streaming.backpressure.enabled进行自动反压,即动态控制数据接收速率来适配集群数据处理能力。
  • Flink:不需要设置,自动处理反压,即每个组件都有对应的分布式阻塞队列,只有队列不满的情况,上游才发数据,较慢的接收者会自动降低发送速率,如果队列满了(有界队列),发送者会阻塞。

SparkStreaming中的receiver机制,借助Kafka缓存消息实现反压的功能。

SparkStreaming反压

Dataset:对静态数据进行批处理操作、将静态数据抽象成分布式数据集,使用Flink各种操作符处理数据集,支持Java、Scla、 Python

Datastream:对数据流进行流处理操作,将流式的数据抽象成分布式数据流,用Flink各种操作符处理数据流,支持Java、 Scala

Table API:通过类SQL的DSL对关系表进行各种查询操作,支持Java、Scala

关键概念

  • 数据集

    • 无界数据集:持续不断,不停流入数据(交易日志、网站点击日志)
    • 有界数据集:批次的,类似 Mapreduce处理的数据集
  • 数据处理模型

    • 流处理:实时任务,任务一直运行,处理无界数据
    • 批处理:批处理任务,处理有界数据,任务运行完释放资源

Flink:将有界数据集当做无界数据集的一种特
Spark Streaming:把无界数据集分割成有界,通过微批的方式对待流计算

Flink核心计算框架

同Spark一样,Flink也有Flink Core(Runtime层)来统一支持流处理和批处理。Flink Core(Flink Runtime)是一个分布式的流处理引擎(分布式的系统),提供了支持Flink计算的全部核心实现,能够接受数据流程序并在一台或多台机器上以容错方式执行。分布式流式数据就位于Runtime层,Runtime通过将上层DataStream API和DataSet API编写的代码逻辑映射成的JobGraph映射成ExecutionGraph进行调度,为上层API提供基础服务。(jobGraph映射成executionGraph和SparkStreaming中DStream映射成RDD类似)

Flink Runtime 执行引擎可以作为 YARN(Yet Another Resource Negotiator)的应用程序在集群上运行,也可以在Mesos 集群上运行,还可以在单机上运行(这对于调试 Flink 应用程序来说非常有用)。

Flink 分别提供了面向流式处理的接口(DataStream API)和面向批处理的接口(DataSet API)。因此,Flink既可以完成流处理,也可以完成批处理。Flink 支持的拓展库涉及机器学习(FlinkML:提供机器学习的Pipelines API并实现多种机器学习算法—python scikit-learn)、复杂事件处理(CEP)、以及图计算(Gelly:提供了图计算相关API和多种图计算算法实现),还有分别针对流处理和批处理的 Table API。

能被 Flink Runtime 执行引擎接受的程序很强大,但是这样的程序有着冗长的代码,编写起来也很费力,基于这个原因,Flink 提供了封装在 Runtime 执行引擎之上的 API ,以帮助用户方便地生成流式计算程序。Flink 提供了用于流处理的DataStream API 和用于批处理的 DataSet API。这里要注意的是,Flink DataSet API是先一步于DataStream API开发出来的,因为当时工业界对于流式处理的需求在Flink初诞时期并不大。

DataStream API 可以流畅地分析无限数据流,并且可以用 Java 或者 Scala 来实现。开发人员需要基于一个叫 DataStream 的数据结构来开发,这个数据结构用于表示永不停止的分布式数据流。

Flink 的分布式特点体现在它能够在成百上千台机器上运行,它将大型的计算任务分成许多小的部分,每个机器执行一部分。Flink 能够自动地确保发生机器故障或者其他错误时计算能够持续进行,或者在修复 bug 或进行版本升级后有计划地再执行一次。这种能力使得开发人员不需要担心运行失败。Flink 本质上使用容错性数据流,这使得开发人员可以分析持续生成且永远不结束的数据(即流处理)。

Flink核心计算框架

Flink任务执行核心

1.Runtime层以JobGraph的形式接收程序, JobGraph即一个一般化的并行数据流图(dataflow),它拥有任意数量的Task来接收和产生data stream。

2.DataStream API和DataSet API都会使用单独编译的处理方式生产JobGraph。DataSet API使用optimizer来决定针对程序的优化方法,而DataStream API则使用stream buider来完成任务。

3.在执行JobGraph的时候,Flink提供了多种候选部署方案(如local、remote、YARN等)。

4.Flink附随了一些DataSet或DataStream API程序的类库和API:处理逻辑表查询的Table,机器学习的FlinkML,图像处理的Gelly,复杂时间处理的CEP。

补充概念:

OLAP和OLTP

OLTP:注重数据一条一条进行处理(增删改查),比如线上的mysql和oracle,适用线上业务,注重数据的快速在线处理;

OLAP:A<->analysis,解决分析相关问题, 注重存储的分析,比如涉及到Hive、spark sql,离线处理,延时较高,吞吐量高,解决分析类需求;

Flink基本架构

JobManager与TaskManager

Flink 启动时包含了两种类型的处理器:

JobManager 处理器

也称之为 Master,用于协调分布式执行,它们用来调度 task,协调检查点,协调失败时恢复等。Flink 运行时至少存在一个 master 处理器,如果配置高可用模式则会存在多个 master 处理器,它们其中有一个是 leader,而其他的都是 standby。

TaskManager 处理器

也称之为 Worker,用于执行一个 dataflow 的 task(或者特殊的 subtask)、数据缓冲和 data stream 的交换,Flink 运行时至少会存在一个 worker处理器。

JobManager与TaskManager

Master 和 Worker 处理器可以直接在物理机上启动,或者通过像 YARN 这样的资源调度框架。Worker 连接到 Master,告知自身的可用性进而获得任务分配。

有界数据流与无界数据流

Flink可用于处理有界数据流和无界数据流。

无界和有界数据流

无界数据流

无界数据流有一个开始但是没有结束,它们不会在生成时终止(即一直在流动)并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理 event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取 event,以便能够推断结果完整性,无界流的处理称为流处理。

有界数据流

有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理。

Flink数据流处理总结

在无界数据流和有界数据流中,他们分别对应大数据处理系统中常见的流处理和批处理两种数据处理方式。

批处理的特点是有界、持久、大量,批处理非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。流处理的特点是无界、实时,流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

在 Spark 生态体系中,对于批处理和流处理采用了不同的技术框架,批处理由SparkSQL 实现,流处理由 Spark Streaming 实现,这也是大部分框架采用的策略,使用独立的处理器实现批处理和流处理,而 Flink 可以同时实现批处理和流处理。

Flink通过将批处理(即有限的静态数据)看作一种特殊的流处理。

Apache Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个 Flink 运行时(Flink Runtime),提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们要实现的目标是完全不相同的:流处理一般需要支持低延迟、 Exactly-once 保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。例如,实现批处理的开源方案有 MapReduce、Tez、Crunch、Spark,实现流处理的开源方案有 Samza、Storm。

Flink 在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。基于同一个 Flink 运行时(Flink Runtime),分别提供了流处理和批处理 API,而这两种 API 也是实现上层面向流处理、批处理类型应用框架的基础。

Flink数据处理组件

Flink对于流式数据处理三大组件有:

  • Source(数据输入)
  • Transformation(数据处理)
    • 其中transformation组件最为重要,定位是逻辑处理。
  • Sink(数据输出)

Streaming DataFlow

Flink程序实际执行时,映射到流数据流(Streaming dataflow),由流和转化符(transformation)构成。Stream是Flink数据处理的一个抽象(?),类似RDD,也是一种数据集合。Stream可以从Source中来,也可以从别的Transformation转化而来。Flink的Transformation算子和Spark的Transformation算子基本相似。

Flink Stream & Spark RDD

在Flink的API中,group by是对batch, keyby是针对stream。

Flink中的程序本质上是并行的(parallel)和分布式的(distributed)。在程序执行期间,一个流(stream)有一个或多个流分区(stream partitions),每个操作(operation)有一个或多个操作子任务(operation subtasks)。操作子任务相互独立,在不同的线程或者可能在不同的机器或容器上执行。

任务的划分1

任务的划分:在一个job的执行计划(数据流图)中,从source到计算到sink,每当并行度发生变化或者数据需要分组(keyBy)时(还可通过API明确设置),就会产生任务。

一个操作的并行度(parallelism)指的是操作子任务的数量,流的并行度是产生该流的操作符的并行度。同一个程序的不同操作可能具有不同的并行度。

例子1

举个例子,比如说对Source进行map操作,由于Flink是默认并行的,所以会启动多个线程同时执行map操作,假设启动了4个线程,那么这个map任务的并行度就是4,有4个子的map任务。即一个任务的并行度是N,那么该任务的子任务就有N个。

例子2

再比如,有一个任务,先是对Source(并行度2)进来的数据进行map(并行度4),之后再就行keyBy(并行度4),按key做hash分组后,进行map(并行度4),最后sink(4个并行度)输出。那么这个任务的划分则是:

Source和map的并行度不一样,因此Source是一个任务,map是一个任务。之后的keyBy是一个分组算子,所以又是一个任务。而keyBy后的map和sink是分组后操作且并行度都是4未改变,所以属于同一个任务。

总结来说,上面的案例一共有4个任务。

流的重分布

数据流是如何在两个transformation组件中传输的?

数据流在transformation组件的传输主要有一对一和重分布两种形式。

一对一流(相当于spark窄依赖)

比如source=>map过程,其中的元素分区和排序是保持不变的。

重分布流(相当于spark宽依赖)

比如map=>keyBy/window之间以及keyBy/window与Sink之间,改变了流的分区,每一个算子任务根据所选的转换,向不同的目标子任务发送数据。

比如: keyby,根据key的hash值重新分区、 broadcast、 rebalance(类似 shuffler过程)。

此外,Flink在处理数据时,数据不一定一致:即在一次redistributing交换中,元素间排序,只针对发送方的partition和接收partition方。最终到sink时候,sink内部虽然是有序的,但是输出后显示结果的排序是不确定的。即使不通过sink,直接transformation后输出,由于不同transformation处理时间不同,导致输出顺序也是不同的。

数据流编程模型

Flink提供了不同级别的抽象API,用以开发流式或批处理作业,如下图所示。

Flink抽象级别

80%的业务问题我们可以通过顶层API—SQL来解决。对于最底层的抽象API的开发则应该能够解决业务实际中的所有问题。

Flink底层抽象——Statefule Stream Processing

是最低级别(底层)的抽象,只提供有状态的流。它通过ProcessFunction嵌入到DataStream API之中。它使得用户可以自由处理来源于一个或者多个流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。

提供了有状态的流:通过过程函数(Process Function)嵌入到DataStream API中。

允许用户可以自由地处理来自一个或多个数据流的时间,并使用一致,容错的状态。(checkpoint)

用户可以注册时间和处理时间回调(callback),从而使程序可以实现复杂的计算。

大多数应用并不需要上述的底层抽象,而是针对核心 API(Core APIs) 进行编程,比如 DataStream API(有界或无界流数据)以及 DataSet API(有界数据集)。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。这些 API处理的数据类型以类(classes)的形式由各自的编程语言所表示。

Table API

Table API是以表为中心的声名式API,类SQL范式。其中表可能会动态变化(在表达流数据时)。

Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比较的操作,例如 select、project、join、group-by、aggregate等。

Table API 程序声明式地定义了什么逻辑操作应该执行,而不是准确地确定这些操作代码的看上去如何 。 尽管 Table API 可以通过多种类型的用户自定义函数(UDF)进行扩展,其仍不如核心 API 更具表达能力,但是使用起来却更加简洁(代码量更少)。

除此之外,Table API 程序在执行之前会经过内置优化器进行优化。

允许在表与 DataStream/DataSet API之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。类似spark sql和rdd的混用。

SQL层

Flink 提供的最高层级的抽象是 SQL 。

这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

Flink运行架构

Yarn模式任务提交流程

  1. 客户端提交Flink任务后,Client向HDFS提交上传Jar包和配置,之后向Yarn ResourceManager提交任务;
  2. ResourceManager接受请求后分配container资源,跟集群中对应NodeManager通信启动一个container作为Flink任务的ApplicaitonMaster;
  3. AM启动后,加载任务所需的jar包和配置构建环境,启动 JobManager进程,之后AM向RM申请资源,启动TaskManager;
  4. RM分配container资源后,由AM通知资源所在节点的NodeManager启动TaskManager(任务运行资源计算是在RM中完成的);
  5. NodeManager加载Flink’的jar包,配置构建环境后启动TaskManager;
  6. TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务,运行任务;

这里的taskmanager,在flink的standalone模式中是一个worker,在各个从节点都有启动,而在yarn集群中则是一个进程,因为在flink中的slot只对内存进行隔离,cpu共享。而在yarn中,将cpu和内存封装到了container,所以会先启动taskmanager占住cpu,之后启动多个slot进行分配。

问题: worker和进程区别?

TaskManager 与 Slot

每一个 TaskManager 是一个 JVM 进程,它可能会在独立的线程上执行一个或多个 subtask。为了控制一个 worker 能接收多少个 task,worker 通过 task slot 来进行控制(一个 worker 至少有一个 task slot)。一个task对应着一个线程,task slot可以理解为线程池。这里的slot只对内存进行了隔离管理,而CPU是共享的。

每个 task slot 表示 TaskManager 拥有资源的一个固定大小的子集。假如一个TaskManager 有三个 slot,那么它会将其管理的内存分成三份给各个 slot。

资源 slot化意味着一个 subtask 将不需要跟来自其他 job 的 subtask 竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到 CPU 的隔离,slot 目前仅仅用来隔离 task 的受管理的内存。

通过调整 task slot 的数量,允许用户定义 subtask 之间如何互相隔离。如果一个 TaskManager 一个 slot,那将意味着每个 task group 运行在独立的 JVM 中(该 JVM可能是通过一个特定的容器启动的),而一个 TaskManager 多个 slot 意味着更多的subtask 可以共享同一个 JVM。

而在同一个 JVM 进程中的 task 将共享 TCP 连接(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task 的负载。

TaskManager和slot

taskmanager是一个进程。taskslot(对内存进行了封装,共享cpu)不是一个严格意义上的进程,可以理解为taskslot是一个线程池。线程池中每个task对应着Flink的一个算子。

总结

Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力,可以通过参数 taskmanager.numberOfTaskSlots进行配置,而并行度 parallelism 是动态概念,即 TaskManager 运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。

也就是说,假设一共有 3 个 TaskManager,每一个 TaskManager 中的分配 3 个TaskSlot,也就是每个 TaskManager 可以接收 3 个 task,一共 9 个 TaskSlot,如果我们设置 parallelism.default=1,即运行程序默认的并行度为 1,9 个 TaskSlot 只用了 1个,有 8 个空闲,因此,设置合适的并行度才能提高效率。

Tasks的slots和资源

整个进程是属于taskmanager。taskslot是一个线程池,虚线里是一个个线程。任务并行度是6。

补充:Flink实现JVM内存管理

问题2:上图中是两个任务吗?

DataFlow

Flink 程序由Source、Transformation、Sink 这三个核心组件组成,Source 主要负责数据的读取,Transformation 主要负责对属于的转换操作,Sink 负责最终数据的输出,在各个组件之间流转的数据称为流(streams)。

Flink程序模型

Flink 程序的基础构建模块是 流(streams) 与 转换(transformations)(需要注意的是,Flink 的 DataSet API 所使用的 DataSets 其内部也是 stream)。一个 stream可以看成一个中间结果,而一个 transformations 是以一个或多个 stream 作为输入的某种 operation,该 operation 利用这些 stream 进行计算从而产生一个或多个 result stream。

在运行时,Flink 上运行的程序会被映射成 streaming dataflows,它包含了streams 和 transformations operators。每一个 dataflow 以一个或多个 sources 开始以一个或多个 sinks 结束,dataflow 类似于任意的有向无环图(DAG)。

程序与数据流

并行数据流

Flink 程序的执行具有并行、分布式的特性。在执行过程中,一个 stream 包含一个或多个 stream partition ,而每一个 operator 包含一个或多个 operator subtask,这些 operator subtasks 在不同的线程、不同的物理机或不同的容器中彼此互不依赖得执行。

一个特定 operator 的 subtask 的个数被称之为其 parallelism(并行度)。一个stream 的并行度总是等同于其 producing operator 的并行度。一个程序中,不同的operator 可能具有不同的并行度。

并行数据流

Stream 在 operator 之间传输数据的形式可以是 one-to-one(forwarding)的模式也可以是 redistributing 的模式,具体是哪一种形式,取决于 operator 的种类。

One-to-one:stream(比如在 source 和 map operator 之间)维护着分区以及元素的顺序。那意味着 map operator 的 subtask 看到的元素的个数以及顺序跟 source operator 的 subtask 生产的元素的个数、顺序相同,map、fliter、flatMap 等算子都是one-to-one 的对应关系。

Redistributing:这种操作会改变数据的分区个数。每一个 operator subtask 依据所选择的 transformation 发送数据到不同的目标 subtask。例如,keyBy() 基于hashCode 重分区、broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute 过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。

Flink与operator chains

出于分布式执行的目的,Flink 将 operator 的 subtask 链接在一起形成 task,每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程 API 中进行指定。

下面这幅图,展示了 6 个 subtask 以6 个并行的线程来执行:

Tasks的slots和资源

关于operator chain参考链接:

https://www.jianshu.com/p/799744e347c7

https://blog.csdn.net/u013516966/article/details/104586156

https://blog.csdn.net/u013411339/article/details/89430407

任务调度流程

任务调度原理

客户端不是运行时和程序执行的一部分,但它用于准备并发送 dataflow 给Master,然后,客户端断开连接或者维持连接以等待接收计算结果,客户端可以以两种方式运行:要么作为 Java/Scala 程序的一部分被程序触发执行,要么以命令行./bin/flink run 的方式执行。

当系统本地启动时,一个JobManager和一个TaskManager会启动在同一个JVM中。

当一个程序被提交后,系统创建一个 Client来进行预处理,将程序转变成一个并行数据流形式,交给Jobmanagerf和 Taskmanager执行。

JobManager 负责协调 Flink系统,调度task,协调检查点,协调失败时恢复等。

TaskManager:执行并行程序的worker。

Flink运行模型

Flink程序模型

以上为 Flink 的运行模型,Flink 的程序主要由三部分构成,分别为 Source、 Transformation、Sink。DataSource 主要负责数据的读取,Transformation 主要负责对属于的转换操作,Sink 负责最终数据的输出。

Flink程序架构

每个Flink程序都包含以下的若干流程:

  • 获得一个执行环境;(Execution Environment)
  • 加载/创建初始数据;(Source)
  • 指定转换这些数据;(Transformation)
  • 指定放置计算结果的位置;(Sink)
  • 触发程序执行。

Environment

执行环境 StreamExecutionEnvironment 是所有 Flink 程序的基础。 创建执行环境有三种方式,分别为:

StreamExecutionEnvironment.getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

val env = StreamExecutionEnvironment.getExecutionEnvironment

StreamExecutionEnvironment.createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

StreamExecutionEnvironment.createRemoteEnvironment

返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包。

val env = StreamExecutionEnvironment.createRemoteEnvironment(1)

Source

基于File 的数据源

1.readTextFile(path)

一列一列的读取遵循 TextInputFormat 规范的文本文件,并将结果作为 String 返回。

1
2
3
# flink scala-shell启动时就已经配置好本地的环境,处理批数据用benv即可 
val stream = benv.readTextFile("/opt/modules/test.txt")
stream.print() # shell终端不用benv.execute(),会自动执行输出

注意:stream.print():每一行前面的数字代表这一行是哪一个并行线程输出的。

2.readFile(fileInputFormat, path)

按照指定的文件格式读取文件。

1
2
3
4
5
6
7
scala> val path = new Path("/usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt")
path: org.apache.flink.core.fs.Path = /usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt

scala> val stream = benv.readFile(new TextInputFormat(path), "/usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt")
stream: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@46179420

scala> stream.print()

基于Socket 的数据源

1.socketTextStream

从 Socket 中读取信息,元素可以用分隔符分开。

开启端口监听

1
2
3
4
(base) [root@main wordcount]# nc -lp 10000
a
b
c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
scala> val stream = senv.socketTextStream("localhost", 10000) 
stream: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@17ae13d5


scala> stream.print()
res1: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@2792c28

scala> senv.execute("job")
Submitting job with JobID: 45a46d09a24f6a81bf01bc8180608bc0. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:38277/user/jobmanager#1473519198] with leader session id 00000000-0000-0000-0000-000000000000.
04/27/2021 20:25:39 Job execution switched to status RUNNING.
04/27/2021 20:25:39 Source: Socket Stream(1/1) switched to SCHEDULED
04/27/2021 20:25:39 Sink: Unnamed(1/1) switched to SCHEDULED
04/27/2021 20:25:39 Source: Socket Stream(1/1) switched to DEPLOYING
04/27/2021 20:25:39 Sink: Unnamed(1/1) switched to DEPLOYING
04/27/2021 20:25:39 Sink: Unnamed(1/1) switched to RUNNING
04/27/2021 20:25:39 Source: Socket Stream(1/1) switched to RUNNING
a
b
c
# 程序等待端口数据的输入,直到端口断开监听

基于集合(Collection)的数据源

1.fromCollection(seq)

从集合中创建一个数据流,集合中所有元素的类型是一致的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
scala> val list = List(1,2,3,4) 
list: List[Int] = List(1, 2, 3, 4)

scala> val stream = senv.fromCollection(list)
stream: org.apache.flink.streaming.api.scala.DataStream[Int] = org.apache.flink.streaming.api.scala.DataStream@4cdc345f

scala> stream.print()
res7: org.apache.flink.streaming.api.datastream.DataStreamSink[Int] = org.apache.flink.streaming.api.datastream.DataStreamSink@2360c312

scala> senv.execute("1")

Submitting job with JobID: 576229b78f71e242de8e5dbc29e65cd7. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:38277/user/jobmanager#1473519198] with leader session id 00000000-0000-0000-0000-000000000000.
04/27/2021 20:31:19 Job execution switched to status RUNNING.
04/27/2021 20:31:19 Source: Collection Source(1/1) switched to SCHEDULED
04/27/2021 20:31:19 Sink: Unnamed(1/1) switched to SCHEDULED
04/27/2021 20:31:19 Source: Collection Source(1/1) switched to DEPLOYING
04/27/2021 20:31:19 Sink: Unnamed(1/1) switched to DEPLOYING
04/27/2021 20:31:19 Source: Collection Source(1/1) switched to RUNNING
04/27/2021 20:31:19 Sink: Unnamed(1/1) switched to RUNNING
1
2
3
4
04/27/2021 20:31:19 Source: Collection Source(1/1) switched to FINISHED
04/27/2021 20:31:19 Sink: Unnamed(1/1) switched to FINISHED
04/27/2021 20:31:19 Job execution switched to status FINISHED.

2.fromCollection(Iterator)

从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由 iterator 返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
scala> val iterator = Iterator(1,2,3,4) 
iterator: Iterator[Int] = non-empty iterator

scala> val stream = senv.fromCollection(iterator)
stream: org.apache.flink.streaming.api.scala.DataStream[Int] = org.apache.flink.streaming.api.scala.DataStream@1d46d6d9

scala> stream.print()
res9: org.apache.flink.streaming.api.datastream.DataStreamSink[Int] = org.apache.flink.streaming.api.datastream.DataStreamSink@2afee2ee

scala> senv.execute("FirstJob")

Submitting job with JobID: 06f69ce8c0a8e545bfb57b2e40bbba83. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:38277/user/jobmanager#1473519198] with leader session id 00000000-0000-0000-0000-000000000000.
04/27/2021 20:32:26 Job execution switched to status RUNNING.
04/27/2021 20:32:26 Source: Collection Source(1/1) switched to SCHEDULED
04/27/2021 20:32:26 Sink: Unnamed(1/1) switched to SCHEDULED
04/27/2021 20:32:26 Source: Collection Source(1/1) switched to DEPLOYING
04/27/2021 20:32:26 Sink: Unnamed(1/1) switched to DEPLOYING
04/27/2021 20:32:26 Source: Collection Source(1/1) switched to RUNNING
04/27/2021 20:32:26 Sink: Unnamed(1/1) switched to RUNNING
1
2
3
4
04/27/2021 20:32:26 Source: Collection Source(1/1) switched to FINISHED
04/27/2021 20:32:26 Sink: Unnamed(1/1) switched to FINISHED
04/27/2021 20:32:26 Job execution switched to status FINISHED.

3.fromElements(elements:_*)
从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
scala> list
res25: List[Int] = List(1, 2, 3, 4)

scala> val stream = senv.fromElements(list)
stream: org.apache.flink.streaming.api.scala.DataStream[List[Int]] = org.apache.flink.streaming.api.scala.DataStream@5bb55f62

scala> stream.print()
res27: org.apache.flink.streaming.api.datastream.DataStreamSink[List[Int]] = org.apache.flink.streaming.api.datastream.DataStreamSink@223fa1eb

scala> senv.execute("1")
Submitting job with JobID: 233a984c2b46782a946e0a4ebfab503d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:38277/user/jobmanager#1473519198] with leader session id 00000000-0000-0000-0000-000000000000.
04/27/2021 20:43:39 Job execution switched to status RUNNING.
04/27/2021 20:43:39 Source: Collection Source(1/1) switched to SCHEDULED
04/27/2021 20:43:39 Sink: Unnamed(1/1) switched to SCHEDULED
04/27/2021 20:43:39 Source: Collection Source(1/1) switched to DEPLOYING
04/27/2021 20:43:39 Sink: Unnamed(1/1) switched to DEPLOYING
04/27/2021 20:43:39 Source: Collection Source(1/1) switched to RUNNING
04/27/2021 20:43:39 Sink: Unnamed(1/1) switched to RUNNING
List(1, 2, 3, 4)
04/27/2021 20:43:39 Source: Collection Source(1/1) switched to FINISHED
04/27/2021 20:43:39 Sink: Unnamed(1/1) switched to FINISHED
04/27/2021 20:43:39 Job execution switched to status FINISHED.
res28: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@62220004

4.generateSequence(from, to)
从给定的间隔中并行地产生一个数字序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
scala> val stream = senv.generateSequence(1,10) 
stream: org.apache.flink.streaming.api.scala.DataStream[Long] = org.apache.flink.streaming.api.scala.DataStream@2f776263

scala> stream.print()
res34: org.apache.flink.streaming.api.datastream.DataStreamSink[Long] = org.apache.flink.streaming.api.datastream.DataStreamSink@5bb4ef87

scala> senv.execute("1")
Submitting job with JobID: 30ef454533650d840267de56fa593177. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:38277/user/jobmanager#1473519198] with leader session id 00000000-0000-0000-0000-000000000000.
04/27/2021 20:45:58 Job execution switched to status RUNNING.
04/27/2021 20:45:58 Source: Sequence Source -> Sink: Unnamed(1/1) switched to SCHEDULED
04/27/2021 20:45:58 Source: Sequence Source -> Sink: Unnamed(1/1) switched to DEPLOYING
04/27/2021 20:45:58 Source: Sequence Source -> Sink: Unnamed(1/1) switched to RUNNING
1
2
3
4
5
6
7
8
9
10
04/27/2021 20:45:58 Source: Sequence Source -> Sink: Unnamed(1/1) switched to FINISHED
04/27/2021 20:45:58 Job execution switched to status FINISHED.
res35: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@3d84d877

Sink

Data Sink 消费 DataStream 中的数据,并将它们转发到文件、套接字、外部系统或者打印出。 Flink 有许多封装在 DataStream 操作里的内置输出格式。

1.writeAsText
将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的 toString()方法来获取。

2.WriteAsCsv
将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的 toString()方法。

3.print/printToErr
打印每个元素的 toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于 1,那么输出也会有一个标识由哪个任务产生的标志。

4.writeUsingOutputFormat
自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。
5.writeToSocket

根据 SerializationSchema 将元素写入到 socket 中。

Transformation

fold和spark中的哪个api相似?—updateBykey()

Map

DataStream → DataStream:输入一个参数产生一个参数。

1
val env = StreamExecutionEnvironment.getExecutionEnvironment scala> val stream = senv.generateSequence(1,10) stream: org.apache.flink.streaming.api.scala.DataStream[Long] = org.apache.flink.streaming.api.scala.DataStream@4a58e0e9scala> val streamMap = stream.map { x => x * 2 } streamMap: org.apache.flink.streaming.api.scala.DataStream[Long] = org.apache.flink.streaming.api.scala.DataStream@239d9cb7scala> streamMap.print()res1: org.apache.flink.streaming.api.datastream.DataStreamSink[Long] = org.apache.flink.streaming.api.datastream.DataStreamSink@34fcc5e3scala> senv.execute("1")Submitting job with JobID: c44fa84167932556d892e4d42ec0984c. Waiting for job completion.Connected to JobManager at Actor[akka.tcp://flink@localhost:37416/user/jobmanager#-642399026] with leader session id 00000000-0000-0000-0000-000000000000.04/28/2021 23:02:57	Job execution switched to status RUNNING.04/28/2021 23:02:57	Source: Sequence Source -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED 04/28/2021 23:02:57	Source: Sequence Source -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING 04/28/2021 23:02:57	Source: Sequence Source -> Map -> Sink: Unnamed(1/1) switched to RUNNING 246810121416182004/28/2021 23:02:57	Source: Sequence Source -> Map -> Sink: Unnamed(1/1) switched to FINISHED 04/28/2021 23:02:57	Job execution switched to status FINISHED.res2: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@7ddcd3d 

FlatMap

DataStream → DataStream:输入一个参数,产生 0 个、1 个或者多个输出。

1
scala> val stream = senv.readTextFile("/usr/local/src/practice_code/mr_practice/wordcount/The_Man_of_Property.txt")stream: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@6316b9e3scala> val streamFlatMap = stream.flatMap{     |      x => x.split(" ")      | }streamFlatMap: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@7335414escala> streamFlatMap.print()res5: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@4644b1c6heslammedthedoor.TheEnd04/28/2021 23:07:29	Split Reader: Custom File Source -> Flat Map -> Sink: Unnamed(1/1) switched to FINISHED 04/28/2021 23:07:29	Job execution switched to status FINISHED.res6: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@70b2de98

Filter

DataStream → DataStream:结算每个元素的布尔值,并返回布尔值为 true 的元素。下面这个例子是过滤出非 0 的元素:

1
scala> val stream = senv.generateSequence(1,10) stream: org.apache.flink.streaming.api.scala.DataStream[Long] = org.apache.flink.streaming.api.scala.DataStream@3b21f625scala> val streamFilter = stream.filter{      |      x => x == 1      | }streamFilter: org.apache.flink.streaming.api.scala.DataStream[Long] = org.apache.flink.streaming.api.scala.DataStream@37e44608scala> streamFilter.print() res7: org.apache.flink.streaming.api.datastream.DataStreamSink[Long] = org.apache.flink.streaming.api.datastream.DataStreamSink@4febb6e6scala> senv.execute("3")Submitting job with JobID: 3cdf23f740ccdd182eda99679b31ff8b. Waiting for job completion.Connected to JobManager at Actor[akka.tcp://flink@localhost:37416/user/jobmanager#-642399026] with leader session id 00000000-0000-0000-0000-000000000000.104/28/2021 23:08:25	Source: Sequence Source -> Filter -> Sink: Unnamed(1/1) switched to FINISHED 04/28/2021 23:08:25	Job execution switched to status FINISHED.res8: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@3296b4ba

Connect

Connect算子

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

test.txt:

1
hadoop yarn mapreducespark
1
scala> val stream = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt")stream: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@4a3d0660scala> val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))streamMap: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@5c447983scala> val streamCollect = senv.fromCollection(List(1,2,3,4)) streamCollect: org.apache.flink.streaming.api.scala.DataStream[Int] = org.apache.flink.streaming.api.scala.DataStream@79b417edscala> val streamConnect = streamMap.connect(streamCollect) streamConnect: org.apache.flink.streaming.api.scala.ConnectedStreams[String,Int] = org.apache.flink.streaming.api.scala.ConnectedStreams@375121cescala> streamConnect.map(item=>println(item), item=>println(item))res9: org.apache.flink.streaming.api.scala.DataStream[Unit] = org.apache.flink.streaming.api.scala.DataStream@6f51c04fscala> senv.execute("1")Submitting job with JobID: 8e1c8169e52f685a70a714272a2a6e8c. Waiting for job completion.Connected to JobManager at Actor[akka.tcp://flink@localhost:37416/user/jobmanager#-642399026] with leader session id 00000000-0000-0000-0000-000000000000.1234hadoop04/28/2021 23:13:02	Split Reader: Custom File Source -> Flat Map -> Filter(1/1) switched to FINISHED 04/28/2021 23:13:02	Co-Map(1/1) switched to FINISHED 04/28/2021 23:13:02	Job execution switched to status FINISHED.res11: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@2161a5e

CoMap,CoFlatMap

CoMap,CoFlatMap

ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map 和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
scala> val stream1 = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt")
stream1: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@290807e5

scala> val streamFlatMap = stream1.flatMap(x => x.split(" "))
streamFlatMap: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@7b33deed

scala> val stream2 = senv.fromCollection(List(1,2,3,4))
stream2: org.apache.flink.streaming.api.scala.DataStream[Int] = org.apache.flink.streaming.api.scala.DataStream@63e40188

scala> val streamConnect = streamFlatMap.connect(stream2)
streamConnect: org.apache.flink.streaming.api.scala.ConnectedStreams[String,Int] = org.apache.flink.streaming.api.scala.ConnectedStreams@38affd02

scala> val streamCoMap = streamConnect.map(
| (str) => str + "connect",
| (in) => in + 100
| )
streamCoMap: org.apache.flink.streaming.api.scala.DataStream[Any] = org.apache.flink.streaming.api.scala.DataStream@49e54217

scala> streamCoMap.print()
res3: org.apache.flink.streaming.api.datastream.DataStreamSink[Any] = org.apache.flink.streaming.api.datastream.DataStreamSink@1f4c8062

scala> senv.execute("job")
101
102
103
104
hadoopconnect
yarnconnect
mapreduceconnect
sparkconnect
1
scala> val stream1 = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt") stream1: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@684ad81cscala> val stream2 = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt") stream2: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@7d776202scala> val streamConnect = stream1.connect(stream2) streamConnect: org.apache.flink.streaming.api.scala.ConnectedStreams[String,String] = org.apache.flink.streaming.api.scala.ConnectedStreams@a57b4bdscala> val streamCoMap = streamConnect.flatMap(      |      (str1) => str1.split(" "),      |      (str2) => str2.split(" ")      | )streamCoMap: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@3a435908scala> streamConnect.map(item=>println(item), item=>println(item)) res5: org.apache.flink.streaming.api.scala.DataStream[Unit] = org.apache.flink.streaming.api.scala.DataStream@20de2518scala> senv.execute("job")hadoopyarnmapreducespark04/28/2021 23:33:42	Split Reader: Custom File Source(1/1) switched to FINISHED hadoopyarnmapreducespark

Split

Split

DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。

1
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.readTextFile("test.txt")  scala> val stream = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt") stream: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@6724f10escala> val streamFlatMap = stream.flatMap(x => x.split(" "))scala> val streamSplit = streamFlatMap.split(     |     num =>      |	   # 字符串内容为 hadoop的组成一个 DataStream,其余的组成一个 DataStream     |     (num.equals("hadoop")) match{     |         case true => List("hadoop")     |         case false => List("other")     |         })     | )scala> streamSplit.print()res0: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@32f243fscala> senv.execute("job")hadoopyarnmapreducehadoopspark

Select

Select

SplitStream→DataStream:从一个SplitStream 中获取一个或者多个 DataStream。

1
scala> val stream = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt") stream: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@613d42abscala> val streamFlatMap = stream.flatMap(x => x.split(" ")) streamFlatMap: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@2a64cb1fscala> val streamSplit = streamFlatMap.split(      |     num =>     |      (num.equals("hadoop")) match{      |          case true => List("hadoop")     |          case false => List("other")      |      }     | )streamSplit: org.apache.flink.streaming.api.scala.SplitStream[String] = org.apache.flink.streaming.api.scala.SplitStream@36b6ce5dscala> val hadoop = streamSplit.select("hadoop") hadoop: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@45fa7f1ascala> val other = streamSplit.select("other") other: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@2e300294scala> hadoop.print()res2: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@3106efb9scala> senv.execute("job")hadoophadoop

Union

Union

DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。注意:如果你将一个DataStream 跟它自己做 union 操作,在新的 DataStream 中,你将看到每一个元素都出现两次。

1
scala> val stream1 = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt") stream1: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@474f8a2bscala> val stream2 = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt") stream2: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@699d0fbascala> val streamFlatMap1 = stream1.flatMap(x => x.split(" ")) streamFlatMap1: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@399737f5scala> val streamFlatMap2 = stream2.flatMap(x => x.split(" ")) streamFlatMap2: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@5e42c338scala> val streamConnect = streamFlatMap1.union(streamFlatMap2) streamConnect: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@41466b78scala> streamConnect.print()res6: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@7dff5404scala> senv.execute("job")hadoopyarnmapreducehadoopsparkhadoopyarnmapreducehadoopspark

KeyBy

DataStream → KeyedStream:输入必须是 Tuple 类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

1
scala> val stream = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt") stream: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@a3b417scala> val streamFlatMap = stream.flatMap{      |      x => x.split(" ")      | }streamFlatMap: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@7030c643scala> val streamMap = streamFlatMap.map{      |      x => (x,1) }streamMap: org.apache.flink.streaming.api.scala.DataStream[(String, Int)] = org.apache.flink.streaming.api.scala.DataStream@4aa11055scala> val streamKeyBy = streamMap.keyBy(0) streamKeyBy: org.apache.flink.streaming.api.scala.KeyedStream[(String, Int),org.apache.flink.api.java.tuple.Tuple] = org.apache.flink.streaming.api.scala.KeyedStream@4e851f3cscala> streamKeyBy.print()res8: org.apache.flink.streaming.api.datastream.DataStreamSink[(String, Int)] = org.apache.flink.streaming.api.datastream.DataStreamSink@7a9fee3fscala> senv.execute()(hadoop,1)(yarn,1)(mapreduce,1)(hadoop,1)(spark,1)

Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

1
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0) val streamReduce = stream.reduce(       (item1, item2) => (item1._1, item1._2 + item2._2) )streamReduce.print() env.execute("Job")# output(hadoop,1)(yarn,1)(mapreduce,1)(hadoop,2)(spark,1)

Fold

KeyedStream → DataStream:一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。

1
hadoop spark mapreducehadoop spark yarn
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val env = StreamExecutionEnvironment.getExecutionEnvironment 

val stream = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
val streamReduce = stream.fold(100)(
(begin, item) => (begin + item._2)
)
streamReduce.print()
env.execute("FirstJob")
# output
101
101
101
102
101

Aggregations

KeyedStream → DataStream:分组数据流上的滚动聚合操作。min 和 minBy 的区别是 min 返回的是一个最小值,而 minBy 返回的是其字段中包含最小值的元素(同样原理适用于 max 和 maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

1
2
3
4
5
6
7
8
9
10
11
keyedStream.sum(0) 
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")

keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
1
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = senv.readTextFile("/usr/local/src/practice_code/flink/test.txt").flaMap(item => item.split(" ")).map(x => (x, 1)).keyBy(0) val streamReduce = stream.sum(1) streamReduce.print() senv.execute("FirstJob") // 输出>(hadoop,1)(yarn,1)(mapreduce,1)(hadoop,2)(spark,1)

在 2.3.10 之前的算子都是可以直接作用在 Stream 上的,因为他们不是聚合类型的操作,但是到 2.3.10 后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实, reduce、fold、aggregation 这些聚合算子都是和 Window 配合使用的,只有配合Window,才能得到想要的结果。

Flink容错机制

容错简述2

流式计算分为有状态和无状态两种情况,所谓状态就是计算过程中的中间值

对于无状态计算,会独立观察每个独立事件,并根据最后一个事件输出结果。什么意思?大白话举例:对于一个流式系统,接受到一系列的数字,当数字大于N则输出,这时候在此之前的数字的值、和等情况,压根不关心,只和最后这个大于N的数字相关,这就是无状态计算。

什么是有状态计算?想求过去一分钟内所有数字的和或者平均数等,这种需要保存中间结果的情况是有状态的计算。

当分布式计系统中引入状态计算时,就无可避免一致性的问题。为什么了?因为若是计算过程中出现故障,中间数据咋办了?若是不保存,那就只能重新从头计算了,不然怎么保证计算结果的正确性。这就是要求系统具有容错性了。

实时计算容错机制简介

Storm:在 record level级别处理数据,数据延迟低,吞吐有限。

Spark Streaming:将源头数据流分成了微批,吞吐大,但数据延迟增加了。

流计算技术:必须对job状态进行管理,确保能从任何情況引起的 Job failurer中恢复,而且确保 exactly once可靠性,这样就会带来性能的开销,增加数据延迟和吞吐量的降低。

Flink:核心分布式数据流状态快照(即分布式快照,是轻量的)

  • 当job由于网络、集群或任何原因失败时,可以快速从这些分布式快照中恢复。

一致性

谈到容错性,就没法避免一致性这个概念。所谓一致性就是:成功处理故障并恢复之后得到的结果与没有发生任何故障是得到的结果相比,前者的正确性。换句大白话,就是故障的发生是否影响得到的结果。在流处理过程,一致性分为3个级别[1]:

  • at-most-once:至多一次。故障发生之后,计算结果可能丢失,就是无法保证结果的正确性;
  • at-least-once:至少一次。计算结果可能大于正确值,但绝不会小于正确值,就是计算程序发生故障后可能多算,但是绝不可能少算;
  • exactly-once:精确一次。系统保证发生故障后得到的计算结果的值和正确值一致;

Flink的容错机制保证了exactly-once,也可以选择at-least-once。Flink的容错机制是通过对数据流不停的做快照(snapshot)实现的。针对FLink的容错机制需要注意的是:要完全保证exactly-once,Flink的数据源系统需要有“重放”功能,具体将会在下面进行介绍。

CheckPoint3

Flink做快照的过程是基于“轻量级异步快照”的算法,其核心思想就是在计算过程中保存中间状态和在数据流中对应的位置,至于如何实现的会后续的博客中会详细说明。这些保存的信息(快照)就相当于是系统的检查点(checkpoint)(类似于window系统发生死机等问题时恢复系统到某个时间点的恢复点),做snapshot也是做一个checkpoint。在系统故障恢复时,系统会从最新的一个checkpoint开始重新计算,对应的数据源也会在对应的位置“重放“。这里的“重放”可能会导致数据的二次输出,这点的处理也在后续的博客中说明。

屏障 Barriesrs

在Flink做分布式快照过程中核心的是Barriers元素的使用。

如果用河水举例的话,Storm就是一滴滴的对数据进行处理,SparkStreaming就是一批一批的放水,上批水放完了,才放下批水。而Flink是在水中定期插入barrier,水仍然在流动,只是增加了些barrier。如果源头是多个数据源,那么都同步的增加相同的barrier,同时在job处理的过程中,为了保证job失败的时候可以从错误中恢复,Flink还对barrier进行对齐(align)操作。

这些Barriers是在数据接入到Flink之初就注入到数据流中,并随着数据流向每个算子(operator),这里所说的算子不是指类似map()等具体意义上个的,指在JobGraph中优化后的“顶点”),这里需要说明的有两点:

  • 算子对Barriers是免疫的,即Barriers是不参与计算的;
  • Barriers和数据的相对位置是保持不变的,而且Barriers之间是线性递增的;

如下图所示,Barriers将将数据流分成了一个个数据集。值得提醒的是,当barriers流经算子时,会触发与checkpoint相关的行为,保存的barriers的位置和状态(中间计算结果)。

Update:checkpoint是由JobManager中的CheckpointCoordinator周期性触发,然后在Task侧生成barrier,具体为:在Source task(TaskManager中)中barrier会根据命令周期性的在原始数据中注入barrier,而对非source task则是遇到barrier做checkpoint,即非source task其做checkpoint的时间间隔也许不是周期的,影响因素较多。此外,每个算子做checkpoint的方式也许不同。

可以打个比方,在河上有个大坝(相当于算子),接上级通知(Flink中的JobManager)要统计水流量等信息,所以有人在上游定期(source task)放一根木头(barrier)到河中,当第一木头遇到大坝时,大坝就记下通过大坝木头的位置、水流量等相关情况,即做checkpoint(实际生活中不太可能),当第二木头遇到大坝时记下第一个木头和第二根木头之间的水流量等情况,不需要重开始计算。这里先不管故障了,不然就不好解释相同的水的“重放”问题了。  

当一个算子有多个数据源时,又如何做checkpoint?

如下图,从左往右一共4副图。对于Operator接受多个数据流的情况,需要对数据流做排列对齐。

当算子收到其中一个数据源的barriers,而未收到另一个数据源的barriers时(如左1图),会将先到barriers的数据源中的数据先缓冲起来,暂停处理,等待另一个barriers(如左2图),当收到两个barriers(如左3图)即接收到全部数据源的barrier时,会做checkpoint,保存barriers位置和状态,发射缓冲中的数据,释放一个对应的barriers。这里需要注意是,当缓存中数据没有被发射完时,是不会处理后续数据的,这样是为了保证数据的有序性。

这里其实有一点需要注意的是,因为系统设置checkpoint的方式是通过时间间隔的形式(enableCheckpointing(intervalTime)),所以会导致一个问题:当一个checkpoint所需时间远大于两次checkpoint之间的时间间隔时,就很有可能会导致后续的checkpoint会失败,若是这样情况比较严重时会导致任务失败,这样Flink系统的容错性的优势就等不到保证了,所以需要合理设计checkpoint间隔时间。

状态(State

如下图所示,在一次snapshot中,算子会在接受到其数据源的所有barriers的以后snapshot它们的状态,然后在发射barriers到输出流中,直到最后所有的sink算子都完成snapshot才算完成一次snapshot。

其中,在准备发射的barriers形成之前,state 形式是可以改变的,之后就不可以了。state的存贮方式是可以配置的,如HDFS,默认是在JobManager的内存中。

这里要注意,Snapshot并不仅仅是对流做了一个状态CheckPoint,它也包含了一个Operator内部持有的状态,保证流处理系统失败时能够正确地恢复数据流处理,状态包括两种:

  • 系统状态:Operator进行计算需要有缓存,所以缓冲区的状态是与Operator相关联的。eg:窗口操作,系统会手机和聚合记录数据并放到缓冲区中,直到该缓冲区中的数据被处理完成。、
  • 自定义状态(状态可以通过转换函数进行创建和修改),它可以是函数中的java对象,或者与函数相关的key/value状态.map(x=>hashMap.getOrElse(x,o))

异步快照(asynchronous state snapshot

上述描述中,需要等待算子接收到所有barriers后,开始做snapshot,存储对应的状态后,再进行下一次snapshot,其状态的存储是同步的,这样可能会造成因snapshot引起较大延时。可以让算子在存储快照时继续处理数据,让快照存储异步在后台运行。为此,算子必须能生成一个 state 对象,保证后续状态的修改不会改变这个 state 对象。例如 RocksDB 中使用的 copy-on-write(写时复制)类型的数据结构,即异步状态快照。对异步状态快照,其可以让算子接受到barriers后开始在后台异步拷贝其状态,而不必等待所有的barriers的到来。一旦后台的拷贝完成,将会通知JobManager。只有当所有的sink接收到这个barriers,和所有的有状态的算子都确认完成状态的备份时,一次snapshot才算完成。如何实现的,这点后续博客将仔细分析。

状态存储

image-20210330082156838

image-20210411203106975

数据打入kafka,然后flink消费kafka的数据,处理后直接提供给线上存储。

Time与Window

Time

在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示:

Flink时间概念

Event Time(事件时间):是事件发生或者被创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。

Ingestion Time(采集时间):是事件进入 Flink 的时间。

Processing Time(处理时间):是每一个执行基于时间操作的算子(operator)的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。

按时间顺序,通常事件时间早于采集时间,采集时间早于处理时间。

例如,一条日志进入 Flink 的时间为 2017-11-12 10:00:00.123,到达 Window 的系统时间为 2017-11-12 10:00:01.234,日志的内容如下:

2017-11-02 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计 1min 内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。

Window

streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。

Window 可以分成两类:

  • CountWindow:按照指定的数据条数生成一个 Window,与时间无关。(事件窗口?)
  • TimeWindow:按照时间生成 Window。

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(TumblingWindow)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

TimeWindow

滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片。

特点:时间对齐,窗口长度固定,没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口,窗口的创建如下图所示:

滚动窗口

适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。

滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度滑动间隔组成。

特点:时间对齐,窗口长度固定,有重叠。

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示:

滑动窗口

适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。

会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐。

session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去。

时间窗口

Windows API

CountWindow (事件窗口)

CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。
注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。

滚动窗口

默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

1
输入>abaaababbccb
1
// 获取执行环境val senv = StreamExecutionEnvironment.getExecutionEnvironment // 创建 SocketSourceval stream = senv.socketTextStream("localhost", 11111) // 对 stream进行处理并按key聚合val streamKeyBy = stream.flatMap(item => item.split(" ")).map(x => (x, 1)).keyBy(0)// 引入滚动窗口// 这里的 5指的是5个相同 key的元素计算一次val streamWindow = streamKeyBy.countWindow(5) // 执行聚合操作val streamReduce = streamWindow.reduce(  (item1, item2) => (item1._1, item1._2 + item2._2)) // 将聚合数据写入文件streamReduce.print() // 执行程序senv.execute("TumblingWindow") >输出(a,5)(b,5)
滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。

下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 5 个元素。

(每当有两个key一样,计算window内5个key,接着再往下计算,发现有和之前计算key一样的且能够构成对的再次计算)

1
2
3
4
输入>
a a b c d e f g h i j e
f
h i j k h h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 对 stream进行处理并按key聚合
val streamKeyBy = stream.flatMap(item => item.split(" ")).map(x => (x, 1)).keyBy(0)

// 引入滚动窗口
// 当相同 key的元素个数达到2个时,触发窗口计算,计算的窗口范围为 5
val streamWindow = streamKeyBy.countWindow(5,2)
// 执行聚合操作
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 将聚合数据写入文件
streamReduce.print()
// 执行程序
env.execute("TumblingWindow")

> 输出
(a,2)
(e,2)
(f,2)
(h,2)
(i,2)
(j,2)
(h,4)

TimeWindow

TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算。

滚动窗口

Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中。

1
2
3
4
5
6
7
8
9
10
连续时间输入>
abc
abc
abc
abc
abc
abc
abc
abc
abc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建 SocketSource
val stream = senv.socketTextStream("localhost", 11111)

// 对 stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)

// 引入时间窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 执行聚合操作
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)

// 将聚合数据写入文件
streamReduce.print()

// 执行程序
env.execute("TumblingWindow")
输出>
(abc,2) # 时间间隔
(abc,6)
(abc,1)

时间间隔可以通过 Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中的一个来指定。

滑动窗口(SlidingEventTimeWindows)

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。
下面代码中的 sliding_size 设置为了 2s,也就是说,窗口每 2s 就计算一次,每一次计算的 window 范围是 5s 内的所有元素。

1
输入>aaaaa
1
// 获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建 SocketSourceval stream = senv.socketTextStream("localhost", 11111) // 对 stream进行处理并按key聚合val streamKeyBy = stream.map(item => (item, 1)).keyBy(0) // 引入滚动窗口val streamWindow = streamKeyBy.timeWindow(Time.seconds(5), Time.seconds(2)) // 执行聚合操作val streamReduce = streamWindow.reduce(   (item1, item2) => (item1._1, item1._2 + item2._2) ) // 将聚合数据写入文件streamReduce.print() // 执行程序senv.execute("TumblingWindow") 输出>(a,2)(a,5)(a,5)(a,2)

时间间隔可以通过 Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中的一个来指定。

Window Reduce

WindowedStream → DataStream:给 window 赋一个 reduce 功能的函数,并返回一个聚合的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建 SocketSource
val stream = senv.socketTextStream("localhost", 11111)

// 对 stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)

// 引入时间窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 执行聚合操作
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)

// 将聚合数据写入文件
streamReduce.print()

// 执行程序
senv.execute("TumblingWindow")

Window Fold

WindowedStream → DataStream:给窗口赋一个 fold 功能的函数,并返回一个 fold 后的结果。

1
// 获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建 SocketSourceval stream = senv.socketTextStream("localhost", 11111,'\n',3) // 对 stream进行处理并按key聚合val streamKeyBy = stream.map(item => (item, 1)).keyBy(0) // 引入滚动窗口val streamWindow = streamKeyBy.timeWindow(Time.seconds(5)) // 执行 fold操作val streamFold = streamWindow.fold(100){   (begin, item) => begin + item._2 } // 将聚合数据写入文件streamFold.print() // 执行程序senv.execute("TumblingWindow") 

Aggregation on Window

WindowedStream → DataStream:对一个 window 内的所有元素做聚合操作。min 和 minBy 的区别是 min 返回的是最小值,而 minBy 返回的是包含最小值字段的元素(同样的原理适用于 max 和 maxBy)。

1
// 获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment // 创建 SocketSourceval stream = senv.socketTextStream("localhost", 11111) // 对 stream进行处理并按key聚合val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)// 引入滚动窗口val streamWindow = streamKeyBy.timeWindow(Time.seconds(5)) // 执行聚合操作val streamMax = streamSum.max(1) // 将聚合数据写入文件streamMax.print() // 执行程序senv.execute("TumblingWindow") 

EventTime 与Window

EventTime 的引入

在 Flink 的流式处理中,绝大部分的业务都会使用 eventTime,一般只在eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。

如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所示:

1
val env = StreamExecutionEnvironment.getExecutionEnvironment // 从调用时刻开始给 env创建的每一个 stream追加时间特征env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

Watermark

基本概念

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络等原因,导致乱序的产生,所谓乱序,就是指 Flink接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的。

数据的乱序

那么此时出现一个问题,一旦出现乱序,如果只根据 eventTime 决定 window 的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发 window 去进行计算了,这个特别的机制,就是 Watermark。
Watermark 是一种衡量 Event Time 进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的 Watermark。
Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark 机制结合 window 来实现。
数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,window 的执行也是由 Watermark 触发的。

Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。

有序流的 Watermarker 如下图所示:(Watermark 设置为 0)

有序数据的watermark

乱序流的 Watermarker 如下图所示:(Watermark 设置为 2)

无序数据的watermark

当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

上图中,我们设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1是 1s~5s,窗口 2 是 6s~10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。

Watermark 的引入

1
2
3
4
5
6
7
8
9
10
11
12
val env = StreamExecutionEnvironment.getExecutionEnvironment 

// 从调用时刻开始给 env创建的每一个 stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream = env.socketTextStream("localhost", 11111).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) {
override def extractTimestamp(t: String): Long = {
// EventTime是日志生成时间,我们从日志中解析EventTime
t.split(" ")(0).toLong
}
})

EvnetTimeWindow API

当使用 EventTimeWindow 时,所有的 Window 在 EventTime 的时间轴上进行划分,也就是说,在 Window 启动后,会根据初始的 EventTime 时间每隔一段时间划分一个窗口,如果 Window 大小是 3 秒,那么 1 分钟内会把 Window 划分为如下的形式:

1
[00:00:00,00:00:03) [00:00:03,00:00:06) ... [00:00:57,00:01:00) 

如果 Window 大小是 10 秒,则 Window 会被分为如下的形式:

1
[00:00:00,00:00:10) [00:00:10,00:00:20) ... [00:00:50,00:01:00) 

注意,窗口是左闭右开的,形式为:[window_start_time,window_end_time)。 Window 的设定无关数据本身,而是系统定义好了的,也就是说,Window 会一直按照指定的时间间隔进行划分,不论这个 Window 中有没有数据,EventTime 在这个 Window 期间的数据会进入这个 Window。

Window 会不断产生,属于这个 Window 范围的数据会被不断加入到 Window 中,所有未被触发的 Window 都会等待触发,只要 Window 还没触发,属于这个 Window范围的数据就会一直被加入到 Window 中,直到 Window 被触发才会停止数据的追加,而当 Window 触发之后才接受到的属于被触发 Window 的数据会被丢弃。

Window 会在以下的条件满足时被触发执行:

  • watermark 时间 >= window_end_time;

  • 在[window_start_time,window_end_time)中有数据存在。

    我们通过下图来说明 Watermark、EventTime 和 Window 的关系。

滚动窗口(TumblingEventTimeWindows)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 对 stream进行处理并按key聚合
val streamKeyBy = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {
override def extractTimestamp(element: String): Long = {
val sysTime = element.split(" ")(0).toLong
println(sysTime)
sysTime
}}).map(item => (item.split(" ")(1), 1)).keyBy(0)

// 引入滚动窗口
val streamWindow = streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(10)))

// 执行聚合操作
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)

// 将聚合数据写入文件
streamReduce.print
// 执行程序
env.execute("TumblingWindow")

结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。

滑动窗口(SlidingEventTimeWindows)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 对 stream进行处理并按key聚合
val streamKeyBy = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) {
override def extractTimestamp(element: String): Long = {
val sysTime = element.split(" ")(0).toLong
println(sysTime)
sysTime
}}).map(item => (item.split(" ")(1), 1)).keyBy(0)

// 引入滑动窗口
val streamWindow = streamKeyBy.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

// 执行聚合操作
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)

// 将聚合数据写入文件
streamReduce.print

// 执行程序
env.execute("TumblingWindow")

会话窗口(EventTimeSessionWindows)

相邻两次数据的 EventTime 的时间差超过指定的时间间隔就会触发执行。如果加入 Watermark,那么当触发执行时,所有满足时间间隔而还没有触发的 Window 会同时触发执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 创建 SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 对 stream进行处理并按key聚合
val streamKeyBy = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) {
override def extractTimestamp(element: String): Long = {
val sysTime = element.split(" ")(0).toLong
println(sysTime)
sysTime
}}).map(item => (item.split(" ")(1), 1)).keyBy(0)

// 引入滚动窗口
val streamWindow = streamKeyBy.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
// 执行聚合操作
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 将聚合数据写入文件
streamReduce.print
// 执行程序
env.execute("TumblingWindow")

总结

Flink 是一个真正意义上的流计算引擎,在满足低延迟和低容错开销的基础之上,完美的解决了exactly-once 的目标,真是由于Flink 具有诸多优点,越来越多的企业开始使用Flink作为流处理框架,逐步替换掉了原本的 Storm 和 Spark 技术框架。