欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于spark开发wordcount案例

程序员文章站 2022-07-14 13:58:10
...

spark的WordCount

原理:
基于spark开发wordcount案例

数据流分析:
textFile(“in”):读取本地文件in文件夹数据;
flatMap(.split(" ")):压平操作,按照空格分割符将一行数据映射成一个个单词;
map((
,1)):对每一个元素操作,将单词映射为元组;
reduceByKey(+):按照key将值进行聚合,相加;
collect:将数据收集到Driver端展示。

package day0904
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

  object WordCount {
    def main(args: Array[String]): Unit = {
      //获取配置信息 setMaster设置本地模式 setAppName设置应用程序名
      val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")

      //获取SparkContext上下文对象
      val sc = new SparkContext(config)

      //从in(可以是目录,可以是文件)中读取文件中的每一行,跟Hadoop取一行
      //这块不指定分区数,针对文件来说默认至少两个分区
      val lines: RDD[String] = sc.textFile("in")

      //转换结构1: 每行的内容以空格作为分割符
      val words = lines.flatMap(_.split(" "))

      //转换结构2: (oop,1) (spark,1) (mnl,1).... 二元组,key是单词,value是1
      val wordToOne: RDD[(String, Int)] = words.map((_, 1))

      //转换结构3: 将上述map阶段处理完的数据做聚合操作,相同key做value求和
      val wordTosum = wordToOne.reduceByKey(_ + _)

      //收集阶段
      val result: Array[(String, Int)] = wordTosum.collect()

      //控制台打印输出
      result.foreach(println)

      //结果保存至文件中
      wordTosum.saveAsTextFile("outputword")
    }

}

运行结果:
基于spark开发wordcount案例

基于spark开发wordcount案例

部署到yarn集群上:
这里根据电脑选择合适的路径sc.textFile("/input")这是我集群的文件,不过我试过保存至hadoop文件系统乱码,所以 wordTosum.saveAsTextFile(“outputword”)代码我没有加入,打包jar。运行命令
spark-submit --class day0904.WordCount --master yarn --deploy-mode client Spark-1.0-SNAPSHOT-jar-with-dependencies.jar

–class 指定需要运行的Main方法所在类, --master 指定部署模式 yarn client: 这个是说Spark Driver和ApplicationMaster进程均在本机运行,而计算任务在cluster上。还有其他参数我这里并没有加入,跟jar包路径
效果如下:
基于spark开发wordcount案例
总结:scala在执行collect()函数(收集)之前不会对数据以及文件做任何操作,这是scala懒加载机制,也是RDD两类算子的区别.

相关标签: WordCount spark