欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spark SQL常见4种数据源详解

程序员文章站 2023-11-26 12:40:28
通用load/write方法 手动指定选项 spark sql的dataframe接口支持多种数据源的操作。一个dataframe可以进行rdds方式的操作,也...

通用load/write方法

手动指定选项

spark sql的dataframe接口支持多种数据源的操作。一个dataframe可以进行rdds方式的操作,也可以被注册为临时表。把dataframe注册为临时表之后,就可以对该dataframe执行sql查询。

spark sql的默认数据源为parquet格式。数据源为parquet文件时,spark sql可以方便的执行所有的操作。

修改配置项spark.sql.sources.default,可修改默认数据源格式。

scala> val df = spark.read.load("hdfs://hadoop001:9000/namesandages.parquet")
df: org.apache.spark.sql.dataframe = [age: bigint, name: string]
scala> df.select("name").write.save("names.parquet")

当数据源格式不是parquet格式文件时,需要手动指定数据源的格式。数据源格式需要指定全名(例如:org.apache.spark.sql.parquet),如果数据源格式为内置格式,则只需要指定简称json, parquet, jdbc, orc, libsvm, csv, text来指定数据的格式。

可以通过sparksession提供的read.load方法用于通用加载数据,使用write和save保存数据。

scala> val peopledf = spark.read.format("json").load("hdfs://hadoop001:9000/people.json")
peopledf: org.apache.spark.sql.dataframe = [age: bigint, name: string]
scala> peopledf.write.format("parquet").save("hdfs://hadoop001:9000/namesandages.parquet")
scala>

除此之外,可以直接运行sql在文件上:

val sqldf = spark.sql("select * from parquet.`hdfs://hadoop001:9000/namesandages.parquet`")
sqldf.show()

文件保存选项

可以采用savemode执行存储操作,savemode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用overwrite方式执行时,在输出新数据之前原数据就已经被删除。savemode详细介绍如下表:

scala/java any language meaning
savemode.errorifexists(default) “error”(default) 如果文件存在,则报错
savemode.append “append” 追加
savemode.overwrite “overwrite” 覆写
savemode.ignore “ignore” 数据存在,则忽略

parquet文件

parquet读写

parquet格式经常在hadoop生态圈中被使用,它也支持spark sql的全部数据类型。spark sql 提供了直接读取和存储 parquet 格式文件的方法。

// encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopledf = spark.read.json("examples/src/main/resources/people.json")
// dataframes can be saved as parquet files, maintaining the schema information
peopledf.write.parquet("hdfs://hadoop001:9000/people.parquet")
// read in the parquet file created above
// parquet files are self-describing so the schema is preserved
// the result of loading a parquet file is also a dataframe
val parquetfiledf = spark.read.parquet("hdfs://hadoop001:9000/people.parquet")
// parquet files can also be used to create a temporary view and then used in sql statements
parquetfiledf.createorreplacetempview("parquetfile")
val namesdf = spark.sql("select name from parquetfile where age between 13 and 19")
namesdf.map(attributes => "name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |name: justin|
// +------------+

解析分区信息

对表进行分区是对数据进行优化的方式之一。在分区的表内,数据通过分区列将数据存储在不同的目录下。parquet数据源现在能够自动发现并解析分区信息。例如,对人口数据进行分区存储,分区列为gender和country,使用下面的目录结构:

path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=us
│ │ └── data.parquet
│ ├── country=cn
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=us
│ └── data.parquet
├── country=cn
│ └── data.parquet
└── ...

通过传递path/to/table给 sqlcontext.read.parque

或sqlcontext.read.load,spark sql将自动解析分区信息。

返回的dataframe的schema如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

需要注意的是,数据的分区列的数据类型是自动解析的。当前,支持数值类型和字符串类型。自动解析分区类型的参数为:

spark.sql.sources.partitioncolumntypeinference.enabled

默认值为true。

如果想关闭该功能,直接将该参数设置为disabled。此时,分区列数据格式将被默认设置为string类型,不再进行类型解析。

schema合并

像protocolbuffer、avro和thrift那样,parquet也支持schema evolution(schema演变)。用户可以先定义一个简单的schema,然后逐渐的向schema中增加列描述。通过这种方式,用户可以获取多个有不同schema但相互兼容的parquet文件。现在parquet数据源能自动检测这种情况,并合并这些文件的schemas。

因为schema合并是一个高消耗的操作,在大多数情况下并不需要,所以spark sql从1.5.0开始默认关闭了该功能。可以通过下面两种方式开启该功能:

当数据源为parquet文件时,将数据源选项mergeschema设置为true。

设置全局sql选项:

spark.sql.parquet.mergeschema为true。

// sqlcontext from the previous example is used in this example.
// this is used to implicitly convert an rdd to a dataframe.
import spark.implicits._
// create a simple dataframe, stored into a partition directory
val df1 = sc.makerdd(1 to 5).map(i => (i, i * 2)).todf("single", "double")
df1.write.parquet("hdfs://hadoop001:9000/data/test_table/key=1")
// create another dataframe in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makerdd(6 to 10).map(i => (i, i * 3)).todf("single", "triple")
df2.write.parquet("hdfs://hadoop001:9000/data/test_table/key=2")
// read the partitioned table
val df3 = spark.read.option("mergeschema", "true").parquet("hdfs://hadoop001:9000/data/test_table")
df3.printschema()
// the final schema consists of all 3 columns in the parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

hive数据源

apache hive是hadoop上的sql引擎,spark sql编译时可以包含hive支持,也可以不包含。包含hive支持的spark sql可以支持hive表访问、udf(用户自定义函数)以及 hive 查询语言(hiveql/hql)等。需要强调的 一点是,如果要在spark sql中包含hive的库,并不需要事先安装hive。一般来说,最好还是在编译spark sql时引入hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 spark,它应该已经在编译时添加了 hive 支持。

若要把spark sql连接到一个部署好的hive上,你必须把hive-site.xml复制到 spark的配置文件目录中($spark_home/conf)。即使没有部署好hive,spark sql也可以运行。

需要注意的是,如果你没有部署好hive,spark sql会在当前的工作目录中创建出自己的hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 hiveql 中的 create table (并非 create external table)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 hdfs,否则就是本地文件系统)。

import java.io.file
import org.apache.spark.sql.row
import org.apache.spark.sql.sparksession
case class record(key: int, value: string)
// warehouselocation points to the default location for managed databases and tables
val warehouselocation = new file("spark-warehouse").getabsolutepath
val spark = sparksession
.builder()
.appname("spark hive example")
.config("spark.sql.warehouse.dir", warehouselocation)
.enablehivesupport()
.getorcreate()
import spark.implicits._
import spark.sql
sql("create table if not exists src (key int, value string)")
sql("load data local inpath 'examples/src/main/resources/kv1.txt' into table src")
// queries are expressed in hiveql
sql("select * from src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// aggregation queries are also supported.
sql("select count(*) from src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// the results of sql queries are themselves dataframes and support all normal functions.
val sqldf = sql("select key, value from src where key < 10 order by key")
// the items in dataframes are of type row, which allows you to access each column by ordinal.
val stringsds = sqldf.map {
case row(key: int, value: string) => s"key: $key, value: $value"
}
stringsds.show()
// +--------------------+
// | value|
// +--------------------+
// |key: 0, value: val_0|
// |key: 0, value: val_0|
// |key: 0, value: val_0|
// ...
// you can also use dataframes to create temporary views within a sparksession.
val recordsdf = spark.createdataframe((1 to 100).map(i => record(i, s"val_$i")))
recordsdf.createorreplacetempview("records")
// queries can then join dataframe data with data stored in hive.
sql("select * from records r join src s on r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...

内嵌hive应用

如果要使用内嵌的hive,什么都不用做,直接用就可以了。 –conf :

spark.sql.warehouse.dir=

注意:如果你使用的是内部的hive,在spark2.0之后,spark.sql.warehouse.dir用于指定数据仓库的地址,如果你需要是用hdfs作为路径,那么需要将core-site.xml和hdfs-site.xml 加入到spark conf目录,否则只会创建master节点上的warehouse目录,查询时会出现文件找不到的问题,这是需要向使用hdfs,则需要将metastore删除,重启集群。

外部hive应用

如果想连接外部已经部署好的hive,需要通过以下几个步骤。

a 将hive中的hive-site.xml拷贝或者软连接到spark安装目录下的conf目录下。

b 打开spark shell,注意带*问hive元数据库的jdbc客户端。

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar

json数据集

spark sql 能够自动推测 json数据集的结构,并将它加载为一个dataset[row]. 可以通过sparksession.read.json()去加载一个 dataset[string]或者一个json 文件.注意,这个json文件不是一个传统的json文件,每一行都得是一个json串。

{"name":"michael"}
{"name":"andy", "age":30}
{"name":"justin", "age":19}
// primitive types (int, string, etc) and product types (case classes) encoders are
// supported by importing this when creating a dataset.
import spark.implicits._
// a json dataset is pointed to by path.
// the path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopledf = spark.read.json(path)
// the inferred schema can be visualized using the printschema() method
peopledf.printschema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// creates a temporary view using the dataframe
peopledf.createorreplacetempview("people")
// sql statements can be run by using the sql methods provided by spark
val teenagernamesdf = spark.sql("select name from people where age between 13 and 19")
teenagernamesdf.show()
// +------+
// | name|
// +------+
// |justin|
// +------+
// alternatively, a dataframe can be created for a json dataset represented by
// a dataset[string] storing one json object per string
val otherpeopledataset = spark.createdataset(
"""{"name":"yin","address":{"city":"columbus","state":"ohio"}}""" :: nil)
val otherpeople = spark.read.json(otherpeopledataset)
otherpeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[columbus,ohio]| yin|
// +---------------+----+

jdbc

spark sql可以通过jdbc从关系型数据库中读取数据的方式创建dataframe,通过对dataframe一系列的计算后,还可以将数据再写回关系型数据库中。

注意,需要将相关的数据库驱动放到spark的类路径下。

$ bin/spark-shell --master spark://hadoop001:7077 --jars mysql-connector-java-5.1.27-bin.jar
// note: jdbc loading and saving can be achieved via either the load/save or jdbc methods
// loading data from a jdbc source
val jdbcdf = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/rdd").option("dbtable", " rddtable").option("user", "root").option("password", "hive").load()
val connectionproperties = new properties()
connectionproperties.put("user", "root")
connectionproperties.put("password", "hive")
val jdbcdf2 = spark.read
.jdbc("jdbc:mysql://hadoop001:3306/rdd", "rddtable", connectionproperties)
// saving data to a jdbc source
jdbcdf.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop001:3306/rdd")
.option("dbtable", "rddtable2")
.option("user", "root")
.option("password", "hive")
.save()
jdbcdf2.write
.jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionproperties)
// specifying create table column data types on write
jdbcdf.write
.option("createtablecolumntypes", "name char(64), comments varchar(1024)")
.jdbc("jdbc:mysql://hadoop001:3306/mysql", "db", connectionproperties)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。