1.概述

Spark SQL是用于结构化数据处理的Spark模块与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与Spark SQL交互,包括SQL、DataFrame API和DataSets API。当计算结果时,使用相同的执行引擎,而与您用来表示计算的API/语言无关。这种统一意味着开发人员可以很容易地在各种API之间来回切换,基于这些API提供了表示给定转换的最自然的方式

为什么需要SQL

  • 事实上的标准
  • 易学易用
  • 受众面大

hive on shark ==> hive on spark

SQL on Hadoop

Spark SQL的愿景

  • write less code
  • read less data
  • let the optimizer do the hard work

2.快速入门

2.1 SQLContext

Spark SQL中所有功能的入口点是SQLContext类或其子类之一

要创建一个基本的SQLContext,您只需要一个SparkContext类对象

1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
println(sqlContext)

除了基本的SparkContext之外,还可以创建HiveContext,它提供了基本SparkContext提供的功能的超集。其他功能包括使用更完整的HiveQL解析器编写查询的能力,对Hive UDF的访问,以及从Hive表中读取数据的能力

要使用HiveContext,不需要进行额外的配置,并且所有可用于SQLContext的数据源仍然可用

对于SQLContext,唯一可用的方言是“SQL”,它使用Spark SQL提供的简单SQL解析器。在HiveContext中,默认值是“HiveQL”,尽管“sql”也可用。因为HiveQL解析器更完整,所以建议在大多数用例中使用它

2.2 DataFrames

DataFrame是一个分布式数据集,且内部数据是由多个命名列(列名、列类型、列值)组合而成

DataFrame可以看做关系型数据库中的表或R/Python中的DataFrame

且对DataFrame的执行做了更多的优化

使用SQLContext,应用程序可以从现有的RDD、Hive表或数据源创建DataFrame

代码演示说明

1
2
3
4
5
6
7
8
9
10
11
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
df.show

DataFrame操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")

//显示DataFrame中的内容
df.show

//以树形结构打印DataFrame数据结构
df.printSchema

//查询指定列
df.select("name").show

//列运算
df.select(df("name"), df("age") + 1).show

//条件过滤
df.filter(df("age") > 21).show

//分组统计
df.groupBy("age").count().show

2.3 DataFrame vs RDD

DataFrame RDD
分布式
易用性 高级API,更易用 高阶函数
通用性 对R、Python使用者无缝集成 需要Scala编程基础
数据结构 结构化 非结构化

2.4 SQL查询

SQLContext上的SQL函数使应用程序能够以编程方式运行SQL查询,并以DataFrame的形式返回结果

1
2
val sqlContext = ... // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")

2.5 Datasets

Datasets与RDDS类似,但并不使用Java序列化或Kryo,而是使用专门的编码器来序列化对象并通过网络进行处理或传输

1
2
3
4
5
6
7
8
9
10
11
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect().foreach(println)

3.DataFrame API

3.1 Actions

collect

返回DataFrame中的所有Row数组

1
2
3
4
5
6
7
8
9
10
11
12
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
val rows = df.collect()
rows.foreach(println)

collectAsList

返回包含Row的Java列表

1
2
3
4
5
6
7
8
9
10
11
12
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
val rows = df.collectAsList()
println(rows)

count

返回DataFrameRow的总行数

1
2
3
4
5
6
7
8
9
10
11
12
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
val count = df.count
println("count: " + count)

describe(cols: String*)

计算数值列的统计信息,包括总数均值标准差最小值最大值。如果未给定任何列,则此函数计算所有数值列的统计信息

1
2
3
4
5
6
7
8
9
10
11
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
df.describe("age").show

first

返回DataFrame中的第一行,等价head()

1
2
3
4
5
6
7
8
9
10
11
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
println(df.first)

head

1
2
3
4
5
6
7
8
9
10
11
12
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
println(df.head)
println(df.head(10))

show

以表格的形式显示DataFrame中的数据

第一参数:显示的行数

第二个参数:是否截断长字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
df.show
df.show(10)
df.show(10, true)

take

1
2
3
4
5
6
7
8
9
10
11
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
df.take(10).foreach(println)

takeAsList

1
2
3
4
5
6
7
8
9
10
11
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.json("/Users/peidonggao/Desktop/people.json")
df.takeAsList(10)

3.2 DataFrame基本功能

cache

DataFrame持久化,默认持久化策略(MEMORY_AND_DISK)

1
df.cache

columns

以数组的形式返回所有列名

1
df.columns

dtypes

以数组的方式返回所有列名及列类型

1
df.dtypes

explain

将计划(逻辑和物理)打印到控制台以进行调试

1
println(df.explain(true))

isLocal

如果collect和take方法可以在本地运行(没有任何spark执行器),则返回true

1
println(df.isLocal)

persist(newLevel: StorageLevel)

使用给定的StorageLevel对DataFrame进行持久化

1
df.persist(StorageLevel.MEMORY_ONLY)

printSchema

以友好的树形结构将schema打印到控制台

1
df.printSchema

schema

返回DataFrame的schema

1
df.schema

toDF(colNames: String*)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val textFile = sc.textFile("/Users/peidonggao/Desktop/users.txt")
val users = textFile.map(x => {
val user = x.split(" ")
(user(0), user(1), user(2))
})

val df = users.toDF("id","name","age")
df.show

unpersist(blocking: Boolean)

1
df.unpersist(true)

3.3 查询功能

select(cols: Column)

查询指定的列数据

1
2
3
df.select("colA", "colB")
df.select($"colA", $"colB")
df.select($"colA", $"colB" + 1)

where(condition: Column)

条件过滤

1
df.select("name", "age").where($"name".equalTo("Justin") and $"age" > 10).show

distinct

去重

1
df.distinct

drop(col: Column)

返回一个删除了列的新DataFrame

1
val newDF = df.drop("age")

except(other: DataFrame)

返回一个新的DataFrame,该DataFrame为两个DataFrame的差集

1
df.except(newDF)

explode

返回一个新的DataFrame,其中一列已被提供的函数扩展为零行或多行

1
df.explode("name", "newName"){name: String => name.split(" ")}.show

filter(condition: Column)

1
2
3
// The following are equivalent:
peopleDf.filter($"age" > 15)
peopleDf.where($"age" > 15)

groupBy(cols: Column*)

使用指定的列对DataFrame进行分组,以便我们可以对它们运行聚合

1
2
// Compute the average for all numeric columns grouped by department.
df.groupBy("department").avg()

intersect(other: DataFrame)

返回两个DataFrame的交集

1
df.intersect(newDF).show

join(right: DataFrame, joinExprs: Column, joinType: String)

joinType:inner outer, left_outer, right_outer, leftsemi

使用给定的连接表达式连接两个DataFrame

1
2
3
4
5
6
7
// Scala:
import org.apache.spark.sql.functions._
df1.join(df2, $"df1Key" === $"df2Key", "outer")

// Java:
import static org.apache.spark.sql.functions.*;
df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer")

limit(n: Int)

获取前n行并返回新的DataFrame,与head()区别在于head()返回一个数组,limit()返回一个新的DataFrame

1
df.limit(10).show

sort(sortExprs: Column*)

根据给定的表达式返回一个排序后的DataFrame,等价与orderBy()

1
df.sort($"col1", $"col2".desc).show

agg(expr: Column, exprs: Column*)

在不分组的情况下,对DataFrame的数据进行聚合

1
2
import org.apache.spark.sql.functions._
df.agg(max($"age"),min($"age"),avg($"age")).show

3.4 输出操作

write

DataFrame的内容保存到外部存储器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val textFile = sc.textFile("/Users/peidonggao/Desktop/users.txt")
val users = textFile.map(x => {
val user = x.split(" ")
(user(0), user(1), user(2))
})

val df = users.toDF("id","name","age")
val writer = df.write
writer.save("/Users/peidonggao/Desktop")

3.5 RDD操作

coalesce

返回具有numpartitions分区的新DataFrame,此操作会产生窄依赖。例如,如果从1000个分区转到100个分区,则不会出现shuffle

1
df.coalesce(10)

flatMap

返回一个新的RDD,flatMap中的func将作用于DataFrame中的所有Row

1
2
3
4
5
val result = df.flatMap(x => {
List(x.get(0),x.get(1),x.get(2))
})

result.foreach(println)

foreach

应用foreach中的func到DataFrame中的每一个Row

1
df.foreach(println)

foreachPartition

应用foreach中的func到DataFrame中的每一个分区

1
2
3
4
5
df.foreachPartition(it => {
while(it.hasNext) {
println("row: " + it.next().toString)
}
})

rdd

DataFrame的内容表示为Row的RDD

1
df.rdd

4.DataFrame与RDD互转换

Spark SQL支持将现有RDD转换为DataFrame的两种不同方法

  • 基于反射推断

    基于反射的方法,在已知schema时,可以编写出更简洁的spark应用

  • 基于编程接口构造schema

    该接口允许你构造一个schema,然后将其应用到现有的RDD

4.1 基于反射推断

Spark SQL中的scala接口支持将包含case class的RDD自动转换为DataFrame

case class用于定义schema,使用反射读取case class中的参数名称,并反射成DataFrame的列

case class可以嵌套或包含复杂类型,如序列和数组

RDD可以隐式转换为DataFrame,然后注册成表,并可使用SQL语句使用访问操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//定义case class
case class Person(name: String, age: Int)

def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

//创建Person对象的RDD
val peopleRDD = sc.textFile("/Users/peidonggao/Desktop/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
//RDD转换为DataFrame
val df = peopleRDD.toDF()
//DataFrame注册为表
df.registerTempTable("people")
//使用SQLContext提供的SQL方法来运行SQL语句
val peoples = sqlContext.sql("select name, age from people WHERE age >= 13 AND age <= 19")
peoples.show
}

注:case class要定义在全局作用域

4.2 基于编程接口构造schema

如果不能提前定义case class,则可以通过三个步骤以编程方式创建DataFrame

  1. 从原始RDD创建基于Row的RDD
  2. 使用StructType定义步骤1创建的RDD的schema
  3. 使用SQLContext的createDataFrame()对RDD进行转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

//创建RDD
val peopleRDD = sc.textFile("/Users/peidonggao/Desktop/people.txt")
//基于字符串编码的schema
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType}
//根据字符串编码的schema创建数据结构
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 将RDD数据转换为Row
val rowRDD = peopleRDD.map(_.split(",")).map(p => Row(p(0), p(1).trim))
//将schema应用于RDD
val df = sqlContext.createDataFrame(rowRDD, schema)
//DataFrame注册为表
df.registerTempTable("people")
//使用SQLContext提供的SQL方法来运行SQL语句
val results = sqlContext.sql("select name from people")
results.show

5.数据源

Spark SQL支持通过DataFrame接口对各种数据源进行操作

DataFrame可以作为普通RDD操作,也可以注册为临时表

5.1 通用加载/保存功能

Spark中,默认加载数据源为parquet文件类型,除非手动指定spark.sql.sources.default属性

1
2
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手动指定配置

1
2
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

直接对文件运行SQL

1
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

存储模式

保存操作可以选择采用保存模式,该模式指定如何处理现有数据(如果存在)

重要的是要认识到这些保存模式不使用任何锁定,也不是原子的

在执行覆盖时,数据将在写入新数据之前被删除

保存模式 级别 描述
SaveMode.ErrorIfExists(default) “error”(default) DataFrame保存到数据源时,如果数据已存在,则应引发异常
SaveMode.Append “append” DataFrame保存到数据源时,如果数据/表已经存在,则DataFrame的内容将被追加到现有数据中
SaveMode.Overwrite “overwrite” 覆盖模式是指在将DataFrame保存到数据源时,如果数据/表已经存在,则现有数据将被DataFrame的内容覆盖
SaveMode.Ignore “ignore” 忽略模式是指在将DataFrame保存到数据源时,如果数据已经存在,则保存操作不会保存DataFrame的内容,也不会更改现有数据

代码演示说明

1
2
val df = sqlContext.read.load("/Users/peidonggao/Desktop/users.parquet")
df.write.format("json").mode(SaveMode.Append).save("/Users/peidonggao/Desktop/json")

5.2 Parquet文件

Parquet是需要其他数据处理系统支持的列式格式类型

Spark SQL支持读取和写入Parquet文件时,自动维护原始数据的schema

对Parquet文件进行写入时,考虑到兼容性,所有列都自动转换为可以为空

数据加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val parquetFile = sqlContext.read.parquet("/Users/peidonggao/Desktop/users.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile")
teenagers.show

分区推断

表分区是类似hive的系统中常用的优化方法

在分区表中,数据通常存储在不同的目录中,分区列值编码在每个分区目录的路径中

Parquet数据源能够能够自动发现和推断分区信息

创建分区

1
hdfs dfs -mkdir -p /user/root/parquet/gender=male/country=CN

上传数据到该目录

1
hdfs dfs -put ~/users.parquet /user/root/parquet/gender=male/country=CN

测试

1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df = sqlContext.read.parquet("hdfs://192.168.198.128:9000/user/root/parquet/gender=male/country=CN/users.parquet")
df.show()

Parquet合并元数据

与ProtocolBuffer、Avro和Thrift一样,Parquet也支持schema进化

用户可以从一个简单的schema开始,并根据需要逐步向该模式添加更多列

这样,用户最终可能会得到多个具有不同但相互兼容模式的Parquet文件

由于模式合并是一个相对昂贵的操作,并且在大多数情况下不是必需的,所以我们默认从1.5.0开始关闭它

你可以通过

  1. 在读取Parquet文件时,将数据源选项mergeschema设置为true
  2. 将全局SQL选项spark.sql.parquet.mergeschema设置为true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("hdfs://192.168.198.128:9000/user/root/data/test_table/key=1")

val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("hdfs://192.168.198.128:9000/user/root/data/test_table/key=2")

val df3 = sqlContext.read.option("mergeSchema", "true").parquet("hdfs://192.168.198.128:9000/user/root/data/test_table")
df3.printSchema()

5.3 JSON数据集源

Spark SQL可以自动推断JSON数据集的schema并将其作为DataFrame加载

这种转换可以通过SQLContext.read.json()加载字符串形式的RDD或JSON文件

请注意,作为JSON文件提供的文件不是典型的JSON文件

每一行必须包含一个独立的、自包含的有效JSON对象。因此,常规的多行JSON文件通常会失败

1
2
3
4
5
6
7
8
9
10
11
12
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val stuDF = sqlContext.read.json("C:\\Users\\MAX\\Desktop\\students.json")
val scoreDF = sqlContext.read.json("C:\\Users\\MAX\\Desktop\\scores.json")
//注册表
stuDF.registerTempTable("student")
scoreDF.registerTempTable("score")

val stu_scoreDF = sqlContext.sql("select stu.name,sc.score from student stu join score sc on stu.id=sc.id")
stu_scoreDF.show

复杂JSON格式处理

1
2
3
4
5
6
7
8
9
10
11
12
val conf = new SparkConf()
.setAppName("spark sql")
.setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val bookDF = sqlContext.read.json("C:\\Users\\MAX\\Desktop\\book_info.json")
bookDF.registerTempTable("book")
val result = sqlContext.sql("select bookId,bookName,price,author.name,author.age,author.description from book")
result.printSchema

5.4 Hive数据源

Spark SQL也支持对Hive中的数据进行读写操作。但是,由于hive具有大量依赖项,因此它不包含在默认的Spark程序集中

  1. 配置HIve metaStore Service

    1
    2
    3
    4
    <property>
    <name>hive.metastore.uris</name>
    <value>thrift://master:9083</value>
    </property>
  2. 开启Hive metaStore Service

    1
    bin/hive --service metastore &
  3. 拷贝Hive conf/hive-site.xml到Spark conf目录下

  4. 拷贝mysql-connector-java-5.1.27-bin.jar到Spark lib目录下

  5. 编写脚本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    val conf = new SparkConf()
    .setAppName("hive dataSource")
    .setMaster("local")

    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    //创建表
    hiveContext.sql("drop table if exists student_info")
    hiveContext.sql("create table if not exists student_info(name string, age int) "
    + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '")

    hiveContext.sql("drop table if exists student_score")
    hiveContext.sql("create table if not exists student_score(name string, score int) "
    + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '")

    //load student data
    hiveContext.sql("LOAD DATA INPATH "
    + "'/user/root/data/student_info.txt' "
    + "INTO TABLE student_info")

    hiveContext.sql("LOAD DATA INPATH "
    + "'/user/root/data/student_score.txt' "
    + "INTO TABLE student_score")

    //join
    val resultDF = hiveContext.sql("select si.name,si.age,ss.score from "
    + "student_info si join student_score ss "
    + "on si.name = ss.name ")

    hiveContext.sql("drop table if exists t_students")
    resultDF.write.saveAsTable("t_students")
    resultDF.show

    spark-submit

    1
    2
    3
    4
    5
    6
    7
    8
    ./bin/spark-submit \
    --class SparkSQLHiveTest \
    --master spark://master:7077 \
    --executor-memory 2G \
    --total-executor-cores 2 \
    --files /opt/modules/apache-hive-1.2.1-bin/conf/hive-site.xml \
    --jars /opt/modules/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar,/opt/modules/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar,/opt/modules/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar,/opt/modules/spark-1.6.3-bin-hadoop2.6/lib/mysql-connector-java-5.1.27-bin.jar \
    /opt/modules/spark-1.6.3-bin-hadoop2.6/scala/spark-demo4.jar

5.5 JDBC数据源

1
2
3
4
5
6
7
8
9
10
11
val content = sqlContext.read
.format("jdbc")
.options(Map(
"driver" -> "com.mysql.jdbc.Driver",
"url" -> "jdbc:mysql://localhost:3306/demo",
"dbtable" -> "t_user",
"user" -> "root",
"password" -> "111111"
)).load

content.show

6.性能优化

6.1 数据缓存

Spark SQL可以通过调用sqlContext.cacheTable("tableName") dataframe.cache()将列格式的表缓存到内存中

之后,Spark SQL只需扫描所需的列,并自动调整压缩,以最小化内容使用和降低GC压力

此外,可以通过调用sqlContext.uncachetable(“tablename”)从内存中删除表

内存缓存的配置可以使用sqlContextsetconf方法完成

属性名 默认值 描述
spark.sql.inMemoryColumnarStorage.compressed true 当设置为true,Spark SQL将根据数据统计信息,为每列自动选择一种压缩编码器
spark.sql.inMemoryColumnarStorage.batchSize 10000 缓存批处理大小, 较大的批处理可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置执行broadcast join的最大字节表的阈值,小于该阈值的表在执行join操作时会使用broadcast join来优化原有的join操作
spark.sql.tungsten.enabled true 当设置为true,则使用优化后的钨丝物理执行计划,底层会显式管理内存并动态生成字节码
spark.sql.shuffle.partitions 200 配置在进行join或aggregation操作时的分区数