欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark(56) -- SparkMllib -- SparkMllib的功能和应用场景

程序员文章站 2022-07-14 21:52:35
...

1. SparkMllib简介及功能介绍

 MLLIB是Spark的机器学习库。提供了利用Spark构建大规模和易用性的机器学习平台,组件:
(1) ML 算法:包括了分类、回归、聚类、降维、协同过滤
(2) Featurization特征化:特征抽取、特征转换、特征降维、特征选择
(3) Pipelines管道:tools for constructing, evaluating, and tuning ML Pipelines
(4) Persistence持久化:模型的保存、读取、管道操作
(5) Utilities:提供了线性代数、统计学以及数据处理工具

2. SparkML版本变迁

通过官网了解SparkMllib的版本:
spark(56) -- SparkMllib -- SparkMllib的功能和应用场景

  • 基于DataFrame的API是主要API
  • Spark ml基于DataFrame的API
  • Spark mllib基于RDD的API
  • 基于MLlib RDD的API现在处于维护模式。

 从Spark 2.0开始,软件包中基于RDD的API Spark.mllib已进入维护模式。Spark的主要机器学习API现在是包中基于DataFrame的API Spark.ml。究竟两者有什么区别呢?

3. SparkMllib架构详解

MLlib是Spark机器学习库,它是MLBase的一部分,MLBase一共分为一下4部分:

  • ML Optimizer:会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数来处理用户输入的数据,并返回模型或者其他的帮助分析结果。
  • MLLIB是一个进行特征提取的和高级ML编程抽象的算法实现的API平台。
  • MLLIB是Spark实现一些常见的机器学习算法和实用程序。
  • MLRuntime是基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。

 MLlib提供了常见机器学习算法的实现,包括分类、聚类、协同过滤和降维等。使用MLlib来做机器学习工作通常只需要在对原始数据处理之后,然后直接调用相应的API就可以实现了。但是想要选择合适的算法,必须了解算法的原理以及MLlib API。接下来比较下Spark.mllib和spark.ml
 Spark.mllib已经很长时间了,1.0之前的版本已经包含了,提供算法实现都是基于原始的RDD,我们只需要掌握mllib的API就可以完成机器学习工作。但是想要构建完整并且复杂的机器学习流水线是比较困难的,因此有了Spark.ml。
 Spark ML Pipeline从Spark1.2版本开始,目前已经从Alpha阶段毕业,成为可用的并且较为稳定的新的机器学习库。ML Pipeline弥补了MLlib库的不足,向用户提供了一个基于DataFrame的机器学习的工作流式API套件,使用ML Pipeline API我们可以很方便地把数据处理、特征转化、正则化以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。
 从官方文档看,Spark ML Pipeline虽然是被推荐的机器学习方式,但是不会在短期内替代原始的MLlib库,因为MLlib已经包含了丰富稳定的算法实现,并且部分ML Pipeline实现基于MLLib。实际工作中,并不是所有的机器学习过程都需要构建成一个流水线,有时候原始的数据格式整齐且完整,而且使用单一的算法就能实现目标,也没有把事情复杂化,采用最简单且容易理解的方式才是正确的选择。

算法架构如下:
MLLIB主要包含两个部分:
(1)底层基础:主要包括Spark的运行库、矩阵库和向量库。其中向量接口和矩阵接口基于Nelib和BLAS/LAPACK开发的线性代数库Breeze。MLlib支持本地的密集向量和本地向量,并且支持标量向量;同时支持本地矩阵和分布式矩阵,分布式矩阵分为:RowMatrix、IndexedRowMatrix和CoordinateMatrix等。
(2)算法库:包含分类、回归、聚集、协同过滤、梯度下降和特征提取和变换等算法。

从架构图可以看出MLlib主要包含三个部分:
底层基础:包括Spark的运行库、矩阵库和向量库;
算法库:包含广义线性模型、推荐系统、聚类、决策树和评估的算法;
实用程序:包括测试数据的生成、外部数据的读入等功能。
spark(56) -- SparkMllib -- SparkMllib的功能和应用场景
下图是MLlib算法库的核心内容:
spark(56) -- SparkMllib -- SparkMllib的功能和应用场景
 MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。

4. 基于RDD的API与基于DataFrame的API区别和应用

  • 有什么影响?
    • MLlib仍将支持基于RDD的API spark.mllib以及错误修复。
    • MLlib不会为基于RDD的API添加新功能。
    • 在Spark 2.x版本中,MLlib将为基于DataFrame的API添加功能,以实现与基于RDD的API的功能奇偶校验。
    • 基于Dataframe的API在达到功能(粗略估计Spark 2.3)之后,将弃用基于RDD的API。
    • 预计将在Spark 3.0中删除基于RDD的API。
  • 为什么MLlib会切换到基于DataFrame的API?
    • DataFrames提供比RDD更加用户友好的API。DataFrame的许多好处包括Spark数据源,SQL / DataFrame查询,Tungsten和Catalyst优化以及跨语言的统一API。
    • 基于DataFrame的MLlib API跨ML算法和多种语言提供统一的API。
    • DataFrames有助于实用的ML管道,特别是功能转换。有关详细信息。
  • 什么是“Spark ML”?
    • “Spark ML”不是官方名称,但偶尔用于指代基于MLlib DataFrame的API。这主要是由于org.apache.spark.ml基于DataFrame的API使用的Scala包名称,以及我们最初用来强调管道概念的“Spark ML Pipelines”术语。
  • MLlib已被弃用吗?
    • MLlib包括基于RDD的API和基于DataFrame的API。基于RDD的API现在处于维护模式。但是这两种API都没有被弃用。
    • 两套API的介绍

5. SparkMllib的环境搭建与IDEA环境配置

(1)Spark单机版本
 首先在官网下载Spark预编译版本,将lib目录下的spark-assembly-2.2.0-hadoop2.7.2.jar文件复制到IDEA安装目录的lib文件夹下。
 单击IDEA菜单上File选项,选择Project Strcture,在弹出的对话框单击左侧的Libraries,之后单击中部上方绿色+号,添加刚才下载的jar包文件即可。
(2)Maven构建依赖环境
在pom文件中加入mllib包的依赖,保存后IDEA会帮我们自动下载。

<dependency>
    		<groupId>org.apache.spark</groupId>
   	 	<artifactId>spark-mllib_2.11</artifactId>
   	 	<version>${spark.version}</version>
    </dependency>   //代码基于2.2.0语法代码

6. RDD、DataSet、Dataframe区别及转化过程

RDD(Spark1.0)—>DataFrame(Spark1.3)---->DataSet(Spark1.6)

 SparkSql提供了Dataframe和DataSet的数据抽象,DataFrame就是RDD+Schema,可以认为是一张二维表格。它的劣势是在编译器不对表格中的字段进行类型检查。在运行期间检查。DataSet是Spark最新数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame。DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型。
DataFrame可以通过DataSet[Row]方式转换。DataFrame和DataSet都有可控的内存管理机制,所有的数据都保存在非堆上,都使用了catalyst进行sql优化。

首先,回顾下Spark SQL 客户端查询几种方式

  • 通过Spark-shell来操作Spark SQL,spark作为SparkSession的变量名,sc作为SparkContext的变量名
  • 通过Spark提供的方法读取JSON文件,将JSON文件转换成DataFrame
  • 通过DataFrame提供的API来操作DataFrame里面的数据。
  • 可以通过将DataFrame注册成为一个临时表的方式,来通过Spark.sql方法运行标准的SQL语句来查询。

关于 DataFrame 查询方式:DataFrame支持两种查询方式一种是DSL风格,另外一种是SQL风格

  • DSL风格:
    需要引入 import spark.implicit._ 这个隐式转换,可以将DataFrame隐式转换成RDD。

  • SQL风格:

    • a. 你需要将DataFrame注册成一张表格,如果你通过CreateTempView这种方式来创建,那么该表格Session有效,如果你通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀 global_temp
    • b. 你需要通过sparkSession.sql 方法来运行你的SQL语句。

DataSet:首先定义一个DataSet,你需要先定义一个Case类。

6.1 RDD、DataSet、DataFrame之间的转换总结:

  • (1)RDD与DataFrame转换:
# 1、 
RDD -> DataFrame : rdd.map(para=>(para(0).trim(),para(1).trim().toInt)).toDF("name","age")
    #通过反射来设置
    rdd.map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()                              
    #通过编程方式来设置Schema,适合于编译期不能确定列的情况
    schemaString.map(fieldName => StructField(fieldName, StringType, nullable = true))  
	val schema = StructType(fields)
    val rdd[Row] = rdd.map(attributes => Row(attributes(0), attributes(1).trim))
    val peopeDF = spark.createDataFrame(rdd[Row],schema)

#2、
 DataFrame -> RDD :  dataFrame.rdd      
 #注意输出:Array([Michael,29], [Andy,30], [Justin,19])
  • (2)RDD与DataSet转换
# 1、 
RDD -> DataSet   :   rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS
# 2、
DataSet -> RDD:dataSet.rdd 
# 输出:Array(Person(Michael,29), Person(Andy,30), Person(Justin,19))

# 1、 
DataFrame -> DataSet:  dataFrame.to[Person]  
# 2、 
DataSet -> DataFrame: dataSet.toDF
  • (3)DataSet与DataFrame转换

对于DataFrame Row对象的访问方式
1、DataFrame = DataSet[Row], DataFrame里面每一行都是Row对象
2、如果需要访问Row对象中的每一个元素,你可以通过下标 row(0);你也可以通过列名 row.getAs[String](“name”)

6.2 各类型转化实战

各个数据的转换的练习,数据集来源于官方example,参考SparkSql官方文档:

import org.apache.spark.sql.SparkSession
case class Person1(name: String, age: Int)
object rdd_df_ds {
  def main(args: Array[String]): Unit = {
    //1-准备环境
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    val sc = spark.sparkContext
    //2-加载数据
    val peopleRDD = sc.textFile("D:\\BigData\\Workspace\\SparkMllibLesson\\src\\main\\scala\\sparkmllib_part1\\sparkmllib_helloworld\\data\\people.txt")
    //3-RDD的打印
    peopleRDD.foreach(println(_))
    //    Justin, 19
    //    Michael, 29
    //    Andy, 30
    val collectArray = peopleRDD.collect()
    println(collectArray.mkString(""))
    //    Michael, 29Andy, 30Justin, 19
    //4-rdd转化df
    val peopleDF = peopleRDD.map(_.split(",")).map(x => (x(0).trim(), x(1).trim().toInt)).toDF("name", "age")
    peopleDF.show()
    //    +-------+---+
    //    |   name|age|
    //    +-------+---+
    //    |Michael| 29|
    //      |   Andy| 30|
    //      | Justin| 19|
    //      +-------+---+
    //5-df转rdd
    val peoplerdd = peopleDF.rdd
    peoplerdd.foreach(println(_))
    //5-rdd转ds---ds进行类型检查的需要声明样例类
    val PeopleDS = Seq(Person1("Jack", 25)).toDS
    PeopleDS.show()
    //    +----+---+
    //    |name|age|
    //    +----+---+
    //    |Jack| 25|
    //6-ds转rdd
    val rdd2 = PeopleDS.rdd
    rdd2.foreach(println(_)) //Person1(Jack,25)
    //7-ds-df
    PeopleDS.toDF().show()
    //8-df--ds
    peopleDF.as[Person1].show()
    //    +-------+---+
    //    |   name|age|
    //    +-------+---+
    //    |Michael| 29|
    //      |   Andy| 30|
    //      | Justin| 19|
  }
}