Spark优化方式 & Spark优化顺序

Spark优化顺序

应先使用Spark相关参数进行调优,当参数优化效果未达预期时,再考虑对代码进行优化。对代码优化可能会造成运行结果产生偏差,导致费力不讨好,所以最好先采用参数调优,如果不能达到预期效果再考虑代码调优。

Spark优化方式

1.资源调优参数

  1. num-executors:该作业总共需要多少executor进程执行;

    • 建议:每个作业运行一般设置50~100个左右较合适
  2. executor-memory:设置每个executor进程的内存, num-executors* executor-memory代表作业申请的总内存量(尽量不要超过最大总内存的1/3~1/2);

    • 建议:设置4G~8G较合适
  3. executor-cores:每个executor进程的CPU Core数量,该参数决定每个executor进程并行执行task线程的能力, num-executors* executor-cores代表作业申请总CPU core数(不要超过总CPU Core的1/3~1/2 );
    • 建议:设置2~4个较合适
  4. driver-memory
    • 通常不用设置,一般1G够用,若出现collect算子将RDD数据全部拉取到Driver上,就必须保证该值足够大,否则会造成OOM内存溢出
  5. spark.default.parallelism:每个stage的默认task数量;
    • 建议:设置500~1000较合适,默认一个HDFS的block对应一个task,Spark默认值偏少, 这样导致不能充分利用资源
  6. spark.storage.memoryFraction:设置RDD持久化数据在executor内存中能占的比例,默认0.6,即默认executor 60%的内存可以保存持久化RDD数据;
    • 建议:若有较多的持久化操作,可以设置高一些,超出内存的频繁gc导致运行缓慢
  7. spark.shuffle.memoryFraction:聚合操作占executor内存的比例,默认0.2
    • 建议:若持久化操作较少,但shuffle较多时,可以降低持久化内存占比,提高shuffle操作内存占比

以上参数可以在提交spark任务时设置,也可以在客户端设置。

2.避免使用重复的RDD

对同一份数据,只创建一个RDD,不要创建多个RDD来代表同一份数据,避免浪费内存。

bad case

1
2
3
4
#1
val rdd1 = sc.textFile("HDFSDIR1")
val rdd2 = rdd1.map(...)
rdd2.filter(...)
1
2
3
4
5
#2
val rdd1 = sc.textFile("HDFSDIR1")
rdd1.map(...)
val rdd2 = sc.textFile("HDFSDIR1")
rdd2.filter(...)

Optimize case

1
2
3
#1
val rdd1 = sc.textFile("HDFSDIR1")
val rdd2 = rdd1.map(...).filter(...)
1
2
3
4
#2
val rdd1 = sc.textFile("HDFSDIR1")
rdd1.map(...)
rdd2.filter(...)

3.尽可能复用同一个RDD

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

4.对多次使用的RDD进行持久化处理

每次对一个RDD执行一个算子操作时,都会重新从源头处理计算一遍,计算出那个RDD出来,然后进一步操作,这种方式性能很差。

对多次使用的RDD进行持久化,将RDD的数据保存在内存或磁盘中,避免重复劳动,借助cache()和persist()方法。

5.Shuffle调优

5.1避免使用Shuffle类算子

在spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle涉及到写磁盘、数据合并、网络数据传输,因此,能避免使用shuffle,就不使用。

将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合和join处理,比如groupByKey、reduceByKey、join等算子,都会触发shuffle。而使用Broadcast+map的join操作不会触发shuffle,适合于RDD数据量较少时使用。

cad case
1
val rdd3 = rdd1.join(rdd2)
Opitimize case
1
2
3
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast)

5.2使用map-site预聚合Shuffle操作

如果一定要使用shuffle的,无法用map类算子替代的,那么尽量使用map-site预聚合的算子。map-site的计算思想类似MapReduce中的Combiner。

可能的情况下使用reduceByKey或aggregateByKey算子替代groupByKey算子,因为reduceByKey或aggregateByKey算子会使用用户自定义的函数对每个节点本地相同的key进行预聚合,而groupByKey算子不会预聚合。

5.3先去重,再合并

  • A.union(B).distinct() vs. A.distinct().union(B.distinct()).distinct()

5.4参数调优

spark.shuffle.file.buffe

  • 设置shuffle write task的buffer大小,将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘

spark.reducer.maxSizeInFligh

  • 设置shuffle read task的buffer大小,决定了每次能够拉取pull多少数据。减少拉取数据的次数,也就减少了网络传输的次数

spark.shuffle.sort.bypassMergeThreshold

  • shuffle read task的数量小于这个阈值(默认是200),则map-side/shuffle write过程中不会进行排序操作

6.filter + coalesce减少 Task数量

如果使用filter后,数据量级减少了2/3左右,此时可使用coalesce,来缩减partition分区数,从而减少Task数,提高运行效率。

7.用foreachPartition替代foreach

使用foreach会对每个partition中的重复的操作都做一次,而foreachPartition能够提取出重复的操作,对所有partition仅做一次,提高了效率。

8.使用Kryo优化序列化性能

Kryo是一个序列化类库,来优化序列化和反序列化性能,Spark默认使用Java序列化机制(ObjectOutputStream/ ObjectInputStream API)进行序列化和反序列化。Spark支持使用Kryo序列化库,性能比Java序列化库高很多,10倍左右。

9.对数据结构进行优化

对于一些数据类型,如果对于数据说过大,可以换占用内存较小的数据类型。

使用原始(int, long)替代字符串,使用数组替代集合类型,减少内存占用,从而降低GC频率,提升性能。

10.调整Spark任务的并发度

使用textFile()、parallelize()等方法的第二个参数来设置并行度。

使用spark.defaultparallelism来设置统一的并行度。

并行度决定了并发的能力。

11.对小文件多分区优化

小文件导致task很多,导致每个core处理的批次变多,从而性能下降。

  • 使用 textfile,参数写明 Partition个数,然后使用 repartition方法重分区

12.关于查看RDD数据操作的优化

减少RDD的直接 collect、pint操作使用take、first代替。