欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark学习笔记(一)——基础和架构

程序员文章站 2022-07-15 17:35:06
...

Spark基础和架构

一、Spark和Hadoop的比较

  • MapReduce编程模型的局限性
    • 繁杂
      • 只有Map和Reduce两个操作,复杂的逻辑需要大量的样板代码
    • 处理效率低:
      • Map中间结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据
      • 任务调度与启动开销大
    • 不适合迭代处理、交互式处理和流式处理
  • Spark是类Hadoop MapReduce的通用并行框架
    • Job中间输出结果可以保存在内存,不再需要读写HDFS
    • 比MapReduce平均快10倍以上

二、Spark优势

  • 速度快
    • 基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试)
    • 基于硬盘数据处理,比MR快10个数量级以上
  • 易用性
    • 支持Java、Scala、Python、R语言
    • 交互式shell方便开发测试
  • 通用性
    • 一栈式解决方案:批处理、交互式查询、实时流处理、图计算及机器学习
  • 多种运行模式
    • YARN、Mesos、EC2、Kubernetes、Standalone、Local

三、Spark技术栈

  • Spark Core

    • 核心组件,分布式计算引擎

    包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的

  • Spark SQL

    • 高性能的基于Hadoop的SQL解决方案

    提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作

  • Spark Streaming

    • 可以实现高吞吐量、具备容错机制的准实时流处理系统

    对实时数据流进行处理和控制。Spark Streaming允许程序能够像普通RDD一样处理实时数据

  • Spark GraphX

    • 分布式图处理框架

    控制图、并行图操作和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作

  • Spark MLlib

    • 构建在Spark上的分布式机器学习库

    一个常用机器学习算法库,算法被实现为对RDD的Spark操作。这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0EEqGelX-1597058270299)(…/…/…/software/typora/img/image-20200804091326775.png)]

四、Spark环境部署

1、安装配置

  • 选择Spark2.2版本,下载地址
    • https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
  • 解压并配置环境变量
    • SPARK_HOME
  • Spark配置文件
    • $SPARK_HOME/conf/spark-env.sh
export SPARK_MASTER_HOST=192.168.253.150
	//本机的ip地址
export SPARK_MASTER_PORT=7077
	
export SPARK_WORKER_CORES=2
	//2个核心 不能比分配的虚拟机核数大
export SPARK_WORKER_MEMORY=3g

export SPARK_MASTER_WEBUI_PORT=8888
  • $SPARK_HOME/conf/slaves
localhost
    //由于是单机所以是本地
  • 启动Spark
    • 先进入spark的sbin目录 ./start-all.sh启动Master和Worker任务
    • 进入bin目录./spark-shell进入shell界面

2、测试

  • spark-shell:Spark自带的交互式工具
    • 本机
      • spark-shell --master local[*]
    • Standalone(需启动spark)
      • spark-shell --master spark://MASTERHOST:7077
    • YARN
      • spark-shell --master yarn
scala> sc.textFile("file:///opt/data/Spark/spark.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((hive,1), ("",1), (hello,4), (world,1), (spark,1), (hadoop,1))

五、Spark架构设计

1、运行架构

Spark学习笔记(一)——基础和架构

  • 在驱动程序中,通过SparkContext主导应用的执行
  • SparkContext可以连接不同类型的Cluster Manager(Standalone、YARN、Mesos),连接后,获得集群节点上的Executor
  • 一个Worker节点默认一个Executor,可通过SPARK_WORKER_INSTANCES调整
  • 每个应用获取自己的Executor
  • 每个Task处理一个RDD分区

2、运行基本流程

Spark学习笔记(一)——基础和架构

1、当一个spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext对象,由Sparkcontext负责和资源管理器的通信以及进行资源的申请、任务的分配和监控等,SparkContext会向资源管理器注册并申请运行Executor的资源,SparkContext可以看成是应用程序连接集群的通道。

2、资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着"心跳"发送到资源管理器上。

3、SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAG调度器进行解析,将DAG图分解成多个‘阶段’(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器进行处理;Executor向SparkContext申请任务,任务调度器将任务分发给Executor运行,同时,SparkContext将应用程序代码发放给Executor。

4、任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。+

3、核心组件

术语 说 明
Application 建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码
Driver program 驱动程序。Application中的main函数并创建SparkContext
Cluster Manager 在集群(Standalone、Mesos、YARN)上获取资源的外部服务
Worker Node 集群中任何可以运行Application代码的节点
Executor 某个Application运行在worker节点上的一个进程
Job 包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job
Stage 每个Job会被拆分成多组Task,作为一个TaskSet,其名称为Stage
Task 被送到某个Executor上的工作单元

六、Spark API

1、SparkContext

  • 连接Driver与Spark Cluster(Workers)
  • Spark的主入口
  • 每个JVM仅能有一个活跃的SparkContext
  • SparkContext.getOrCreate
object SparkContextDemo extends App {
  //创建一个spark.context对象
  val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("spark")
  val sc: SparkContext = SparkContext.getOrCreate(conf)
  sc.textFile("D:\\study files\\Spark\\test\\spark.txt")
    .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    .collect.foreach(println)
  sc.stop()
}

2、SparkSession

  • Spark 2.0+应用程序的主入口:包含了SparkContext、SQLContext、HiveContext以及StreamingContext
  • SparkSession.getOrCreate
object SparkSessionDemo extends App {
  //创建一个SparkSession对象
  val spark: SparkSession = SparkSession.builder()
    .master("local[2]").appName("SparkSessionTest").getOrCreate()
  val sc: SparkContext = spark.sparkContext
  sc.textFile("D:\\study files\\Spark\\test\\spark.txt")
    .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    .collect.foreach(println)
  sc.stop()
}

3、RDD核心

  • RDD
    • Spark核心,主要数据抽象
  • Dataset
    • 从Spark1.6开始引入的新的抽象,特定领域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作
  • DataFrame
    • DataFrame是特殊的Dataset

七、RDD

1、RDD概念

  • RDD是弹性分布式数据集(Resilient Distributed Datasets)
    • 分布式( Distributed)
      • 数据的计算并非只局限于单个节点,而是多个节点之间协同计算得到
    • 数据集( Datasets)
      • RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上
      • RDD并不存储真正的数据,只是对数据和操作的描述
    • 弹性(Resilient)
      • 自动进行存储方式的切换,RDD优先存储内存中,内存不足将自动写入磁盘
      • 基于Linage的高效容错机制,在任何时候都能进行重算,根据数据血统,可以自动从节点失败中恢复分区,各个分片之间的数据互不影响
      • Stage失败自动重试 / Task失败自动重试
      • Checkpoint和Persist, checkpoint持久化到文件系统

2、五大特性

  • 一系列的分区(分片)信息,每个任务处理一个分区
  • 每个分区上都有compute函数,计算该分区中的数据
  • RDD之间有一系列的依赖
  • 分区器决定数据(key-value)分配至哪个分区
  • 优先位置列表,将计算任务分派到其所在处理数据块的存储位置(移动数据不如移动计算)

Spark学习笔记(一)——基础和架构

3、RDD与DAG

  • 两者是Spark提供的核心抽象
  • DAG(有向无环图)反映了RDD之间的依赖关系

Spark学习笔记(一)——基础和架构

4、RDD编程流程

Spark学习笔记(一)——基础和架构

5、RDD的创建

(1)使用集合创建RDD

val rdd1:RDD[Int] =sc.parallelize(List(1,2,3,4,5,6,7,8))
println(rdd1.partitions.size)
val rdd2:RDD[Int] =sc.parallelize(List(1,2,3,4,5,6,7,8),6)
println(rdd2.partitions.size)
val rdd3: RDD[Int] = sc.makeRDD(List(1,2,3,4,5),10)
println(rdd3.partitions.size)
    //分区数量
    12
	6
	10

1、Spark默认会根据集群的情况来设置分区的数量,也可以通过parallelize()第二参数来指定
2、Spark会为每一个分区运行一个任务进行处理

(2)通过加载文件产生RDD

object createRDDDemo extends App {
  //创建一个spark.context对象
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
  val sc: SparkContext = SparkContext.getOrCreate(conf)
  val rdd1: RDD[String] = sc.textFile("file:///D:\\study files\\Spark\\project\\Spark_beijing\\src\\data\\hello.txt")

  rdd1.collect().foreach(println)
  sc.stop()

}
  • 加载“file://……”时,以local运行仅需一份本地文件,以Spark集群方式运行,应保证每个节点均有该文件的本地副本

  • 支持目录、压缩文件以及通配符

sc.textFile("/my/directory")
sc.textFile("/my/directory/*.txt")
sc.textFile("/my/directory/*.gz")

1、Spark默认访问HDFS
2、Spark默认为HDFS文件的每一个数据块创建一个分区,也可以通过textFile()第二个参数指定,但只能比数据块数量多

(3)创建PairRDD的方法

  • SparkContext.wholeTextFiles():可以针对一个目录中的大量小文件返回<filename,fileContent>作为PairRDD
    • 普通RDD:org.apache.spark.rdd.RDD[data_type]
    • PairRDD:org.apache.spark.rdd.RDD[(key_type,value_type)]
object CreatePairRDDDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[*]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val pairRDD =sc.wholeTextFiles("file:///D:\\study files\\Spark\\project\\Spark_beijing\\src\\data")
  pairRDD.foreach(println)
  sc.stop()
}

//输出结果

(4)其他创建RDD的方法

  • SparkContext.sequenceFile[K,V]()
    • Hadoop SequenceFile的读写支持
  • SparkContext.hadoopRDD()、newAPIHadoopRDD()
    • 从Hadoop接口API创建
  • SparkContext.objectFile()
    • RDD.saveAsObjectFile()的逆操作

八、RDD操作

详情可参考官方中文文档:http://spark.apachecn.org/#/docs/4

1、RDD分区

  • 分区是被拆分并发送到节点的RDD的不同块之一
    • 我们拥有的分区越多,得到的并行性就越强
    • 每个分区都是被分发到不同Worker Node的候选者
    • 每个分区对应一个Task
      Spark学习笔记(一)——基础和架构

分区的作用:

一个HDFS文件的RDD将文件的每个文件块表示为一个分区,并且知道每个文件块的位置信息。这些对应着数据块的分区分布到集群的节点中,因此,分区的多少涉及对这个RDD进行并行计算的粒度。首先,分区是一个逻辑概念, 变换前后的新旧分区在物理上可能是同一块内存或者是存储。

需要注意的是,如果没有指定分区数将使用默认值,而默认值是该程序所分配到CPU核数,如果是从HDFS文件创建,默认为文件的数据块数。

2、RDD的操作算子

  • 分为lazy与non-lazy两种
    • Transformation(lazy):也称转换操作、转换算子
    • Actions(non-lazy):立即执行,也称动作操作、动作算子

常用Transformation算子:

Transformation算子 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
coalesce(numPartitions) 减少 RDD 的分区数到指定值。
repartition(numPartitions) 重新给 RDD 分区
repartitionAndSortWithinPartitions(partitioner) 重新给 RDD 分区,并且每个分区内以记录的 key 排序

常用Action算子

Action算子 含义
reduce(func) reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func
foreachPartition(func) 在数据集的每一个分区上,运行函数func

3、RDD转换算子

  • 对于转换操作,RDD的所有转换都不会直接计算结果
    • 仅记录作用于RDD上的操作
    • 当遇到动作算子(Action)时才会进行真正计算

(1)map算子

  • 对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD
  • 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应
  • 输入分区与输出分区一一对应
object MapDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[*]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val a = sc.parallelize(1 to 5)
  val b = a.map(x=>x*2)
  a.collect().foreach(println)
  println("----------------------")
  b.collect().foreach(println)
  println("----------------------")
  //TODO:使用map将普通RDD转换成pairRDD
  val c = a.map(x=>(x,1))
  println(c.collect().mkString("\t"))
}

//输出结果
1
2
3
4
5
----------------------
2
4
6
8
10
----------------------
(1,1)	(2,1)	(3,1)	(4,1)	(5,1)

(2)filter算子

  • 对元素进行过滤,对每个元素应用指定函数,返回值为true的元素保留在新的RDD中
object FilterRDD extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[*]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val a = sc.parallelize(1 to 5)
  val b = a.filter(x=>x%2==0)
  println(b.collect().mkString("\t"))

}

//输出结果
2	4

(3)mapValue算子

  • 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD
object MapValueDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[*]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)
  val a:RDD[String] = sc.parallelize(List("dog","cat","panda"))

  //TODO:使用map将普通RDD转换成pairRDD
  val pairRDD: RDD[(Int, String)] = a.map(x=>(x.length,x))
  println(pairRDD.collect().mkString("\t"))
  //TODO:mapValues对pairRDD的value进行处理
  val d: RDD[(Int, String)] = pairRDD.mapValues(x=>x+" say:"+"hungry")
  println(d.collect().mkString("\t"))
}

//输出结果
(3,dog say:hungry)	(3,cat say:hungry)	(5,panda say:hungry)

(4)FlatMap算子

  • 类似于map,但是每一个输入元素可以被映射为0或多个输出元素
object FlatMapDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[*]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  private val rdd1: RDD[List[Int]] = sc.parallelize(List(List(1,2,3),List(4,5,6),List(7,8,9)))
  private val rdd2: RDD[Char] = rdd1.flatMap(x=>x.mkString(""))
  println(rdd2.collect().mkString(""))
}
//输出结果
123456789

(5)MapPartitionsWithIndex算子

object MapPartitionsWithIndexDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val a = sc.parallelize(1 to 5)

  //List(1,2)
  val rdd: RDD[(Int, Int)] = a.mapPartitionsWithIndex((index, items)=>(items.map(x=>(index,x))))
  println(rdd.collect().mkString("\t"))

}

//输出结果
(0,1)	(1,2)	(2,3)	(3,4)	(4,5)

(6)更多的转换算子

  • distinct
  • reduceByKey
object ReduceByKeyDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val rdd1 = sc.parallelize(List("dog","cat","pig","salmon"))
  private val rdd2: RDD[(Int, String)] = rdd1.map(x=>(x.length,x))
  println(rdd2.collect().mkString(" "))
  //TODO:reduceByKey
  private val rdd3: RDD[(Int, String)] = rdd2.reduceByKey((a, b)=>a+b)
  println(rdd3.collect().mkString(" "))
  //TODO:groupByKey
  private val rdd4: RDD[(Int, Iterable[String])] = rdd2.groupByKey()
  println(rdd4.collect().mkString(" "))
  //TODO:sortByKey
  println("----------------------------------------")
  println(rdd2.sortByKey().collect().mkString(" "))
  println(rdd2.sortByKey(false).collect().mkString(" "))
}

//输出结果
(3,dog) (3,cat) (3,pig) (6,salmon)
(6,salmon) (3,dogcatpig)
(6,CompactBuffer(salmon)) (3,CompactBuffer(dog, cat, pig))
----------------------------------------
(3,dog) (3,cat) (3,pig) (6,salmon)
(6,salmon) (3,dog) (3,cat) (3,pig)
  • groupByKey
  • sortByKey
object SortByDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val a = sc.parallelize(List(10,2,3,5,13,6,3))
  println(a.sortBy(x => x).collect().mkString(" "))
  println(a.sortBy(x => x * (-1)).collect().mkString(" "))
  
}

//输出结果
2 3 3 5 6 10 13
13 10 6 5 3 3 2
  • union
object UnionDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val a = sc.parallelize(1 to 10)
  val b = sc.parallelize(4 to 15)
  val unionRDD: RDD[Int] = a.union(b)
  println(unionRDD.collect().mkString(" "))
}
//输出结果
1 2 3 4 5 6 7 8 9 10 4 5 6 7 8 9 10 11 12 13 14 15
  • join和cogroup
object JoinDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  //创建两个RDD
  val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
  val rdd2 = sc.parallelize(Array((1,4),(2,5),(3,6)))

  //TODO:进行join操作 在类型为(K,V)和(K,W)的RDD上调用
  //TODO:返回一个形同额key对应的所有元素在一起的(K,(V,W))的调用
  val rdd3 = rdd1.join(rdd2)
  println(rdd3.collect().mkString(" "))
}

//输出结果
(1,(a,4)) (2,(b,5)) (3,(c,6))
object CogroupDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  //创建两个RDD
  val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
  val rdd2 = sc.parallelize(Array((1,4),(2,5),(3,6)))

  //TODO:进行cogroup操作 在类型为(K,V)和(K,W)的RDD上调用
  //TODO:返回一个形同额key对应的所有元素在一起的(K,(V,W))的调用
  val rdd3 = rdd1.cogroup(rdd2)
  println(rdd3.collect().mkString(" "))

}

//输出结果
(1,(CompactBuffer(a),CompactBuffer(4))) (2,(CompactBuffer(b),CompactBuffer(5))) (3,(CompactBuffer(c),CompactBuffer(6)))
  • count
  • sample
object SampleDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val a = sc.parallelize(1 to 10)
  a.sample(true,0.4,2).collect().foreach(println)
}
//输出结果
1
2
2
6
6
7
8
9
9
10
  • repartition和coalesce
object RePartitionDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  val a:RDD[Int]  = sc.parallelize(1 to 10,4)
  //TODO:  repartition重新分区
  private val rdd: RDD[Int] = a.repartition(2)
  println(a.partitions.size)
  println(rdd.partitions.size)
  //TODO:  coalesce 减少到2个分区 可以指定是否发生shuffle
  private val rdd1: RDD[Int] = a.coalesce(2)
  println(rdd1.partitions.size)
}

//输出结果
4
2
2

同时也可以这样改变分区:

object PartitionDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  //创建pairRDD
  val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
  //TODO:进行partition操作 对pariRdd进行分区操作
  //TODO:如果原有的pairRDD和现有的partionRDD是一致的话就不进行分区
  //TODO:否则会生成shufflerdd会产生shuffle过程
  val rdd2=rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
  println(rdd2.partitions.size)
  sc.stop()
}

//输出结果
2
  • zip
object ZipDemo extends App {
  //创建一个spark context对象
  val conf = new SparkConf().setMaster("local[5]").setAppName("sparkTest6")
  val sc = SparkContext.getOrCreate(conf)

  //创建两个rDD
  val rdd1 = sc.parallelize(Array(1,2,3,4,5))
  val rdd2 = sc.parallelize(Array("A","B","C","D","E"))
  //todo:将两个RDD组合成keyvalue的形式的rdd
  //todo:这里需要两个数量相同否则会跑出异常
  val rdd3 = rdd1.zip(rdd2)
  println(rdd3.collect().mkString(" "))
  
  //元素数量不同会报错
  val rdd4 = sc.parallelize(Array("A","B","C","D"))
  val rdd5 = rdd1.zip(rdd4)
  println(rdd5.collect().mkString(" "))

}

4、RDD动作算子

  • 本质上动作算子通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行
  • 所有的动作算子都是急迫型(non-lazy),RDD遇到Action就会立即计算

(1)count

返回的是数据集中的元素的个数

val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.count
//6

(2)collect

以Array返回RDD的所有元素。一般在过滤或者处理足够小的结果的时候使用

val rdd=sc.parallelize(List(1,2,3,4,5,6))
println(rdd.collect.mkString(" "))
// 1 2 3 4 5 6

(3)take

返回前n个元素

val rdd=sc.parallelize(List(1,2,3,4,5,6))
println(rdd.take(3).mkString(" "))
    //1 2 3

(4)first

返回RDD第一个元素

val rdd=sc.parallelize(List(1,2,3,4,5,6))
println(rdd.first())
    //1

(5)reduce

根据指定函数,对RDD中的元素进行两两计算,返回计算结果

val a=sc.parallelize(1 to 100)
a.reduce((x,y)=>x+y)
a.reduce(_+_)		//与上面等价
val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1)))
b.reduce((x,y)=>{(x._1+y._1,x._2+y._2)})		//(AABBC,6)

(6)foreach

对RDD中的每个元素都使用指定函数,无返回值

val rdd=sc.parallelize(1 to 100)
rdd.foreach(println)

(7)lookup

用于PairRDD,返回K对应的所有V值

val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
println(rdd.lookup('a'))
    //WrappedArray(1, 2)

(8)最值

max、min返回最大值、最小值

val c:RDD[Int] = sc.parallelize(1 to 50)
println(c.max())
println(c.min())
    //50
	//1

(9)saveAsTextFile

保存RDD数据至文件系统

val d:RDD[Int] = sc.parallelize(1 to 10)
d.saveAsTextFile("src/data/test")

定函数,对RDD中的元素进行两两计算,返回计算结果

val a=sc.parallelize(1 to 100)
a.reduce((x,y)=>x+y)
a.reduce(_+_)		//与上面等价
val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1)))
b.reduce((x,y)=>{(x._1+y._1,x._2+y._2)})		//(AABBC,6)

(6)foreach

对RDD中的每个元素都使用指定函数,无返回值

val rdd=sc.parallelize(1 to 100)
rdd.foreach(println)

(7)lookup

用于PairRDD,返回K对应的所有V值

val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
println(rdd.lookup('a'))
    //WrappedArray(1, 2)

(8)最值

max、min返回最大值、最小值

val c:RDD[Int] = sc.parallelize(1 to 50)
println(c.max())
println(c.min())
    //50
	//1

(9)saveAsTextFile

保存RDD数据至文件系统

val d:RDD[Int] = sc.parallelize(1 to 10)
d.saveAsTextFile("src/data/test")