Spark优化方式 & Spark优化顺序
Spark优化方式 & Spark优化顺序
Spark优化顺序
应先使用Spark相关参数进行调优,当参数优化效果未达预期时,再考虑对代码进行优化。对代码优化可能会造成运行结果产生偏差,导致费力不讨好,所以最好先采用参数调优,如果不能达到预期效果再考虑代码调优。
Spark优化方式
1.资源调优参数
num-executors:该作业总共需要多少executor进程执行;
- 建议:每个作业运行一般设置50~100个左右较合适
executor-memory:设置每个executor进程的内存, num-executors* executor-memory代表作业申请的总内存量(尽量不要超过最大总内存的1/3~1/2);
- 建议:设置4G~8G较合适
- executor-cores:每个executor进程的CPU Core数量,该参数决定每个executor进程并行执行task线程的能力, num-executors* executor-cores代表作业申请总CPU core数(不要超过总CPU Core的1/3~1/2 );
- 建议:设置2~4个较合适
- driver-memory
- 通常不用设置,一般1G够用,若出现collect算子将RDD数据全部拉取到Driver上,就必须保证该值足够大,否则会造成OOM内存溢出
- spark.default.parallelism:每个stage的默认task数量;
- 建议:设置500~1000较合适,默认一个HDFS的block对应一个task,Spark默认值偏少, 这样导致不能充分利用资源
- spark.storage.memoryFraction:设置RDD持久化数据在executor内存中能占的比例,默认0.6,即默认executor 60%的内存可以保存持久化RDD数据;
- 建议:若有较多的持久化操作,可以设置高一些,超出内存的频繁gc导致运行缓慢
- spark.shuffle.memoryFraction:聚合操作占executor内存的比例,默认0.2
- 建议:若持久化操作较少,但shuffle较多时,可以降低持久化内存占比,提高shuffle操作内存占比
以上参数可以在提交spark任务时设置,也可以在客户端设置。
2.避免使用重复的RDD
对同一份数据,只创建一个RDD,不要创建多个RDD来代表同一份数据,避免浪费内存。
bad case
1 | #1 |
1 | #2 |
Optimize case
1 | #1 |
1 | #2 |
3.尽可能复用同一个RDD
除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。
4.对多次使用的RDD进行持久化处理
每次对一个RDD执行一个算子操作时,都会重新从源头处理计算一遍,计算出那个RDD出来,然后进一步操作,这种方式性能很差。
对多次使用的RDD进行持久化,将RDD的数据保存在内存或磁盘中,避免重复劳动,借助cache()和persist()方法。
5.Shuffle调优
5.1避免使用Shuffle类算子
在spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle涉及到写磁盘、数据合并、网络数据传输,因此,能避免使用shuffle,就不使用。
将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合和join处理,比如groupByKey、reduceByKey、join等算子,都会触发shuffle。而使用Broadcast+map的join操作不会触发shuffle,适合于RDD数据量较少时使用。
cad case
1 | val rdd3 = rdd1.join(rdd2) |
Opitimize case
1 | val rdd2Data = rdd2.collect() |
5.2使用map-site预聚合Shuffle操作
如果一定要使用shuffle的,无法用map类算子替代的,那么尽量使用map-site预聚合的算子。map-site的计算思想类似MapReduce中的Combiner。
可能的情况下使用reduceByKey或aggregateByKey算子替代groupByKey算子,因为reduceByKey或aggregateByKey算子会使用用户自定义的函数对每个节点本地相同的key进行预聚合,而groupByKey算子不会预聚合。
5.3先去重,再合并
A.union(B).distinct()
vs.A.distinct().union(B.distinct()).distinct()
5.4参数调优
spark.shuffle.file.buffe
- 设置shuffle write task的buffer大小,将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘
spark.reducer.maxSizeInFligh
- 设置shuffle read task的buffer大小,决定了每次能够拉取pull多少数据。减少拉取数据的次数,也就减少了网络传输的次数
spark.shuffle.sort.bypassMergeThreshold
- shuffle read task的数量小于这个阈值(默认是200),则map-side/shuffle write过程中不会进行排序操作
6.filter + coalesce减少 Task数量
如果使用filter后,数据量级减少了2/3左右,此时可使用coalesce,来缩减partition分区数,从而减少Task数,提高运行效率。
7.用foreachPartition替代foreach
使用foreach会对每个partition中的重复的操作都做一次,而foreachPartition能够提取出重复的操作,对所有partition仅做一次,提高了效率。
8.使用Kryo优化序列化性能
Kryo是一个序列化类库,来优化序列化和反序列化性能,Spark默认使用Java序列化机制(ObjectOutputStream/ ObjectInputStream API)进行序列化和反序列化。Spark支持使用Kryo序列化库,性能比Java序列化库高很多,10倍左右。
9.对数据结构进行优化
对于一些数据类型,如果对于数据说过大,可以换占用内存较小的数据类型。
使用原始(int, long)替代字符串,使用数组替代集合类型,减少内存占用,从而降低GC频率,提升性能。
10.调整Spark任务的并发度
使用textFile()、parallelize()等方法的第二个参数来设置并行度。
使用spark.defaultparallelism
来设置统一的并行度。
并行度决定了并发的能力。
11.对小文件多分区优化
小文件导致task很多,导致每个core处理的批次变多,从而性能下降。
- 使用 textfile,参数写明 Partition个数,然后使用 repartition方法重分区
12.关于查看RDD数据操作的优化
减少RDD的直接 collect、pint操作使用take、first代替。