欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RDD与共享变量

程序员文章站 2022-06-19 23:37:57
...

RDD和共享变量是Spark中的两个重要抽象。

RDD

弹性分布式数据集, 是分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。

RDD的创建

  • 从文件系统中加载数据创建
  • 通过并行集合(数据)创建
val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)
//也可从列表中创建
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)

RDD转换和Action操作

对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录的转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,从血缘关系源头开始,进行物理的转换操作。

  • filter()
  • map()
  • flapMap()
  • groupByKey() 应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集。
  • reduceByKey(lambda) 应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中的每个值是将每个key传递到函数lambda中进行聚合。

终止操作是真正触发计算的地方。Spark程序执行到终止操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。

常见的Action操作有:

  • count() 返回数据集中的元素个数
  • collect() 以数组的形式返回数据集中的所有元素
  • first() 返回数据集中的第一个元素
  • take(n) 以数组的形式返回数据集中的前n个元素
  • reduce(lambda) 通过函数lambda聚合数据集中的元素
  • foreach(lambda) 将数据集中的每一个元素执行lambda操作

RDD的持久化

在Spark中,RDD采用惰性求解的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用一组数据。

可以通过持久化(缓存)机制避免这种重复计算的开销。使用persist()方法对一个RDD标记为持久化。之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发,才会把计算结果进行持久化。

持久化后的RDD将会保留在计算节点的内存中被后面的行动操作重复使用。

persist()持久化级别参数:

  • persist(MEMORY_ONLY):表示将RDD作为反序列化的对象储存于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。
  • persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上。
  • 一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)
  • 可以使用unpersist()方法手动地把持久化的RDD从缓存中移除。
val list= List("Hadoop", "Spark", "Hive")
val rdd = sc.parallelize(list)
rdd.cache() //回调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
println(rdd.count) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache()
print(rdd.collection.mkString(",")) //第二次行动操作,不需要从头到尾的计算,只需要重复使用上面缓存中的rdd

RDD分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。为什么要分区?

  • 增加并行读
  • 减少通信开销

在分布式程序中,通信大代价是很大的,因此控制数据分布获得最少的网络传输可以极大地提升整体性能。所以对RDD进行分区的目的就是减少网络传输的代价以提高系统的性能

只有当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。若RDD只需要扫描一次,就没有必要进行分区处理。

能从Spark分区或获取的操作有:cogroup()、groupWith()、join()/leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey()已经lookup()

RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。
对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:

  • 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N
  • Apache Mesos:默认的分区数为8
  • Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值

如何设置分区:

  1. 创建RDD时:在调用textFile和parallelize方法时手动指定分区个数即可,语法格式:sc.textFile(path, partitionNum)
  2. 通过转换操作得到新RDD时:直接调用repartition方法即可。
var rdd = data.repartition(4)
rdd2.partitions.size # 4

val array = Array(1,2,3,4,5)
//设置两个分区
val rdd2 = sc,parallelize(array,2) 
  • 对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.defalut.parallelism
  • 对于textFile而言,如果没有在防止指定分区数,则默认为min(defaultParallelism, 2),其中,defaultParallelism对应的就是spark.default.parallelism
  • 如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)

共享变量

当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)。广播变量用来把变量在所有节点的内存之间进行共享。累加器则支持在所有不同节点之间进行累计计算(比如计数或者求和)。

广播变量

广播变量运行程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生产一个副本。

Spark的“行动”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。

可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下:

val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
  • 广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用v的值,这样就不会把v重复分发到这个节点上。
  • 一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。
import org.apache.spark.SparkConf
import org.apache.spakr.SparkContext

object BroadCastValue {
    val conf = new SparkConf().setAppName("BroadCastValue1").setMaster("local[1]")
    //获取SparkContext
    val sc = new SparkContext(conf)
    //创建广播变量
    val broads = sc.broadcast(3) //变量可以是任意类型
    //创建一个测试的List
    val lists = List(1,2,3,4,5)
    //转换为rdd(并行化)
    val listRDD = sc.parallelize(lists)
    //map操作数据
    val results = listRDD.map(x=>x*broads.value)
    //遍历结果
    results.foreach(x=>println("Thre result is:"+x))
    sc.stop
}

累加器

累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。Spark原生的支持数值型(numeric)的累加器,程序开发人员可以编写对新类型的支持。

一个数组型的累加器,可以通过调用SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()来创建。运行在集群中的任务,就可以使用add方法来把数值累加到累加器上。但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。

下面是累加器对一个数组元素进行求和的示例:

val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))
accum.value
相关标签: rdd 共享变量