
Spark SQL Study Notes


Spark SQL Overview

MapReduce has Hive as a high-level interface: instead of hand-writing MapReduce jobs every time, you let Hive generate and execute the MapReduce code automatically.

The Spark framework had a similar product, Shark (which copied Hive's design wholesale).

Shark had two drawbacks:

  • Because it copied Hive wholesale, and Hive was designed for MapReduce, it was hard to add new Spark-specific optimization strategies to Shark.
  • Spark runs tasks at the thread level while MapReduce runs at the process level, which caused thread-safety problems that had to be patched.

Spark SQL was then introduced to replace Shark.

It also brought a new data abstraction, the DataFrame, which supports SQL queries (previously, Spark operated directly on RDDs).

Differences Between RDD and DataFrame
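In short, an RDD holds distributed objects that are opaque to Spark, while a DataFrame organizes the same data into named, typed columns with a schema, which is what allows Spark SQL to understand and optimize queries. A minimal sketch of the contrast, run in spark-shell (the Person case class is just an illustrative assumption):

// illustrative case class, not from the original notes
case class Person(name: String, age: Int)

// An RDD of Person objects: Spark only sees opaque JVM objects
val personRDD = spark.sparkContext.parallelize(Seq(Person("Michael", 29), Person("Andy", 30)))

// A DataFrame of the same data: rows with a known schema (name: string, age: int)
import spark.implicits._
val personDF = personRDD.toDF()
personDF.printSchema()
personDF.filter($"age" > 29).show()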

Creating a DataFrame

Spark Shell automatically creates the sc (SparkContext) object and the SparkSession object for you.
You can see this in the "Reading and Writing Data with Spark SQL" section below: when spark-shell starts, it prints the following two lines.

Spark context available as 'sc' (master = local[*], app id = local-1575509509014).
Spark session available as 'spark'.
## Terminal 1
scala> val peopleDF = spark.read.format("json").load("file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.json")
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.select("name","age").write.format("csv").save("file:///root/newpeople.csv")

## Terminal 2
[aaa@qq.com ~]# cd newpeople.csv/
[aaa@qq.com newpeople.csv]# ls
part-00000-3eb8b2ce-61c2-4dc8-9faf-302d0ee00fb0.csv  _SUCCESS
[aaa@qq.com newpeople.csv]# cat *.csv
Michael,
Andy,30
Justin,19
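Since a DataFrame supports SQL queries, you can also register it as a temporary view and query it with spark.sql. A minimal sketch, continuing from the peopleDF loaded above (column names come from people.json):

// register the DataFrame as a temporary SQL view
peopleDF.createOrReplaceTempView("people")

// run a SQL query against the view; the result is again a DataFrame
val adultsDF = spark.sql("SELECT name, age FROM people WHERE age >= 20")
adultsDF.show()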

Converting an RDD to a DataFrame

There are two methods: inferring the schema by reflection (using a case class), or programmatically specifying the schema (using StructType and Row); a sketch of both follows the link below.

See the textbook tutorial for the full walkthrough:
Spark2.1.0入门:从RDD转换得到DataFrame (Getting Started with Spark 2.1.0: Converting an RDD to a DataFrame)
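A minimal sketch of the two methods in spark-shell (the people.txt path under the Spark examples directory and the Person case class are assumptions for illustration):

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import spark.implicits._

// assumed sample file: lines like "Michael, 29"
val path = "file:///usr/local/soft/spark-2.1.0-bin-without-hadoop/examples/src/main/resources/people.txt"

// Method 1: infer the schema by reflection from a case class
case class Person(name: String, age: Int)
val df1 = spark.sparkContext.textFile(path)
  .map(_.split(","))
  .map(attrs => Person(attrs(0), attrs(1).trim.toInt))
  .toDF()
df1.show()

// Method 2: programmatically specify the schema with StructType and Row
val schema = StructType(List(StructField("name", StringType, true), StructField("age", IntegerType, true)))
val rowRDD = spark.sparkContext.textFile(path)
  .map(_.split(","))
  .map(attrs => Row(attrs(0), attrs(1).trim.toInt))
val df2 = spark.createDataFrame(rowRDD, schema)
df2.show()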

Reading and Writing Data with Spark SQL

Download mysql-connector-java-5.1.40.tar.gz (the MySQL JDBC driver).
I already had a copy, so I skipped the download.

########## Terminal 1 ############
[aaa@qq.com jars]# spark-shell --jars /usr/local/soft/spark-2.1.0-bin-without-hadoop/jars/mysql-connector-java-5.1.40-bin.jar --driver-class-path /usr/local/soft/spark-2.1.0-bin-without-hadoop/jars/mysql-connector-java-5.1.40-bin.jar
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/12/04 20:31:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.0.11:4040
Spark context available as 'sc' (master = local[*], app id = local-1575509509014).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "root").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> jdbcDF.show()
+---+--------+------+---+
| id|    name|gender|age|
+---+--------+------+---+
|  1| Xueqian|     F| 23|
|  2|Weiliang|     M| 24|
+---+--------+------+---+


scala> import java.util.Properties
import java.util.Properties

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(gender,StringType,true), StructField(age,IntegerType,true))

scala> val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
studentRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:28

scala> val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[5] at map at <console>:30

scala> val studentDF = spark.createDataFrame(rowRDD, schema)
studentDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> val prop = new Properties()
prop: java.util.Properties = {}

scala> prop.put("user", "root") //表示用户名是root
res1: Object = null

scala> prop.put("password", "root") //表示密码是hadooroot
res2: Object = null

scala> prop.put("driver","com.mysql.jdbc.Driver") //表示驱动程序是com.mysql.jdbc.Driver
res3: Object = null

scala> studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)

scala> jdbcDF.show()
+---+---------+------+---+
| id|     name|gender|age|
+---+---------+------+---+
|  1|  Xueqian|     F| 23|
|  2| Weiliang|     M| 24|
|  3|Rongcheng|     M| 26|
|  4|  Guanhua|     M| 27|
+---+---------+------+---+


######### Terminal 2 ##########
mysql> select * from student;
+------+-----------+--------+------+
| id   | name      | gender | age  |
+------+-----------+--------+------+
|    1 | Xueqian   | F      |   23 |
|    2 | Weiliang  | M      |   24 |
|    3 | Rongcheng | M      |   26 |
|    4 | Guanhua   | M      |   27 |
+------+-----------+--------+------+
4 rows in set (0.00 sec)
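
For reference, the same append can also be expressed with the option-based writer API instead of a Properties object; a sketch assuming the same connection parameters as above:

// equivalent to the Properties-based jdbc() call above
studentDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/spark")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "student")
  .option("user", "root")
  .option("password", "root")
  .mode("append")
  .save()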