spark tungsten-sort shuffle
- To address memory and CPU bottlenecks, Spark introduced Tungsten to improve performance. In the shuffle subsystem, this added the tungsten-sort shuffle.
- The tungsten-sort shuffle is implemented on top of Java's Unsafe package. The three related ShuffleWriters are:
writer | desc |
---|---|
BypassMergeSortShuffleWriter | Essentially the same as hash shuffle, except that each map task consolidates its outputs into a single file. |
SortShuffleWriter | Sort shuffle: sorts by partitionId on the map side and writes all output into a single file. |
UnsafeShuffleWriter | Tungsten-sort: sorts pointers to serialized records, avoiding deserialize/re-serialize cycles during the sort and reducing GC overhead. |
- Sort shuffle was covered in the Spark Shuffle post; here we look at UnsafeShuffleWriter, which is only chosen when the shuffle qualifies for serialized sorting (see the sketch below).
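For context, a minimal sketch of that eligibility check. The method and parameter names here are hypothetical, but the three conditions mirror Spark's SortShuffleManager.canUseSerializedShuffle:

```java
// Illustrative sketch, not Spark source: the conditions under which
// SortShuffleManager selects UnsafeShuffleWriter (tungsten-sort).
static boolean canUseTungstenSort(boolean serializerSupportsRelocation,
                                  boolean needsMapSideCombine,
                                  int numPartitions) {
  // 1. The serializer must support relocating serialized records
  //    (e.g. KryoSerializer), since records are reordered without deserializing.
  if (!serializerSupportsRelocation) return false;
  // 2. No map-side aggregation: records are never deserialized on the map side.
  if (needsMapSideCombine) return false;
  // 3. PackedRecordPointer reserves 24 bits for the partition id,
  //    so at most 2^24 = 16,777,216 partitions.
  return numPartitions <= (1 << 24);
}
```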
- As the name suggests, the writer's main job is the map-side write, so start with its write() method:
```java
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
            "cleanup.", e);
        }
      }
    }
  }
}
```
- Each record is written via insertRecordIntoSorter(records.next()); following that call:
```java
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  serBuffer.reset();
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);

  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
```
- Note the type of the sorter here: `@Nullable private ShuffleExternalSorter sorter;`
- This leads into ShuffleExternalSorter.insertRecord:
```java
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {
  // for tests
  assert(inMemSorter != null);
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }

  growPointerArrayIfNecessary();
  // Need 4 bytes to store the record length.
  final int required = length + 4;
  acquireNewPageIfNecessary(required);

  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  Platform.putInt(base, pageCursor, length);
  pageCursor += 4;
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  pageCursor += length;
  inMemSorter.insertRecord(recordAddress, partitionId);
}
```
Platform.copyMemory copies the serialized record into the current page, after a 4-byte length prefix has been written at the page cursor.
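To make that page layout concrete, here is a hypothetical helper (not part of Spark) that reads a record back out of a page, assuming the `[4-byte length][serialized bytes]` layout written above:

```java
import org.apache.spark.unsafe.Platform;

// Hypothetical helper, not Spark source: reads back one record stored in the
// [4-byte length][serialized bytes] layout that insertRecord writes above.
static byte[] readRecord(Object pageBase, long offsetInPage) {
  int length = Platform.getInt(pageBase, offsetInPage);  // the length prefix
  byte[] out = new byte[length];
  Platform.copyMemory(pageBase, offsetInPage + 4L,       // skip the prefix
      out, Platform.BYTE_ARRAY_OFFSET, length);
  return out;
}
```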
- Meanwhile one more data structure is updated: `@Nullable private ShuffleInMemorySorter inMemSorter;`. This structure is what makes pointer-only comparison possible: for each record it stores the partitionId and the recordAddress, packed together into a single word.
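That single word is a PackedRecordPointer. A minimal sketch of the packing, where the helper name is hypothetical but the field widths (24-bit partition id, 13-bit page number, 27-bit offset within the page) match the Spark source:

```java
// Sketch of PackedRecordPointer-style packing; helper name is hypothetical.
static long packPointer(int partitionId, int pageNumber, long offsetInPage) {
  assert partitionId  < (1 << 24);    // at most 16,777,216 partitions
  assert pageNumber   < (1 << 13);    // at most 8,192 pages per task
  assert offsetInPage < (1L << 27);   // pages are at most 128 MB
  return ((long) partitionId << 40) | ((long) pageNumber << 27) | offsetInPage;
}
```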
- Inspecting inMemSorter's compare function confirms that ordering is determined purely by partitionId, so records belonging to the same partition end up laid out contiguously:
```java
public int compare(PackedRecordPointer left, PackedRecordPointer right) {
  int leftId = left.getPartitionId();
  int rightId = right.getPartitionId();
  return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);
}
```
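Because the partition id occupies the most significant bits of the packed pointer, grouping by partition can even be done by sorting the raw longs (newer Spark versions radix-sort the packed array directly). A toy, self-contained demonstration, not Spark code:

```java
import java.util.Arrays;

// Toy demo: sorting raw packed pointers groups records by partition,
// since the partition id sits in the high bits.
public class PackedPointerDemo {
  // Same hypothetical packing as the sketch above (24/13/27 bits).
  static long pack(int partitionId, int pageNumber, long offsetInPage) {
    return ((long) partitionId << 40) | ((long) pageNumber << 27) | offsetInPage;
  }

  public static void main(String[] args) {
    long[] pointers = {
      pack(2, 0, 100), pack(0, 0, 40), pack(1, 1, 0), pack(0, 1, 8)
    };
    Arrays.sort(pointers); // orders by partition id (high bits) first
    for (long p : pointers) {
      System.out.println("partition=" + (p >>> 40)
          + " offset=" + (p & ((1L << 27) - 1)));
    }
    // prints partitions in order: 0, 0, 1, 2
  }
}
```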
- (Figure: the original post includes a diagram of the ShuffleExternalSorter data pages and the ShuffleInMemorySorter pointer array; the image is not reproduced here.)