欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark-Shuffle

程序员文章站 2022-07-15 18:59:25
...

Shuffle概述

  在MapReduce和Spark中都有Shuffle。对于MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,将Map阶段读取的数据进行Shuffle操作并输出到对应的Reduce,Reduce进行计算。MapReduce的Shuffle过程如图所示。
Spark-Shuffle
Shuffle实质上是对数据进行重组,在整个Shuffle过程中,往往伴随着大量的磁盘和网络I/O,因此Shuffle性能的高低也直接决定了整个程序的性能高低。Spark的Shuffle实现过程如图所示。
Spark-Shuffle
  Spark在DAG调度的过程中,会根据Shuffle过程来划分Stage阶段,即存在ShuffleDependency(宽依赖)的时候,需要进行Shuffle,会将作业job划分成多个Stage。划分Stage时,构建ShuffleDependency的时候会进行Shuffle注册,获取后续数据读取所需要的ShuffleHandle,最终每一个job提交后都会生成一个ResultStage和若干个ShuffleMapStage,其中ResultStage表示生成作业的最终结果所在的Stage。ResultStage与ShuffleMapStage中的task分别对应着ResultTask与ShuffleMapTask。一个job,除了最终的ResultStage外,其他若干ShuffleMapStage中各个ShuffleMapTask都需要将最终的数据根据相应的Partitioner对数据进行分组,然后持久化分区的数据。进行数据持久化目的在于1)容错;2)降低内存数据存储压力。

两种Shuffle机制

HashShuffle机制

HashShuffle概述

  在1.6版本之前,Spark使用HashShuffle,1.6版本之后使用Sort-Base Shuffle。Spark的运行主要分为两部分:一部分是以SparkContext为核心的驱动程序,另一部分是Worker节点上Task,它是运行实际任务的。程序运行的时候,Driver和Executor进程相互交互:运行什么任务,即Driver会通过网络传输为Executor分配Task;任务数据从哪儿获取,即Task要从 Driver 抓取其他上游Task的数据结果,所以这个过程中就会不断地产生网络结果。其中,下游Stage 向上游Stage申请数据的过程就称之为 Shuffle。

未优化的HashShuffle机制

  在没有优化HashShuffle之前,每一个ShufflleMapTask会为每一个ReduceTask创建一个bucket缓存,并且会为每一个bucket创建一个文件。这个bucket存放的数据就是经过Partitioner操作(默认为HashPartitioner)之后找到对应的bucket然后放进去,最后将刷新bucket缓存中的数据到磁盘上,即对应的block file。然后ShuffleMapTask将输出作为MapStatus发送到DAGScheduler的MapOutputTrackerMaster,每一个MapStatus包含了每一个ResultTask要拉取的数据的位置和大小。ResultTask然后利用BlockStoreShuffleFetcher向MapOutputTrackerMaster获取MapStatus,看哪一份数据是属于自己的,然后底层通过BlockManager将数据拉取过来,拉取过来的数据会组成一个内部的ShuffleRDD,优先放入内存,内存不够用则放入磁盘,然后ResultTask开始进行聚合,最后生成我们希望获取的那个MapPartitionRDD。
Spark-Shuffle
缺点:
如上图所示:有1个Worker,2个Executor,每一个Executor运行2个ShuffleMapTask,有3个ReduceTask,所以总共就有4 * 3=12个bucket和12个block file。如果数据量较大,将会生成M* R个小文件,比如ShuffleMapTask有100个,ResultTask有100个,这就会产生100*100=10000个小文件。bucket缓存很重要,需要将ShuffleMapTask所有数据都写入bucket,才会刷到磁盘,那么如果Map端数据过多,这就很容易造成内存溢出,尽管后面有优化,bucket写入的数据达到刷新到磁盘的阀值之后,就会将数据一点一点的刷新到磁盘,但是这样磁盘I/O就多了。

优化的HashShuffle机制

  每一个Executor进程根据核数,决定Task的并发数量,如:Executor核数为2,则可以并发运行两个Task。假设Executor核数为1,ShuffleMapTask数量是M,那么它依然会根据ResultTask的数量R,创建R个bucket缓存,然后对key进行hash操作,数据进入不同的bucket中,每一个bucket对应着一个block file,用于刷新bucket缓存里的数据。下一个Task运行的时候,不用再创建新的bucket和block file,而是复用之前的Task已经创建好的bucket和block file,即所谓同一个Executor进程里所有Task都会把相同的key放入相同的bucket缓冲区中。这样的话,生成文件的数量就是(本地Worker的Executor数量* Executor的cores * ResultTask数量)如上图所示,即2 * 1* 3 = 6个文件,每一个Executor的ShuffleMapTask数量100,ReduceTask数量为100,那么未优化的HashShuffle的文件数是2 * 1* 100 * 100 =20000,优化之后的数量是2* 1* 100 = 200文件,相当于少了100倍。
Spark-Shuffle
缺点:如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生很多小文件。
  HashShuffle写数据的时候,内存有一个bucket缓冲区,同时在本地磁盘有对应的本地文件,如果本地有文件,那么在内存存储对应的文件句柄也是需要消耗内存的。即从内存的角度考虑,有一部分存储数据,一部分管理文件句柄。如果Mapper分片数量为1000,Reduce分片数量为1000,那么总共就需要1000000个小文件。所以就会有很多内存消耗,频繁I/O以及GC频繁或者出现内存溢出。而且Reducer端读取Map端数据时,Mapper有这么多小文件,就需要打开很多网络通道读取,很容易造成Reducer(下一个stage)通过Driver去拉取上一个stage数据的时候,出现文件找不到的情况,其实不是文件找不到而是由于正在GC导致程序不响应。

Sort-Based Shuffle机制

Sort-Based Shuffle概述

  为了缓解HashShuffle过程产生文件数过多和Writer缓存开销过大的问题,Spark引入了类似于MapReduce的Shuffle机制。该机制每一个ShuffleMapTask不会为后续的任务创建单独的文件,而是会将所有的Task结果写入同一个文件,并且对应生成一个索引文件。以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer缓存所占用的内存大小,而且同时避免GC的风险和频率。
Spark-Shuffle
Sort-Based Shuffle有几种不同的策略:
1)BypassMergeSortShuffleWriter
2)SortShuffleWriter
3)UnasfeSortShuffleWriter
  对于BypassMergeSortShuffleWriter,使用这个模式特点:
【1】主要用于处理不需要排序和聚合的Shuffle操作,所以数据是直接写入文件,数据量较大的时候,网络I/O和内存负担较重。
【2】主要适合处理Reducer任务数量比较少的情况下。
【3】将每一个分区写入一个单独的文件,最后将这些文件合并,减少文件数量;但是这种方式需要并发打开多个文件,对内存消耗比较大。
  因为BypassMergeSortShuffleWriter这种方式比SortShuffleWriter更快,所以如果在Reducer数量不大,又不需要在map端聚合和排序,而且Reducer的数目 < spark.shuffle.sort.bypassMergeThrshold指定的阀值,就是用的是这种方式。
  对于SortShuffleWriter,使用这个模式特点:
【1】比较适合数据量很大的场景或者集群规模很大.
【2】引入了外部外部排序器,可以支持在Map端进行本地聚合或者不聚合。
【3】如果外部排序器enable了spill功能,如果内存不够,可以先将输出溢写到本地磁盘,最后将内存结果和本地磁盘的溢写文件进行合并。
  另外这个Sort-Based Shuffle跟Executor核数没有关系,即跟并发度没有关系,它为每一个ShuffleMapTask都会产生一个data文件和index文件,所谓合并也只是将该ShuffleMapTask的各个partition对应的分区文件合并到data文件而已。所以这个就需要跟Hash-BasedShuffle的consolidation机制区别开来。

SortShuffleManager运行原理

  SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当ShuffleReadTask的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通运行机制

  普通的SortShuffleManager的原理如图所示。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的Shuffle算子选用不同的数据结构。如果是reduceByKey这种聚合类的Shuffle算子,会选用Map数据结构,一边通过Map进行聚合,一边写入内存。如果是join这种普通的Shuffle算子,会选用Array数据结构,直接写入内存。当每写一条数据进入内存数据结构之后,就会判断是否达到了临界阈值,如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
Spark-Shuffle
  在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,即排序好的数据,会以每批10000条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
  一个Task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个Task就只对应一个磁盘文件,也就意味着该Task为下游stage的Task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个Task的数据在文件中的start offset与end offset。
  SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个Task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个Task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

bypass运行机制

下图说明了bypass SortShuffleManager的原理。
Spark-Shufflebypass运行机制的触发条件如下:
1)ShuffleMap Task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认为200)。
2)不是聚合类的Shuffle算子(比如reduceByKey)。
此时Task会为每个下游Task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
  该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,ShuffleRead的性能会更好。
  而该机制与普通SortShuffleManager运行机制的不同在于:
1)磁盘写机制不同
2)不会进行排序
也就是说,启用该机制的最大好处在于,ShuffleWrite过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

Shuffle数据倾斜问题解决

  在进行Shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个Task来进行处理,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。
  通过Spark Web UI来查看当前运行的stage各个Task分配的数据量,从而进一步确定是不是Task分配的数据不均匀导致了数据倾斜。知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个Shuffle类算子。通过countByKey查看各个key的分布。
数据倾斜解决方案:
1)过滤少数导致倾斜的key
2)提高shuffle操作的并行度
3)局部聚合和全局聚合
实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。代码:

def solution():RDD[(String,Int)]={
	val SPLIT="-"
	val prefix=new Random().nextInt(10)
	pairs.map(t2=>(prefix+SPLIT+t2._1,1)),reduceByKey((v1,v2)=>v1+v2).map(t2=>(t2._1.split(SPLIT)(1),t2._2)).reduceByKey((v1,v2)=>v1+v2)
}

4)将reduce join转为map join((小表几百M或者一两G))   
实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;  
  接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。 代码:

def solution():RDD[(String,Int)]={
	val broadcast=sc.broadcast(sc.textFile(vidStatsPath).map(s=>{
		val SPLIT=","
		val ss=s.split(SPLIT)
		(ss(1),ss(2))
	}).collectAsMap())
	//inner join
	vid_pairs.filter(t2=>{
		broadcast.value.keySet.contains(t2._1)
	}).map(t2=>{
		broadcast.value.get(t2._1).get
	})
}

5)采样倾斜key并分拆join操作(join的两表都很大,但仅一个RDD的几个key的数据量过大)
实现思路
对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。而另外两个普通的RDD就照常join即可。最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。代码:

def solution():RDD[(String,Int)]={
	val skewedKey=pairs1.sample(false,0.1).map(t2=>(t2._1,1)).reduceByKey((v1,v2)=>v1+v2).map(t2=>t2.swap).sortByKey(false).first._2
	val skewedRDD=pairs1.filter(_._1.equals(skewedKey))
	val commonRDD=pairs1.filter(!_._1.equals(skewedKey))
	val SPLIT="_"
	val size=100
	val skewedRDD2=pairs2.filter(_._1.equals(skewedKey)).flatMap(...)
	val joinedRDD1=skewedRDD.map(t2=>{
		val prefix=new Random().nextInt(size)
		(prefix+SPLIT+t2._1,t2._2)	
	}).join(skewedRDD2).map(t2=>{
		val key=t2._1.split(SPLIT)(1)
		(key,t2._2)
	})
	val joinedRDD2=commonRDD.join(pairs2)
	joinedRDD1.union(joinedRDD2)
}

使用随机前缀和扩容RDD进行join(RDD中有大量的key导致数据倾斜)
实现思路:将含有较多倾斜key的RDD扩大多倍,与相对分布均匀的RDD配一个随机数。

def solution():RDD[(String,Int)]={
	val SPLIT="_"
	val size=100
	val skewedRDD=pairs1.flatMap(t2=>{
		val array=new Array[Tiple2[String,String]](size)
		for(i<-0 to size){
			array(i)=(new Tuple2[String,String](i+SPLIT+t2._1,t2._2.toString))
		}
		array
	})
	pairs2.map(t2=>{
		val prefix=new Random().nextInt(size)
		(prefix+SPLIT+t2._1,t2._2)	
	}).join(skewedRDD).map(t2=>{
		val key=t2._1.split(SPLIT)(1)
		(key,t2._2)
	})
}

Spark RDD中的Shuffle算子

//去重
def distinct()
def distinct(numPartitions: Int)

//聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

//排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

//重分区
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)

//集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

Spark Shuffle参数调优

spark.shuffle.file.buffer
默认值:32K
参数说明:用于设置ShuffleWriteTask的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优思路:如果job可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘I/O次数,进而提升性能。
spark.reducer.maxSizeInFlight
默认值:48M
参数说明:用于设置ShuffleReadTask的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优思路:如果job可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96M),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
spark.shuffle.io.maxRetries
默认值:3
参数说明:ShuffleReadTask从ShuffleWriteTask所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优思路:对于那些包含了特别耗时的Shuffle操作的作业,适当增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。特别针对超大数据量(数十亿~上百亿)的Shuffle过程,调节该参数可以大幅度提升稳定性。
spark.shuffle.io.retryWait
默认值:5s
参数说明:代表了每次重试拉取数据的等待间隔,默认是5s。
调优思路:适当加大间隔时长(比如60s),以增加shuffle操作的稳定性。
spark.shuffle.memoryFraction
默认值:0.2
参数说明:代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优思路:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给ShuffleRead的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。
spark.shuffle.manager
默认值:sort
参数说明:用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优思路:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。
spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果ShuffleReadTask的数量小于这个阈值(默认是200),则ShuffleWrite过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优思路:当你使用SortShuffleManager时,如果不需要排序操作,那么适当将这个参数调大一些,大于ShuffleReadTask的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此ShuffleWrite性能有待提高。
spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并ShuffleWrite的输出文件,对于ShuffleReadTask数量特别多的情况下,这种方法可以极大地减少磁盘I/O开销,提升性能。
调优建议:如果不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。

相关标签: Spark Shuffle

推荐阅读