Spark tungsten-sort shuffle

  • To relieve memory and CPU bottlenecks, Spark introduced Project Tungsten to improve performance. For shuffle, this brought in the tungsten-sort shuffle.
  • tungsten-sort shuffle is built on Java's Unsafe API (sun.misc.Unsafe, wrapped by Spark's Platform class). The three related ShuffleWriter implementations are:
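Writer | Description
BypassMergeSortShuffleWriter | Works much like hash shuffle; the difference is that each map task's output is merged into a single file.
SortShuffleWriter | Sort shuffle: sorts records on the map side by partitionId and writes all output into a single file.
UnsafeShuffleWriter | Tungsten-sort: sorts pointers to already-serialized records, so records are not deserialized and re-serialized during sorting, which reduces GC overhead.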
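Which of the three writers runs is decided by SortShuffleManager when the shuffle is registered. The Java sketch below is a simplified model of that decision, assuming the Spark 2.x rules: bypass when there is no map-side combine and the partition count is at most spark.shuffle.sort.bypassMergeThreshold (default 200); tungsten-sort when the serializer supports relocating serialized objects (KryoSerializer does with its default settings, JavaSerializer does not), there is no aggregator, and there are at most 2^24 partitions; otherwise sort shuffle. ShuffleDepInfo and WriterChooser are hypothetical names, not Spark classes.

```java
// Hypothetical, simplified model of how SortShuffleManager chooses a writer.
// ShuffleDepInfo and WriterChooser are illustrative names, not Spark classes.
final class ShuffleDepInfo {
    final boolean mapSideCombine;
    final boolean hasAggregator;
    final boolean serializerSupportsRelocation;
    final int numPartitions;

    ShuffleDepInfo(boolean mapSideCombine, boolean hasAggregator,
                   boolean serializerSupportsRelocation, int numPartitions) {
        this.mapSideCombine = mapSideCombine;
        this.hasAggregator = hasAggregator;
        this.serializerSupportsRelocation = serializerSupportsRelocation;
        this.numPartitions = numPartitions;
    }
}

final class WriterChooser {
    // Default value of spark.shuffle.sort.bypassMergeThreshold.
    static final int BYPASS_MERGE_THRESHOLD = 200;
    // Partition IDs must fit in 24 bits on the serialized (tungsten-sort) path.
    static final int MAX_SERIALIZED_PARTITIONS = 1 << 24;

    static String choose(ShuffleDepInfo dep) {
        // 1. Few partitions and no map-side combine: one file per partition, concatenated at the end.
        if (!dep.mapSideCombine && dep.numPartitions <= BYPASS_MERGE_THRESHOLD) {
            return "BypassMergeSortShuffleWriter";
        }
        // 2. Records can be sorted as opaque serialized bytes.
        if (dep.serializerSupportsRelocation && !dep.hasAggregator
                && dep.numPartitions <= MAX_SERIALIZED_PARTITIONS) {
            return "UnsafeShuffleWriter";
        }
        // 3. Fallback: sort deserialized objects.
        return "SortShuffleWriter";
    }

    public static void main(String[] args) {
        System.out.println(choose(new ShuffleDepInfo(false, false, true, 100)));
        System.out.println(choose(new ShuffleDepInfo(false, false, true, 1000)));
        System.out.println(choose(new ShuffleDepInfo(true, true, true, 1000)));
    }
}
```

The order of the checks matters: a small shuffle without map-side combine takes the bypass path even when tungsten-sort would also be possible.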
  • Sort shuffle was covered in the earlier Spark Shuffle post, so this post focuses on UnsafeShuffleWriter.
  • As the name suggests, the writer's main job is the map-side write. Here is its write method:
  public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
    // Keep track of success so we know if we encountered an exception
    // We do this rather than a standard try/catch/re-throw to handle
    // generic throwables.
    boolean success = false;
    try {
      while (records.hasNext()) {
        insertRecordIntoSorter(records.next());
      }
      // Merge spill files and in-memory data into the final output and index files.
      closeAndWriteOutput();
      success = true;
    } finally {
      if (sorter != null) {
        try {
          // Free memory pages and delete temporary spill files.
          sorter.cleanupResources();
        } catch (Exception e) {
          // Only throw this error if we won't be masking another
          // error.
          if (success) {
            throw e;
          } else {
            logger.error("In addition to a failure during writing, we failed during " +
                         "cleanup.", e);
          }
        }
      }
    }
  }
  • Each record is written by insertRecordIntoSorter(records.next()). Following that call:

  void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
    assert(sorter != null);
    final K key = record._1();
    final int partitionId = partitioner.getPartition(key);
    // serBuffer is reused for every record, so clear it first.
    serBuffer.reset();
    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
    serOutputStream.flush();

    final int serializedRecordSize = serBuffer.size();
    assert (serializedRecordSize > 0);

    // Hand the serialized bytes (not the objects) to the external sorter.
    sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
  }
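A minimal sketch of what insertRecordIntoSorter does, under stated assumptions: DataOutputStream.writeUTF stands in for Spark's SerializationStream, String keys and values stand in for Product2<K, V>, and the partition is computed like HashPartitioner (non-negative hashCode modulo the partition count). RecordSerializerSketch, ReusableBuffer and Sorter are hypothetical names. The point it shows is that one buffer is reset and reused for every record, so whatever receives the bytes has to copy them immediately.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical sketch: partition, serialize into a reusable buffer, hand raw bytes to a sorter.
final class RecordSerializerSketch {
    // Exposes the internal array without copying (ByteArrayOutputStream.buf is protected).
    static final class ReusableBuffer extends ByteArrayOutputStream {
        byte[] getBuf() { return buf; }
    }

    // Hypothetical stand-in for ShuffleExternalSorter.insertRecord.
    interface Sorter { void insertRecord(byte[] base, int offset, int length, int partitionId); }

    private final ReusableBuffer serBuffer = new ReusableBuffer();
    private final DataOutputStream serOut = new DataOutputStream(serBuffer);
    private final int numPartitions;
    private final Sorter sorter;

    RecordSerializerSketch(int numPartitions, Sorter sorter) {
        this.numPartitions = numPartitions;
        this.sorter = sorter;
    }

    // Like HashPartitioner: non-negative hashCode modulo numPartitions.
    int getPartition(String key) { return Math.floorMod(key.hashCode(), numPartitions); }

    void insert(String key, String value) throws IOException {
        int partitionId = getPartition(key);
        serBuffer.reset();      // reuse the same buffer for every record
        serOut.writeUTF(key);   // stand-in for writeKey
        serOut.writeUTF(value); // stand-in for writeValue
        serOut.flush();
        int size = serBuffer.size();
        sorter.insertRecord(serBuffer.getBuf(), 0, size, partitionId);
    }

    public static void main(String[] args) throws IOException {
        RecordSerializerSketch writer = new RecordSerializerSketch(4, (base, offset, length, partitionId) -> {
            // The buffer is overwritten by the next record, so copy the bytes right away.
            byte[] copy = Arrays.copyOfRange(base, offset, offset + length);
            System.out.println("partition " + partitionId + ": " + copy.length + " bytes");
        });
        writer.insert("a", "bc");
        writer.insert("hello", "world");
    }
}
```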
    • Note that sorter here is declared as @Nullable private ShuffleExternalSorter sorter;.
    • So the call goes into ShuffleExternalSorter.insertRecord:

  public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

    // for tests
    assert(inMemSorter != null);
    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
      logger.info("Spilling data because number of spilledRecords crossed the threshold " +
        numElementsForSpillThreshold);
      spill();
    }

    growPointerArrayIfNecessary();
    // Need 4 bytes to store the record length.
    final int required = length + 4;
    acquireNewPageIfNecessary(required);

    assert(currentPage != null);
    final Object base = currentPage.getBaseObject();
    final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
    Platform.putInt(base, pageCursor, length);
    pageCursor += 4;
    Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
    pageCursor += length;
    inMemSorter.insertRecord(recordAddress, partitionId);
  }
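    • Platform.copyMemory copies the serialized record into the current memory page, right after the 4-byte length written by Platform.putInt.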
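The page layout written by insertRecord can be modeled with a plain byte array. This is a simplified sketch, not Spark's implementation: a byte[] replaces the MemoryBlock page, ByteBuffer.putInt and System.arraycopy replace Platform.putInt and Platform.copyMemory, and the address follows the 13-bit page number / 51-bit offset split that TaskMemoryManager.encodePageNumberAndOffset uses. PageSketch and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: append [4-byte length][record bytes] to a page and return an encoded address.
final class PageSketch {
    // Simplified address: upper 13 bits = page number, lower 51 bits = offset in page.
    static final int OFFSET_BITS = 51;
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;

    final int pageNumber;
    final byte[] page;   // stands in for a MemoryBlock obtained from the memory manager
    int pageCursor = 0;  // next free byte in the page

    PageSketch(int pageNumber, int capacity) {
        this.pageNumber = pageNumber;
        this.page = new byte[capacity];
    }

    static long encode(int pageNumber, long offset) {
        return ((long) pageNumber << OFFSET_BITS) | (offset & OFFSET_MASK);
    }
    static int decodePage(long address) { return (int) (address >>> OFFSET_BITS); }
    static long decodeOffset(long address) { return address & OFFSET_MASK; }

    long append(byte[] record) {
        int required = record.length + 4; // 4 extra bytes for the length prefix
        if (page.length - pageCursor < required) {
            // Spark acquires a new page at this point (acquireNewPageIfNecessary).
            throw new IllegalStateException("page full");
        }
        long address = encode(pageNumber, pageCursor);
        ByteBuffer.wrap(page).putInt(pageCursor, record.length);      // like Platform.putInt
        pageCursor += 4;
        System.arraycopy(record, 0, page, pageCursor, record.length); // like Platform.copyMemory
        pageCursor += record.length;
        return address;
    }

    // Follows an address back to the record bytes.
    byte[] read(long address) {
        int offset = (int) decodeOffset(address);
        int length = ByteBuffer.wrap(page).getInt(offset);
        return Arrays.copyOfRange(page, offset + 4, offset + 4 + length);
    }

    public static void main(String[] args) {
        PageSketch p = new PageSketch(3, 64);
        long a1 = p.append("hello".getBytes(StandardCharsets.UTF_8));
        long a2 = p.append("hi".getBytes(StandardCharsets.UTF_8));
        System.out.println("page " + decodePage(a2) + ", offset " + decodeOffset(a2));
        System.out.println(new String(p.read(a1), StandardCharsets.UTF_8));
    }
}
```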

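    • One more data structure is updated at this point: @Nullable private ShuffleInMemorySorter inMemSorter;. It is what makes sorting by pointer alone possible, because for every record it stores the partition ID together with the record address (recordAddress).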
    • The comparator used by inMemSorter confirms that records are ordered only by partition ID, so records that belong to the same partition end up next to each other.

  private static final class SortComparator implements Comparator<PackedRecordPointer> {
    @Override
    public int compare(PackedRecordPointer left, PackedRecordPointer right) {
      int leftId = left.getPartitionId();
      int rightId = right.getPartitionId();
      return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);
    }
  }
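Spark keeps each entry of the in-memory sorter in one 64-bit long (PackedRecordPointer) laid out as [24-bit partition ID][13-bit page number][27-bit offset in page], which is why this path limits a page to 2^27 bytes (128 MB) and a shuffle to 2^24 partitions. The sketch below packs and unpacks such values and sorts them by partition ID the same way SortComparator does; PackedPointerSketch is a hypothetical name and the masks are written out here rather than taken from Spark.

```java
import java.util.Arrays;

// Hypothetical sketch of a packed pointer: [24-bit partition ID][13-bit page number][27-bit offset].
final class PackedPointerSketch {
    static final long MASK_27 = (1L << 27) - 1;
    static final long MASK_13 = (1L << 13) - 1;
    static final int MAX_PARTITION_ID = (1 << 24) - 1;

    static long pack(int partitionId, int pageNumber, long offset) {
        if (partitionId < 0 || partitionId > MAX_PARTITION_ID) throw new IllegalArgumentException("partitionId");
        if (pageNumber < 0 || pageNumber > MASK_13) throw new IllegalArgumentException("pageNumber");
        if (offset < 0 || offset > MASK_27) throw new IllegalArgumentException("offset");
        return ((long) partitionId << 40) | ((long) pageNumber << 27) | offset;
    }

    // Unsigned shift, because partition IDs >= 2^23 set the sign bit.
    static int partitionId(long packed) { return (int) (packed >>> 40); }
    static int pageNumber(long packed) { return (int) ((packed >>> 27) & MASK_13); }
    static long offset(long packed) { return packed & MASK_27; }

    // Orders pointers only by partition ID; the record bytes themselves never move.
    // Arrays.sort on objects is stable, so records of one partition keep their insertion order.
    static long[] sortByPartition(long[] pointers) {
        Long[] boxed = Arrays.stream(pointers).boxed().toArray(Long[]::new);
        Arrays.sort(boxed, (l, r) -> Integer.compare(partitionId(l), partitionId(r)));
        return Arrays.stream(boxed).mapToLong(Long::longValue).toArray();
    }

    public static void main(String[] args) {
        long[] pointers = { pack(2, 0, 0), pack(0, 0, 16), pack(2, 1, 32), pack(1, 0, 48) };
        for (long p : sortByPartition(pointers)) {
            System.out.println("partition " + partitionId(p) + ", page " + pageNumber(p) + ", offset " + offset(p));
        }
    }
}
```

Because only the 8-byte pointers move during the sort, the serialized records stay exactly where insertRecord copied them.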
    • The figure below illustrates ShuffleExternalSorter and ShuffleInMemorySorter:

    • [Figure: spark tungsten-sort shuffle]


