Spark SQL
1.概述
Spark SQL是用于结构化数据处理的Spark模块与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与Spark SQL交互,包括SQL、DataFrame API和DataSets API。当计算结果时,使用相同的执行引擎,而与您用来表示计算的API/语言无关。这种统一意味着开发人员可以很容易地在各种API之间来回切换,基于这些API提供了表示给定转换的最自然的方式
为什么需要SQL
- 事实上的标准
- 易学易用
- 受众面大
hive on shark ==> hive on spark
SQL on Hadoop
Spark SQL的愿景
- write less code
- read less data
- let the optimizer do the hard work
2.快速入门
2.1 SQLContext
Spark SQL中所有功能的入口点是SQLContext类或其子类之一
要创建一个基本的SQLContext,您只需要一个SparkContext类对象
1 | val conf = new SparkConf() |
除了基本的SparkContext
之外,还可以创建HiveContext
,它提供了基本SparkContext
提供的功能的超集。其他功能包括使用更完整的HiveQL
解析器编写查询的能力,对Hive UDF
的访问,以及从Hive表中读取数据的能力
要使用HiveContext
,不需要进行额外的配置,并且所有可用于SQLContext的数据源仍然可用
对于SQLContext
,唯一可用的方言是“SQL”,它使用Spark SQL提供的简单SQL解析器。在HiveContext
中,默认值是“HiveQL”,尽管“sql”也可用。因为HiveQL解析器更完整,所以建议在大多数用例中使用它
2.2 DataFrames
DataFrame
是一个分布式数据集,且内部数据是由多个命名列(列名、列类型、列值)组合而成
DataFrame
可以看做关系型数据库中的表或R/Python中的DataFrame
且对DataFrame
的执行做了更多的优化
使用SQLContext
,应用程序可以从现有的RDD、Hive表或数据源创建DataFrame
代码演示说明
1 | val conf = new SparkConf() |
DataFrame操作
1 | val conf = new SparkConf() |
2.3 DataFrame vs RDD
DataFrame | RDD | |
---|---|---|
分布式 | 是 | 是 |
易用性 | 高级API,更易用 | 高阶函数 |
通用性 | 对R、Python使用者无缝集成 | 需要Scala编程基础 |
数据结构 | 结构化 | 非结构化 |
2.4 SQL查询
SQLContext
上的SQL函数使应用程序能够以编程方式运行SQL查询,并以DataFrame
的形式返回结果
1 | val sqlContext = ... // An existing SQLContext |
2.5 Datasets
Datasets与RDDS类似,但并不使用Java序列化或Kryo,而是使用专门的编码器来序列化对象并通过网络进行处理或传输
1 | val conf = new SparkConf() |
3.DataFrame API
3.1 Actions
collect
返回DataFrame
中的所有Row
数组
1 | val conf = new SparkConf() |
collectAsList
返回包含Row
的Java列表
1 | val conf = new SparkConf() |
count
返回DataFrame
中Row
的总行数
1 | val conf = new SparkConf() |
describe(cols: String*)
计算数值列的统计信息,包括总数、均值、标准差、最小值和最大值。如果未给定任何列,则此函数计算所有数值列的统计信息
1 | val conf = new SparkConf() |
first
返回DataFrame
中的第一行,等价head()
1 | val conf = new SparkConf() |
head
1 | val conf = new SparkConf() |
show
以表格的形式显示DataFrame
中的数据
第一参数:显示的行数
第二个参数:是否截断长字符串
1 | val conf = new SparkConf() |
take
1 | val conf = new SparkConf() |
takeAsList
1 | val conf = new SparkConf() |
3.2 DataFrame基本功能
cache
对DataFrame
持久化,默认持久化策略(MEMORY_AND_DISK)
1 | df.cache |
columns
以数组的形式返回所有列名
1 | df.columns |
dtypes
以数组的方式返回所有列名及列类型
1 | df.dtypes |
explain
将计划(逻辑和物理)打印到控制台以进行调试
1 | println(df.explain(true)) |
isLocal
如果collect和take方法可以在本地运行(没有任何spark执行器),则返回true
1 | println(df.isLocal) |
persist(newLevel: StorageLevel)
使用给定的StorageLevel对DataFrame
进行持久化
1 | df.persist(StorageLevel.MEMORY_ONLY) |
printSchema
以友好的树形结构将schema打印到控制台
1 | df.printSchema |
schema
返回DataFrame
的schema
1 | df.schema |
toDF(colNames: String*)
1 | val conf = new SparkConf() |
unpersist(blocking: Boolean)
1 | df.unpersist(true) |
3.3 查询功能
select(cols: Column)
查询指定的列数据
1 | df.select("colA", "colB") |
where(condition: Column)
条件过滤
1 | df.select("name", "age").where($"name".equalTo("Justin") and $"age" > 10).show |
distinct
去重
1 | df.distinct |
drop(col: Column)
返回一个删除了列的新DataFrame
1 | val newDF = df.drop("age") |
except(other: DataFrame)
返回一个新的DataFrame
,该DataFrame
为两个DataFrame
的差集
1 | df.except(newDF) |
explode
返回一个新的DataFrame
,其中一列已被提供的函数扩展为零行或多行
1 | df.explode("name", "newName"){name: String => name.split(" ")}.show |
filter(condition: Column)
1 | // The following are equivalent: |
groupBy(cols: Column*)
使用指定的列对DataFrame
进行分组,以便我们可以对它们运行聚合
1 | // Compute the average for all numeric columns grouped by department. |
intersect(other: DataFrame)
返回两个DataFrame
的交集
1 | df.intersect(newDF).show |
join(right: DataFrame, joinExprs: Column, joinType: String)
joinType:inner outer, left_outer, right_outer, leftsemi
使用给定的连接表达式连接两个DataFrame
1 | // Scala: |
limit(n: Int)
获取前n行并返回新的DataFrame
,与head()区别在于head()返回一个数组,limit()返回一个新的DataFrame
1 | df.limit(10).show |
sort(sortExprs: Column*)
根据给定的表达式返回一个排序后的DataFrame
,等价与orderBy()
1 | df.sort($"col1", $"col2".desc).show |
agg(expr: Column, exprs: Column*)
在不分组的情况下,对DataFrame
的数据进行聚合
1 | import org.apache.spark.sql.functions._ |
3.4 输出操作
write
将DataFrame
的内容保存到外部存储器
1 | val conf = new SparkConf() |
3.5 RDD操作
coalesce
返回具有numpartitions
分区的新DataFrame
,此操作会产生窄依赖。例如,如果从1000个分区转到100个分区,则不会出现shuffle
1 | df.coalesce(10) |
flatMap
返回一个新的RDD,flatMap中的func将作用于DataFrame
中的所有Row
1 | val result = df.flatMap(x => { |
foreach
应用foreach中的func到DataFrame
中的每一个Row
1 | df.foreach(println) |
foreachPartition
应用foreach中的func到DataFrame
中的每一个分区
1 | df.foreachPartition(it => { |
rdd
将DataFrame
的内容表示为Row
的RDD
1 | df.rdd |
4.DataFrame与RDD互转换
Spark SQL支持将现有RDD转换为DataFrame的两种不同方法
基于反射推断
基于反射的方法,在已知schema时,可以编写出更简洁的spark应用
基于编程接口构造schema
该接口允许你构造一个schema,然后将其应用到现有的RDD
4.1 基于反射推断
Spark SQL中的scala接口支持将包含case class的RDD自动转换为DataFrame
case class用于定义schema,使用反射读取case class中的参数名称,并反射成DataFrame的列
case class可以嵌套或包含复杂类型,如序列和数组
RDD可以隐式转换为DataFrame,然后注册成表,并可使用SQL语句使用访问操作
1 | //定义case class |
注:case class要定义在全局作用域
4.2 基于编程接口构造schema
如果不能提前定义case class,则可以通过三个步骤以编程方式创建DataFrame
- 从原始RDD创建基于
Row
的RDD - 使用
StructType
定义步骤1创建的RDD的schema - 使用SQLContext的
createDataFrame()
对RDD进行转换
1 | val conf = new SparkConf() |
5.数据源
Spark SQL支持通过DataFrame接口对各种数据源进行操作
DataFrame可以作为普通RDD操作,也可以注册为临时表
5.1 通用加载/保存功能
Spark中,默认加载数据源为parquet文件类型,除非手动指定spark.sql.sources.default
属性
1 | val df = sqlContext.read.load("examples/src/main/resources/users.parquet") |
手动指定配置
1 | val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") |
直接对文件运行SQL
1 | val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") |
存储模式
保存操作可以选择采用保存模式,该模式指定如何处理现有数据(如果存在)
重要的是要认识到这些保存模式不使用任何锁定,也不是原子的
在执行覆盖时,数据将在写入新数据之前被删除
保存模式 | 级别 | 描述 |
---|---|---|
SaveMode.ErrorIfExists(default) | “error”(default) | 将DataFrame 保存到数据源时,如果数据已存在,则应引发异常 |
SaveMode.Append | “append” | 将DataFrame 保存到数据源时,如果数据/表已经存在,则DataFrame 的内容将被追加到现有数据中 |
SaveMode.Overwrite | “overwrite” | 覆盖模式是指在将DataFrame 保存到数据源时,如果数据/表已经存在,则现有数据将被DataFrame 的内容覆盖 |
SaveMode.Ignore | “ignore” | 忽略模式是指在将DataFrame 保存到数据源时,如果数据已经存在,则保存操作不会保存DataFrame 的内容,也不会更改现有数据 |
代码演示说明
1 | val df = sqlContext.read.load("/Users/peidonggao/Desktop/users.parquet") |
5.2 Parquet文件
Parquet是需要其他数据处理系统支持的列式格式类型
Spark SQL支持读取和写入Parquet文件时,自动维护原始数据的schema
对Parquet文件进行写入时,考虑到兼容性,所有列都自动转换为可以为空
数据加载
1 | val conf = new SparkConf() |
分区推断
表分区是类似hive的系统中常用的优化方法
在分区表中,数据通常存储在不同的目录中,分区列值编码在每个分区目录的路径中
Parquet数据源能够能够自动发现和推断分区信息
创建分区
1 | hdfs dfs -mkdir -p /user/root/parquet/gender=male/country=CN |
上传数据到该目录
1 | hdfs dfs -put ~/users.parquet /user/root/parquet/gender=male/country=CN |
测试
1 | val conf = new SparkConf() |
Parquet合并元数据
与ProtocolBuffer、Avro和Thrift一样,Parquet也支持schema进化
用户可以从一个简单的schema开始,并根据需要逐步向该模式添加更多列
这样,用户最终可能会得到多个具有不同但相互兼容模式的Parquet文件
由于模式合并是一个相对昂贵的操作,并且在大多数情况下不是必需的,所以我们默认从1.5.0开始关闭它
你可以通过
- 在读取Parquet文件时,将数据源选项mergeschema设置为true
- 将全局SQL选项
spark.sql.parquet.mergeschema
设置为true
1 | val conf = new SparkConf() |
5.3 JSON数据集源
Spark SQL可以自动推断JSON数据集的schema并将其作为DataFrame
加载
这种转换可以通过SQLContext.read.json()
加载字符串形式的RDD或JSON文件
请注意,作为JSON文件提供的文件不是典型的JSON文件
每一行必须包含一个独立的、自包含的有效JSON对象。因此,常规的多行JSON文件通常会失败
1 | val sc = new SparkContext(conf) |
复杂JSON格式处理
1 | val conf = new SparkConf() |
5.4 Hive数据源
Spark SQL也支持对Hive中的数据进行读写操作。但是,由于hive具有大量依赖项,因此它不包含在默认的Spark程序集中
配置HIve metaStore Service
1
2
3
4<property>
<name>hive.metastore.uris</name>
<value>thrift://master:9083</value>
</property>开启Hive metaStore Service
1
bin/hive --service metastore &
拷贝Hive conf/hive-site.xml到Spark conf目录下
拷贝mysql-connector-java-5.1.27-bin.jar到Spark lib目录下
编写脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33val conf = new SparkConf()
.setAppName("hive dataSource")
.setMaster("local")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
//创建表
hiveContext.sql("drop table if exists student_info")
hiveContext.sql("create table if not exists student_info(name string, age int) "
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '")
hiveContext.sql("drop table if exists student_score")
hiveContext.sql("create table if not exists student_score(name string, score int) "
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '")
//load student data
hiveContext.sql("LOAD DATA INPATH "
+ "'/user/root/data/student_info.txt' "
+ "INTO TABLE student_info")
hiveContext.sql("LOAD DATA INPATH "
+ "'/user/root/data/student_score.txt' "
+ "INTO TABLE student_score")
//join
val resultDF = hiveContext.sql("select si.name,si.age,ss.score from "
+ "student_info si join student_score ss "
+ "on si.name = ss.name ")
hiveContext.sql("drop table if exists t_students")
resultDF.write.saveAsTable("t_students")
resultDF.showspark-submit
1
2
3
4
5
6
7
8./bin/spark-submit \
--class SparkSQLHiveTest \
--master spark://master:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
--files /opt/modules/apache-hive-1.2.1-bin/conf/hive-site.xml \
--jars /opt/modules/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar,/opt/modules/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar,/opt/modules/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar,/opt/modules/spark-1.6.3-bin-hadoop2.6/lib/mysql-connector-java-5.1.27-bin.jar \
/opt/modules/spark-1.6.3-bin-hadoop2.6/scala/spark-demo4.jar
5.5 JDBC数据源
1 | val content = sqlContext.read |
6.性能优化
6.1 数据缓存
Spark SQL可以通过调用sqlContext.cacheTable("tableName")
或dataframe.cache()
将列格式的表缓存到内存中
之后,Spark SQL只需扫描所需的列,并自动调整压缩,以最小化内容使用和降低GC压力
此外,可以通过调用sqlContext.uncachetable(“tablename”)
从内存中删除表
内存缓存的配置可以使用sqlContext
的setconf
方法完成
属性名 | 默认值 | 描述 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | 当设置为true,Spark SQL将根据数据统计信息,为每列自动选择一种压缩编码器 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 缓存批处理大小, 较大的批处理可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险 |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | 配置执行broadcast join的最大字节表的阈值,小于该阈值的表在执行join操作时会使用broadcast join来优化原有的join操作 |
spark.sql.tungsten.enabled | true | 当设置为true,则使用优化后的钨丝物理执行计划,底层会显式管理内存并动态生成字节码 |
spark.sql.shuffle.partitions | 200 | 配置在进行join或aggregation操作时的分区数 |