欢迎您访问程序员文章站！本站旨在为大家提供并分享程序员计算机编程知识！
您现在的位置是: 首页

Spark_使用累加器实现WordCount避免shuffle

程序员文章站 2022-06-14 13:46:12
...

package com.atguigu.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Word count implemented with a custom Spark accumulator instead of a
 * key-based aggregation such as `reduceByKey`.
 *
 * Every word of the RDD is fed into a [[TestACCWc.MyAccumulator]] inside an
 * action (`foreach`), and the combined result is read back on the driver
 * through `value`.
 */
object TestACCWc {

  /**
   * Builds a small in-memory RDD of words, counts them with the custom
   * accumulator and prints the resulting word -> count map.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[7]").setAppName("MyWordCount")
    val sc = new SparkContext(conf)
    try {
      val rdd: RDD[String] = sc.makeRDD(List("hello", "saprk", "hello"))

      // Custom accumulator:
      // 1. create the accumulator object
      val wcAcc = new MyAccumulator
      // 2. register it with Spark
      sc.register(wcAcc, "wordCountAcc")
      // 3. use it inside an action
      rdd.foreach(word => wcAcc.add(word))

      println(wcAcc.value)
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }

  /**
   * Accumulator that counts how many times each word was added.
   *
   * Type parameters of `AccumulatorV2`:
   *  - IN  (`String`): the type of the values passed to `add`
   *  - OUT (`mutable.Map[String, Long]`): the type returned by `value`
   */
  class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

    // Running word -> count table. The reference is never reassigned, only
    // mutated in place, so a `val` is sufficient.
    private val wcMap: mutable.Map[String, Long] = mutable.Map[String, Long]()

    /** The accumulator is in its zero state when no word has been counted. */
    override def isZero: Boolean = wcMap.isEmpty

    /**
     * Returns a new accumulator holding the same counts as this one.
     *
     * The entries are copied into a fresh map, so later updates to either
     * accumulator do not affect the other.
     */
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      val duplicate = new MyAccumulator
      duplicate.wcMap ++= wcMap
      duplicate
    }

    /** Discards every count, returning the accumulator to its zero state. */
    override def reset(): Unit = wcMap.clear()

    /**
     * Increments the count of `word` by one, starting from zero for a word
     * that has not been seen yet.
     *
     * @param word the word to count
     */
    override def add(word: String): Unit = {
      val newCnt: Long = wcMap.getOrElse(word, 0L) + 1
      wcMap.update(word, newCnt)
    }

    /**
     * Adds the counts held by `other` into this accumulator, word by word.
     *
     * @param other accumulator whose counts are merged into this one
     */
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      other.value.foreach { case (word, count) =>
        wcMap.update(word, wcMap.getOrElse(word, 0L) + count)
      }
    }

    /**
     * Current word -> count table.
     *
     * This is the accumulator's internal map, not a snapshot; callers should
     * treat it as read-only.
     */
    override def value: mutable.Map[String, Long] = wcMap
  }
}

相关标签: spark