Spark基础

Spark定义

Spark是一种基于内存的快速、通用、可扩展的大数据分析引擎。

Spark的历史

2009年诞生于加州大学伯克利分校AMPLab,项目采用Scala编写;

2010年开源;

2013年6月成为Apache孵化项目;

2014年2月成为Apache顶级项目;

Spark内置模块

Spark Core是Spark的核心,实现了 Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core 中还包含了对弹性分布式数据集(Resilient Distributed DataSet,简称RDD)的API 定义。通过对其核心进行拓展,Spark在上层构架了很多高级的应用。

Spark Sql是Spark 用来操作结构化数据的程序包。通过Spark SQL,我们可以使用 SQL 或者Apache Hive 版本的SQL 方言(HQL)来查询数据。Spark SQL 支持多种数据源,比如Hive 表、Parquet 以及JSON 等。 只需用简单的语法就能完成大数据的统计,对飙Hive。

Spark Streaming是Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的API,并且与 Spark Core 中的 RDD API 高度对应。其实现了秒级别的实时计算,对飙storm(毫秒级别)等流式计算框架。

Spark Mlib和Spark Graphx内置很多机器学习的算法包。Spark Mlib 提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。 主要对飙Mahout。在没有spark之前,相关算法都是用Mahout跑,不过现在Mahout基本被淘汰了。

集群管理器,Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计 算。为了实现这样的要求,同时获得最大灵活性,Spark 支持在各种集群管理器(Cluster Manager)上运行,包括Hadoop YARN、Apache Mesos,以及Spark 自带的一个简易调度 器,叫作独立调度器。

目前,腾讯 Spark 集群达到 8000 台的规模,是当前已知的世界上最大的 Spark 集群。

Spark特点

1.快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上,Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。计算的中间结果是存在于内存中的。

2.易用: Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的Shell,可以非常方便地在这些Shell中使用Spark集群来验证解决问题的方法。

3.通用:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用,减少了开发和维护的人员成本和部署平台的物力成本。

4.兼容性:Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark强大的处理能力。

Spark与MR的对比

Spark与MapReduce对比

Spark解决了什么问题

1.内存计算,效率更高

spark是一种基于内存操作的迭代模型,在spark中表的表示是dataframe,hive中表的表示是table。

df经过多次处理后,处理成模型需要输入的放入数据:df_n

df_n.cache加载到内存 -> 直接获取内存中数据,提高代码整体执行效率。

image-20210302101541605

2.更丰富的API

spark提供了丰富的API,解决了api单一问题。

在RDD中,count是action操作,在DF中是transformation。action是写操作,将数据写入磁盘或内存。cache是transformation,df.cache使得该df之后的操作是要放到内存中的。

3.完整的作业描述

可以通过简单的一行代码,将用户的整个作业串联起来,可以立即解释,不像mr需要用多个map和reduce脚本来实现。

image-20210307080519787

Spark中的重要角色

Driver(驱动器)

Spark 的驱动器是执行开发程序中的 main 方法的进程。它负责开发人员编写的用来创建SparkContext、创建RDD,以及进行RDD 的转化操作和行动操作代码的执行。如果你是用spark shell,那么当你启动 Spark shell 的时候,系统后台自启了一个Spark 驱动器程序,就是在Spark shell 中预加载的一个SparkContext (sc)对象。如果驱动器程序终止,那么Spark 应用也就结束了。简单来看,可以将其理解为Yarn架构中的Application Master。

主要负责:

  • 把用户程序转为任务

  • 跟踪Executor 的运行状况

  • 为执行器节点调度任务

  • UI 展示应用运行状况

Executor(执行器)

Spark Executor可以将其看作是Yarn中container中的一个进程。它是一个工作进程,负责在 Spark 作业中运行任务,任务间相互独立。 Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。主要负责:

  • 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;

  • 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD 提供内存式存储。RDD 是直接缓存在Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

job

Spark job就是一个action 算子,每个job由若干个transformer算子和一个action算子组成。

RDD

弹性分布式内存集,是一个抽象的内存区域。

补充概念

每一个进程里面包含executor对象,内部有一个线程池,每一个线程执行一个task,每个job里有多个task

线程池的优点:省去线程频繁启动的开销。因为进程启动较慢会占用较多的资源。

task并行度的概念

一个节点表示一个机器,每个节点可以启动多个executor ,每个executor由若干个core(虚拟的)组成,每一个core执行一个task, 这里的core就是线程的粒度。每次只能执行一个线程,每个task执行结果对应RDD中的一个partition,RDD 中由多个partition组成,每一个partition由一个task进行处理。

Spark的运行模式

Local模式

Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。它可以通过以下几种方式设置Master。

  • local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式;

  • local[K]: 指定使用几个线程来运行计算,比如local[4]就是运行4个Worker;

  • local[*]: 这种模式直接帮你按照Cpu最多Cores来设置线程数;

基本语法
1
2
3
4
5
6
7
bin/spark-submit \ 
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \ --conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
参数说明

--master:指定Master 的地址,默认为Local

--class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)

--deploy-mode: 是否发布你的驱动到 worker 节点(cluster) 或者作为一个本地客户端
(client) (default: client)*

--conf: 任意的Spark 配置属性, 格式 key=value. 如果值包含空格,可以加引号
“key=value”

application-jar: 打包好的应用 jar,包含依赖. 这个URL 在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path 都包含同样的 jar application-arguments: 传给 main()方法的参数

--executor-memory:1G 指定每个executor 可用内存为 1G

--total-executor-cores 2:指定每个executor 使用的 cup 核数为2 个

Spark通用运行流程示意图

Standalone模式

Spark的Standalone模式就是一个独立的Spark集群,是一个由Master+Worker(Slave) 构成的Spark 集群,Spark 运行在集群中。

Standalone HA架构

Yarn模式

Spark 客户端直接连接 Yarn,不需要额外构建 Spark 集群。有 yarn-client 和 yarn-cluster两种模式,主要区别在于Driver程序的运行节点的不同。

yarn-client:Driver 程序运行在客户端,适用于交互、调试,希望立即看到 app 的输出;

yarn-cluster:Driver 程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境;

Yarn模式运行流程图

Mesos模式

Spark 客户端直接连接 Mesos;不需要额外构建Spark 集群。国内应用比较少,更多的是运用yarn 调度。

三种模式的对比

Spark内存管理

堆内内存

是指任务启动之后,任务自身的内存。

Executor内存分为3部分:

  1. Storage(60%内存):
  • cache (帮助下游数据出现问题时进行快速恢复,代价是需要把上游数据提前缓存。如果没有把数据缓存,下游任务挂了,需要从头开始计算一遍任务
  • persist
  • broadcast

在60%内存中,不是全部用于存储,里面有10%不能存任何信息,得空着,为了防止越界,作为缓冲,可以通过图中的参数来修改这个内存大小;

  • unroll指在去读内存数据时,通常利用迭代器的方式进行循环遍历,这种时候需要留一定空间与程序交互,属于缓冲迭代器,这块也可以通过图中的参数设置。
  • 减去unroll的内存和缓冲内存,剩下的就是存储计算数据所需的内存。
  1. Execution(20%)
  • 执行内存 (执行join、shuffle类算子(join、aggregate等)
  1. Other(20%):留给程序自己的,通常内存可以设计的相对小一些

堆外内存

堆外内存也需要提前去设置需要多少内存,内存不属于程序自身,是属于程序之外的内存,当自身程序内存不够用,可以跨出进程空间去写内存。

堆外内存默认按各自50%分给Storage和Execution。

缺点是如果内存管理不善,会导致两者之间内存使用不太均匀(比如Storage内存不够用,而Executor内存有空余,但是不能给Storage利用)。

堆外内存动态分配机制

随着Spark版本更新,允许堆外内存进行动态分配。

堆外内存动态分配机制

Storage自身内存用满了,从堆外上排写内存,Execution也是,各自的堆外内存没写满,两者相安无事。

当Storage内存越界了,但是Execution内存写不满,中间有缓存区,发现两者之间没撞上,只要不妨碍Execution执行内存的占用,可以继续写。

一旦两者碰上了,如果Storage越界了,还占用了Execution的内存,淘汰掉占用的那部分内存。

当Execution执行内存写超过了Storage内存,不淘汰,执行内存优先级更高,继续写。

Spark任务配置

任务参数

但一旦涉及到broadcast时候,driver-memory要设置大一些。

Spark核心

RDD概述

Spark的核心是建立在统一的抽象弹性分布式数据集(Resiliennt Distributed Datasets,RDD)之上的,这使得 Spark 的各个组件可以无缝地进行集成,能够在同一个应用程序中完成大数据处理。

RDD( Resilient Distributed Dataset ):弹性分布式数据集(相当于集合),它的本质是数据集的描述(只读的、可分区的分布式数据集),而不是数据集本身。

RDD 是 Spark 提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。

每个RDD包含了数据分块/分区(partition)的集合,每个partition是不可分割的,RDD描述的内容有:

  • 实际数据块的描述(实际数据到底存在哪,或者不存在)

  • 其值依赖于哪些partition

简单来说,我们可以将RDD看作是一群数据的相关信息集合,RDD的数据默认是存放在内存中,但是内存资源会存在不足的情况,此时spark会将RDD数据写入磁盘(内存和磁盘切换)。

RDD容错

RDD具有良好的容错能力,如果任务执行失败,可以自动从失败节点恢复,由于某个节点宕机了导致数据丢失,RDD会根据自己的数据来源重新计算一遍,计算失败的partition的数据。

容错计算是基于“血缘关系”,即代表任务的具体来龙去脉,映射hive表,表是由哪些表进行生成的或者是最终作用哪些表,如果计算代价较大,可以提前设置cache,将中间数据读入缓存,减少重新计算的开销。

RDD 的分区(Partition)与工作结点(Worker Node)的分布关系

通俗点来讲,可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个RDD可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同节点上,从而可以在集群中的不同节点上进行并行计算。

RDD的partition与HDFS的block的对比

HDFS中的block是分布式存储的最小数据单元,一个文件可能占用多个block,但一个block的内容一定来自同一份文件。

Spark中的Partition是弹性分布式数据集RDD的最小单元,RDD由分布在各节点的partition组成。partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的,这也是为什么叫“弹性分布式”数据集的原因之一。

Spark RDD的几种分区方式

 1.HashPartition

HashPartitioner确定分区的方式:partition = key.hashCode () % numPartitions
弊端:弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据。

2.RangePartitioner

RangePartitioner会对key值进行排序,然后将key值被划分成分区份数key值集合。
特点:RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样 ——-水塘抽样(Reservoir Sampling)问题

3.CustomPartitioner

CustomPartitioner可以根据自己具体的应用需求,自定义分区。

1
2
3
4
5
6
7
8
9
10
11
12

class CustomPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int =
{
if(key==1)){
0
} else if (key==2){
1} else{
2 }
}
}

Api解释:

  • spark默认实现了HashPartitioner和RangePartitioner两种分区策略,我们也可以自己扩展分区策略,自定义分区器的时候继承org.apache.spark.Partitioner类,实现类中的三个方法
    def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
    def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
  • 使用,调用parttionBy方法中传入自定义分区对象

Spark读入HDFS数据默认分区方式

Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。

RDD 的执行过程

1.读入外部数据源进行创建,分区;

2.RDD经过一系列的转化操作,每一次都会产生不同的RDD供给下一个转化操作使用;

3.最后一个 RDD经过一个action(动作)操作进行计算并输出到外部数据源;

优点:惰性调用、管道化、不需要保存中间结果

RDD 关键特征

1.容错性:RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算;

2.中间结果保存到内存,避免了不必要的内存开销;

3.使用更丰富的操作来处理,只读(由一个RDD变换得到另一个RDD,但是不能对本身的RDD修改);

4.懒操作,延迟计算,action的时候才操作;

RDD创建方式

1.通过读取外部文件来生成

1
val a = sc.textFile("/xxx/yyy/file")

2.通过并行化的方式创建RDD

1
val array = Array(1, 2, 3, 4, 5)

3.其他方式

包括其他的RDD转换,或者读取数据库的方式等等。

RDD宽依赖和窄依赖

窄依赖: 指的是每一个父RDD的Partition最多被子RDD的一个Partition依赖。使用任务可以在本地执行,不需要shuffle;

常见算子:map、flatmap、filter、union、sample 等等;

宽依赖: 指的是多个子RDD的Partition会依赖同一个父RDD的Partition;除非父RDD是hash-partitioned, 否则都需要shuffle ;

常见算子:groupByKey、reduceByKey、sortByKey、join 等等;

DAG:表示整个Spark的执行流程,是代码逻辑的一个图示;

spark中划分stage通过宽依赖进行的,遇见宽依赖就切分,每个stage内部能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化

宽依赖和窄依赖的作用:
1、stage划分
2、容错(针对复杂业务逻辑,当执行到宽依赖的时候,进行适当的cache,担心任务异常结束,数据重跑)
3、代码优化

Spark开发调优

优化原则:首先使用参数优化,然后使用代码优化;代码优化可能导致出现最后结果不一致的情况。

参数调优

executor内存资源使用

内存资源分为三块:

  • 第一块:让task执行代码时,默认占executor总内存的20%;
  • 第二块:task通过shuffle过程拉取上一个stage的task的输出后,进行聚合等操作时默认也是占Executor总内存的20%,使用Task的执行速度和每个executor进程的CPU Core数量有直接关系,一个CPU Core同一时间只能执行一个线程,每个executor进程上分配到的多个task,都是以task一条线程的方式,多线程并发运行的。如果CPU Core数量比较充足,而且分配到的task数量比较合理,那么可以比较快速和高效地执行完这些task线程;
  • 第三块:让RDD持久化时使用,默认占executor总内存的60%;

资源参数调优

  • num-executors:该作业总共需要多少executor进程执行;

    • 建议:每个作业运行一般设置50~100个左右较合适
  • executor-memory:设置每个executor进程的内存, num-executors* executor-memory代表作业申请的总内存量(尽量不要超过最大总内存的1/3~1/2);

    • 建议:设置4G~8G较合适
  • exe cutor-cores:每个executor进程的CPU Core数量,该参数决定每个executor进程并行执行task线程的能力, num-executors* executor-cores代表作业申请总CPU core数(不要超过总CPU Core的1/3~1/2 );

    • 建议:设置2~4个较合适
  • driver-memory

    • 通常不用设置,一般1G够用,若出现collect算子将RDD数据全部拉取到Driver上,就必须保证该值足够大,否则会造成OOM内存溢出
  • spark.default.parallelism:每个stage的默认task数量;

    • 建议:设置500~1000较合适,默认一个HDFS的block对应一个task,Spark默认值偏少, 这样导致不能充分利用资源
  • spark.storage.memoryFraction:设置RDD持久化数据在executor内存中能占的比例,默认0.6,即默认executor 60%的内存可以保存持久化RDD数据;

    • 建议:若有较多的持久化操作,可以设置高一些,超出内存的频繁gc导致运行缓慢
  • spark.shuffle.memoryFraction:聚合操作占executor内存的比例,默认0.2

    • 建议:若持久化操作较少,但shuffle较多时,可以降低持久化内存占比,提高shuffle操作内存占比

spark-submit设置参数

补充:Spark配置优先级

优先级最高的是在用户代码中显示调用set()方法设置的选项。其次是通过spark-submit传递的参数,再次是写再配置文件中的值,最后是系统默认的值。

Spark开发调优原则

原则一:避免创建重复的RDD

对同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据,避免极大浪费内存。

原则二:尽可能复用同一个RDD

比如:一个RDD数据格式是key-value,另一个是单独value类型,这两个RDD的value部分全一样,这样可以复用达到减少算子执行次数。

原则三:对多次使用的RDD进行持久化处理

每次对一个RDD执行一个算子操作时,都会重新从源头处理计算一遍,计算出那个RDD出来,然后进一步操作,这种方式性能很差。

–对多次使用的RDD进行持久化,将RDD的数据保存在内存或磁盘中,避免重复劳动,借助cache()和persist()方法。

cache是persist的一种特殊形式。persist支持很多类型的缓冲,cache是其中一种(纯内存)。

cache本质调用persist ,看源码:

查看存储级别有哪些:

查看unpersist源码

原则四:避免使用shuffle类算子

在spark作业运行过程中,最消耗性能的地方就是shuffle过程。

将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合和join处理,比如groupByKey、reduceByKey、join等算子,都会触发shuffle。

重点:shuffle涉及到写磁盘、数据合并、网络数据传输,因此,能避免使用shuffle,就不使用。

像join算子属于shuffle算子,能不用就尽可能不用。用map-site join,通过broadcast。

比如,通过driver将数据分发到不同的Executor里面去。

broadcast如何将小表加入内存呢?

比如有个小表,将其放到driver的内存里,executor环节处理大表的记录时,可以通过broadcast将小表分发到对应内存离去,这时只需要遍历大表的key,去内存里面查找小表对应的value。

使用Broadcast时,会将数据收集到driver,数据量太大会导致driver端压力过大垮掉。

原则五:使用map-site预聚合的shuffle操作

一定要使用shuffle的,无法用map类算子替代的,那么尽量使用map-site预聚合的算子。

map-site思想类似MapReduce中的Combiner。

可能的情况下使用reduceByKey或aggregateByKey算子替代groupByKey算子,因为reduceByKey或aggregateByKey算子会使用用户自

定义的函数对每个节点本地相同的key进行预聚合,而groupByKey算子不会预聚合。

map-site join

是指,当有一张大表(M行)和小表(N行)做join时,shuffle会做M*N次。用map-site join将小表读入内存,此时只需要遍历一遍大表即可,即只需做M次。

在map-site join里面,broadcast体现了主要的功能。

mapPartition代替Map

当有一个大规模数据,如果走map,相当于从头到尾遍历一遍,每个记录都要走map。如果map里面涉及到了访问一个数据库,这个时候每一个数据都要链接一遍,创建数据库,产生过多的句柄。当map涉及到远程访问操作就不行了。

这时做一个优化,希望把数据库访问链接在外部创建,内部可复用,这样就避免了多次创建数据库客户端情况的发生。

mapPartition就相当于在Partition外面罩了一层。

filter+coalsece

如果数据量大,处理能力有限,怎么办?

如果有一个数据,统一对其进行处理。如果使得处理更快?想办法让数据的规模不断的变小变小,类似精排。即可以通过filter,把数据进行压缩,减少partition的个数。

foreachPartitions替代foreach

跟前边的map-site join原理类似。

原则六:使用Kryo优化序列化性能

Kryo是一个序列化类库,来优化序列化和反序列化性能,Spark默认使用Java序列化机制(ObjectOutputStream/ ObjectInputStream API)进行序列化和反序列化。Spark支持使用Kryo序列化库,性能比Java序列化库高很多,10倍左右。

其他调优方法

对于一些数据类型,如果对于数据说过大,可以换占用内存较小的数据类型。

partition个数决定了并发的能力。

参考链接

链接1:https://www.jianshu.com/p/c36d684ae79e