spark tungsten-sort shuffle

  • To relieve memory and CPU bottlenecks, Spark introduced Tungsten to improve performance; as part of it, the tungsten-sort shuffle was added to the shuffle stage.
  • The tungsten-sort shuffle is implemented on top of Java's Unsafe facilities. The three related ShuffleWriters are:
writer                        desc
ByPassMergeSortShuffleWriter  Essentially the same as hash shuffle; the difference is that each map task's outputs are consolidated into a single file.
SortShuffleWriter             Sort shuffle: sorts by partitionId on the map side and writes all output to one file.
UnsafeShuffleWriter           Tungsten-sort: sorts pointers to already-serialized records, so the sort itself needs no serialization or deserialization, reducing GC overhead.
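
  • Which writer is chosen is decided when the shuffle is registered (in SortShuffleManager). Below is a simplified Java sketch of the two selection checks; the real logic is Scala inside SortShuffleManager/SortShuffleWriter, and the exact conditions vary slightly across Spark versions. If neither check passes, the plain SortShuffleWriter is used.

import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;

// Illustrative sketch only, not the exact Spark source.
final class WriterSelectionSketch {

  // Bypass path (ByPassMergeSortShuffleWriter): no map-side combine and at most
  // spark.shuffle.sort.bypassMergeThreshold (default 200) reduce partitions.
  static boolean shouldBypassMergeSort(SparkConf conf, ShuffleDependency<?, ?, ?> dep) {
    if (dep.mapSideCombine()) {
      return false;
    }
    final int threshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200);
    return dep.partitioner().numPartitions() <= threshold;
  }

  // Serialized path (UnsafeShuffleWriter): the serializer must support relocating
  // serialized data, there is no map-side combine, and there are at most 2^24
  // partitions (the partition-id width of PackedRecordPointer, shown later).
  static boolean canUseSerializedShuffle(ShuffleDependency<?, ?, ?> dep) {
    return dep.serializer().supportsRelocationOfSerializedObjects()
        && !dep.mapSideCombine()
        && dep.partitioner().numPartitions() <= (1 << 24);
  }
}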
  • SortShuffleWriter was covered in the earlier Spark Shuffle post; here we walk through UnsafeShuffleWriter.
  • As the name suggests, the writer's main job is the map-side write. Start with its write method:
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception.
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during cleanup.", e);
        }
      }
    }
  }
}
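
  • Not shown in the original post: once the loop finishes, closeAndWriteOutput() merges all spill files into a single data file and writes the matching index file. A condensed sketch of its shape, following UnsafeShuffleWriter (simplified and version-dependent; names as in the Spark source):

// Condensed sketch of closeAndWriteOutput(); simplified from UnsafeShuffleWriter.
void closeAndWriteOutput() throws IOException {
  serBuffer = null;
  serOutputStream = null;
  // Flush remaining in-memory records and collect all spill files.
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  final File tmp = Utils.tempFileWith(output);
  // Concatenate each partition's segments across all spills into one data file,
  // recording how many bytes each partition occupies.
  final long[] partitionLengths = mergeSpills(spills, tmp);
  // Write the index file (partition id -> offset) and commit the output.
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
}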
  • Records are written through insertRecordIntoSorter(records.next()); let's follow it:

void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  // Serialize the key and value into the reusable in-memory buffer.
  serBuffer.reset();
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);

  // Hand the serialized bytes to the external sorter along with the partition id.
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
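
  • A detail worth noting: serBuffer is a tiny subclass of ByteArrayOutputStream (MyByteArrayOutputStream inside UnsafeShuffleWriter) that exposes its backing array, so getBuf() can hand the serialized bytes to the sorter without the defensive copy that toByteArray() would make. A minimal sketch of the idea (the class name here is illustrative):

import java.io.ByteArrayOutputStream;

// ByteArrayOutputStream keeps its contents in the protected field 'buf',
// so a one-line subclass can expose it and avoid an extra copy.
final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
  ExposedByteArrayOutputStream(int size) { super(size); }
  byte[] getBuf() { return buf; }
}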
  • Note that this sorter is declared as @Nullable private ShuffleExternalSorter sorter;
  • Stepping into ShuffleExternalSorter.insertRecord:

public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {
  // for tests
  assert(inMemSorter != null);
  // Spill to disk if the in-memory record count crosses the configured threshold.
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }

  growPointerArrayIfNecessary();
  // Need 4 bytes to store the record length.
  final int required = length + 4;
  acquireNewPageIfNecessary(required);

  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  // Encode (page number, offset) into a single long address for this record.
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  // Write the 4-byte length prefix, then copy the serialized record bytes into the page.
  Platform.putInt(base, pageCursor, length);
  pageCursor += 4;
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  pageCursor += length;
  // Record the pointer (address + partition id) in the in-memory sorter.
  inMemSorter.insertRecord(recordAddress, partitionId);
}
  • copyMemory copies the serialized record bytes into the current page (see the Platform demo below).
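
  • Platform is Spark's org.apache.spark.unsafe.Platform, a thin wrapper over sun.misc.Unsafe. Here is a small runnable illustration of the two calls used above, writing a length-prefixed record into a byte[] standing in for a page (the demo class is hypothetical; it needs the spark-unsafe artifact on the classpath):

import org.apache.spark.unsafe.Platform;

// Demo of the two Platform operations used in insertRecord:
// putInt writes a 4-byte length prefix, copyMemory copies the record bytes.
public class PageWriteDemo {
  public static void main(String[] args) {
    byte[] record = "hello".getBytes();
    byte[] page = new byte[64];                   // stand-in for a memory page
    long cursor = Platform.BYTE_ARRAY_OFFSET;     // base offset of an on-heap byte[]

    Platform.putInt(page, cursor, record.length); // 4-byte length prefix
    cursor += 4;
    Platform.copyMemory(record, Platform.BYTE_ARRAY_OFFSET, page, cursor, record.length);

    System.out.println(Platform.getInt(page, Platform.BYTE_ARRAY_OFFSET)); // prints 5
  }
}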

  • One more data structure is updated at this point: @Nullable private ShuffleInMemorySorter inMemSorter;. This is what makes pointer-only sorting possible: it records each record's partitionId and recordAddress.
  • Its compare function confirms that ordering is decided by comparing partitionId, so records with the same partitionId end up laid out contiguously:

public int compare(PackedRecordPointer left, PackedRecordPointer right) {
  int leftId = left.getPartitionId();
  int rightId = right.getPartitionId();
  return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);
}
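
  • The elements being compared are PackedRecordPointers: each one packs a 24-bit partition id, a 13-bit memory-page number, and a 27-bit in-page offset into a single 64-bit long. A sketch of that packing follows; the helper class is illustrative (the real PackedRecordPointer compresses an address already encoded by TaskMemoryManager):

// Illustrative layout: [24-bit partition id | 13-bit page number | 27-bit page offset].
// Because the partition id sits in the high bits, ordering the packed longs
// orders records by partition, which is exactly what the comparator above checks.
final class PackedPointerSketch {
  private static final long MASK_27 = (1L << 27) - 1;

  static long pack(int partitionId, int pageNumber, long offsetInPage) {
    return ((long) partitionId << 40) | ((long) pageNumber << 27) | (offsetInPage & MASK_27);
  }

  static int partitionId(long packed) {
    return (int) (packed >>> 40);
  }
}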
  • ShuffleExternalSorter and ShuffleInMemorySorter are illustrated below.

[Figure: ShuffleExternalSorter / ShuffleInMemorySorter memory layout]
