欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

[Kafka与Spark集成系列三] Spark编程模型

程序员文章站 2022-07-15 18:02:31
...

欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。
[Kafka与Spark集成系列三] Spark编程模型

在Spark中,我们通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动地在集群上并行进行。这样的数据集被称为弹性分布式数据集(Resilient Distributed Dataset),简称RDD。RDD是Spark对分布式数据和计算的基本抽象。在Spark中,对数据的所有操作不外乎创建RDD、转换已有RDD以及调用RDD操作进行求值。在《Spark的安装及简单应用》的单词统计示例中,rdd和wordmap都是MapPartitionsRDD类型的RDD,而wordreduce是ShuffledRDD类型的RDD。

RDD支持2种类型的操作:转换操作(Transformation Operation)和行动操作(Action Operation)。有些资料还会细分为创建操作、转换操作、控制操作以及行动操作等4种类型。转换操作会由一个RDD生成一个新的RDD。行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或者把结果存储到外部存储系统中。转换操作和行动操作的区别在于Spark计算RDD的方式不同。虽然你可以在任何时候定义新的RDD,但Spark只会惰性计算这些RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。表中给出了转换操作和行动操作之间对比的更多细节。

类别 函数 区别
转换操作 map、filter、groupBy、join、union、reduce、sort、partitionBy等 返回值还是RDD,不会立马提交给Spark集群运行
行动操作 count、collect、take、save、show等 返回值不是RDD,会形成DAG图,提交给Spark集群运行并立即返回结果

通过转换操作,从已有的RDD中派生出新的RDD,Spark会使用谱系图(lineage graph,很多资料也会翻译为“血统”)来记录这些不同RDD之间的依赖关系。Spark需要用这些信息来按需计算每个RDD,也可以依赖谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生产实际的输出,它们会强制执行那些求值必须用到的RDD的转换操作。

Spark中RDD计算是以分区(Partition)为单位的,将RDD划分为很多个分区分布到集群的节点中,分区的多少涉及对这个RDD进行并行计算的粒度。如下图所示,实线方框A、B、C、D、E、F、G都表示的是RDD,阴影背景的矩形则表示分区。A、B、C、D、E、F、G之间的依赖关系构成整个应用的谱系图。

[Kafka与Spark集成系列三] Spark编程模型
依赖关系还可以分为窄依赖和宽依赖。窄依赖(Narrow Dependencies)是指每个父RDD的分区都至多被一个子RDD的分区使用,而宽依赖(Wide Dependencies)是指多个子RDD的分区依赖一个父RDD的分区。图中,C和D之间是窄依赖,而A和B之间是宽依赖。RDD中行动操作的执行将会以宽依赖为分界来构建各个调度阶段,各个调度阶段内部的窄依赖则前后链接构成流水线。图中的3个虚线方框分别代表了3个不同的调度阶段。

对于执行失败的任务,只要它对应的调度阶段的父类信息仍然可用,该任务就会分散到其它节点重新执行。如果某些调度阶段不可用,则重新提交相应的任务,并以并行方式计算丢失的地方。在整个作业中如果某个任务执行缓慢,系统则会在其他节点上执行该任务的副本,并最终取最先得到的结果作为最终的结果。

下面就以与《Spark的安装及简单应用》中相同的单词统计程序来分析一下Spark的编程模型,与《Spark的安装及简单应用》中所不同的是,这里的是一个完整的Scala程序,程序所对应的Maven依赖如下:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

具体代码示例如下:

package scala.spark.demo
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit ={
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")①
    val sc = new SparkContext(conf)②
    val rdd = sc.textFile("/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-shell")③
    val wordcount = rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)④
    val wordsort = wordcount.map(x=>(x._2,x._1))
      .sortByKey(false).map(x=>(x._2,x._1))⑤
    wordsort.saveAsTextFile("/tmp/spark")⑥
    sc.stop()⑦
  }
}

main()方法主体中第①和第②行中首先创建一个SparkConf对象来配置应用程序,然后基于这个SparkConf创建了一个SparkContext对象。一旦有了SparkContext,就可以用它来创建RDD,第③行代码中调用了sc.textFile()来创建一个代表文件中各行文本的RDD。第④行中rdd.flatMap(_.split(" ")).map(x=>(x,1))这一段内容的依赖关系是窄依赖,而reduceByKey(_+_)操作对单词进行计数时属于宽依赖。第⑥行中将排序后的结果存储起来。最后第⑦行中使用stop()方法来关闭应用。

在$SPARK_HOME/bin目录中还有一个spark-submit脚本,用于将应用快速部署到Spark集群中。比如这里的WordCount程序,当我们希望通过spark-submit部署时,只需要将应用打包成jar包(即下面示例中的wordcount.jar)并上传到Spark集群中,然后通过spark-submit进行部署,示例如下:

[aaa@qq.com  spark]# bin/spark-submit --class scala.spark.demo.WordCount wordcount.jar --executor-memory 1G --master spark://localhost:7077
2018-08-06 15:39:54 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
2018-08-06 15:39:55 INFO  SparkContext:54 - Running Spark version 2.3.1
2018-08-06 15:39:55 INFO  SparkContext:54 - Submitted application: WordCount
2018-08-06 15:39:55 INFO  SecurityManager:54 - Changing view acls to: root
2018-08-06 15:39:55 INFO  SecurityManager:54 - Changing modify acls to: root
(....省略若干)
2018-08-07 12:25:47 INFO  AbstractConnector:318 - Stopped 
aaa@qq.com{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-08-07 12:25:47 INFO  SparkUI:54 - Stopped Spark web UI at 
http://10.199.172.111:4040
2018-08-07 12:25:47 INFO  MapOutputTrackerMasterEndpoint:54 – 
MapOutputTrackerMasterEndpoint stopped!
2018-08-07 12:25:47 INFO  MemoryStore:54 - MemoryStore cleared
2018-08-07 12:25:47 INFO  BlockManager:54 - BlockManager stopped
2018-08-07 12:25:47 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-08-07 12:25:47 INFO  
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 – 
OutputCommitCoordinator stopped!
2018-08-06 15:46:57 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-08-06 15:46:57 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-08-06 15:46:57 INFO  ShutdownHookManager:54 - Deleting directory 
/tmp/spark-fa955139-270c-4899-82b7-4959983a1cb0
2018-08-06 15:46:57 INFO  ShutdownHookManager:54 - Deleting directory 
/tmp/spark-3f359966-2167-4bb9-863a-2d8a8d5e8fbe

示例中的–class用来指定应用程序的主类,这里为scala.spark.demo.WordCount;–executor-memory用来指定执行器节点的内容,这里设置为1G。最后得到的输出结果如下所示:

[aaa@qq.com spark]# ls /tmp/spark
part-00000  _SUCCESS
[aaa@qq.com spark]# cat /tmp/spark/part-00000 
(,91)
(#,37)
(the,19)
(in,7)
(to,7)
(for,6)
(if,5)
(then,5)
(under,4)
(stty,4)
(not,4)

欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。
[Kafka与Spark集成系列三] Spark编程模型