1.概述

Spark Streaming是Spark Core API的扩展,它支持对实时数据流进行可扩展、高吞吐量、容错的流失处理

数据可以从许多源(如kafka、flume或tcp socket)中提取,并可以使用复杂的算法(例如:map、reduce、join或window等高级函数)进行处理,最后将处理后的数据推送到文件系统、数据库或实时仪表盘中

实际上,可以通过数据流来使用Spark的机器学习或图处理算法

其底层运作原理是Spark Streaming接收实时数据的数据流,并将数据分批交由Spark engine进行处理,在接收并处理完数据的所有批次后,生成最终结果

Spark Streaming提供一种高级抽象数据结构,称为离散流,简称DStream,它表示连续的数据流

数据流可以从输入数据流(如Kafka、Flume和Kinesis)中创建,也可以通过对其他数据流应用高级操作来创建

其内部表示为RDD序列

2.快速入门

2.1 wordcount

预备环境:yum install -y nc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val conf = new SparkConf()
.setAppName("spark streaming")
.setMaster("local[2]")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
//创建DStream
val lines = ssc.socketTextStream("master", 9999)

val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print

ssc.start
ssc.awaitTermination

ssc.stop(true)

注:local至少为local[2],否则控制台没有输出

2.2 基本概念

StreamingContext

要初始化Spark Streaming程序,需要创建一个StreamingContext对象,该对象是Spark Streaming所有功能的主入口

Spark Streaming创建完成后,需要执行以下操作

  1. 定义输入源来创建DStream
  2. 对DStreams执行转换输出操作来进行流计算
  3. 使用streamingContext.start()来开启数据接收并处理
  4. 使用streamingContext.awaitTermination()等待停止处理
  5. 可通过streamingContext.stop()手动停止处理

注意项

  • Spark Streaming程序一旦启动,就不能设置或添加新的流计算
  • Spark Streaming程序一旦停止,就无法通过重启还原到停止前的状态
  • 一个JVM中只能同时有一个StreamingContext处于活动状态
  • StreamingContext.stop()也会停止SparkContext,若仅停止StreamingContext,则使用StreamingContext.stop(false)

DStream

DStream是Spark Streaming提供的基本抽象数据结构,它表示一个连续的数据流,要么是从其他数据源接收数据流,要么通过对已有的DStream进行转换生成新的DStream

其内部,数据流由一系列连续的RDD表示,这是Spark对不可变的分布式数据集的抽象。数据流中的每个RDD都包含来自特定间隔的数据

对DStream的任何操作,都会转换为对基础RDD的操作

例如之前的程序,将输入的每行内容,根据空格拆分为单个的字词

这些基本的RDD转换都是由Spark engine完成,在使用DStream时隐藏了大部分的RDD转换细节,并为开发人员提供了更加便利的高级API

输入与接收

每一个输入数据流都会对应一个接收器,接收器会将从数据源接收到的数据存储在Spark的内存中并进行处理

Spark Streaming提供了两类内置数据流输入源

  • 基本输入源

    直接在StreamingContext API中可用的输入源,例如文件系统、socket连接、akka actors

  • 高级输入源

    例如Kafka、Flume、Kinesis、Twitter等通过额外程序类来提供数据流输入

文件系统

1
2
3
4
5
6
7
8
9
10
11
12
val conf = new SparkConf()
.setAppName("spark streaming")
.setMaster("local[2]")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))

val file = ssc.textFileStream("hdfs://master:9000/user/root/dataDirectory")
file.print

ssc.start
ssc.awaitTermination

2.3 DStreams转换

与RDD类似,转换允许修改来自输入数据流的数据。DStream支持普通spark RDD上的许多可用转换

Transformation 描述
map(func) 通过将DStream中的每个元素通过函数func返回一个新的DStream
flatMap(func) 与map类似,但每个输入项都可以映射到0个或多个输出项
filter(func) 通过只选择func返回true的DStream的数据,返回新的DStream
repartition(numPartitions) 增加或减少DStream中的分区数,从而改变DStream的并行度
union(otherStream) DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream
count() 通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce(func) 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream
countByValue() 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
updateStateByKey(func) 根据key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream
transform(func) 通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD

updateStateByKey操作

updateStateByKey操作允许对DStream中的数据状态进行持续更新,并维护最新的状态

通过如下两步实现

  • 定义状态-状态可以是任意数据类型
  • 定义状态更新函数-使用函数指定如何使用以前的状态和输入流中的新值更新状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val conf = new SparkConf()
.setAppName("network wordCount")
.setMaster("local[2]")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(3))
//使用updateStateByKey,必须开启checkpoint
ssc.checkpoint("hdfs://master:9000/user/root/checkpoint")

val line = ssc.socketTextStream("master", 9999)
val words = line.flatMap(_.split(" "))
val pairs = words.map((_, 1)).updateStateByKey(updateFunction _)
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print

ssc.start()
ssc.awaitTermination()
}

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum + runningCount.getOrElse(0)
Some(newCount)
}

对于每一批数据,spark都将对所有现有的key应用状态更新功能,不管它们是否在批中有新的数据。如果update函数返回none,那么KV键值对对将被消除

注意,使用updateStateByKey,必须对checkpoint进行配置

transform操作

tansform操作,允许在DStream上执行任意RDD到RDD的函数

它可用于应用DStream API中未公开的任何RDD操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
val conf = new SparkConf()
.setAppName("network wordCount")
.setMaster("local[2]")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(3))

val blackList = ssc.sparkContext.parallelize(Array(("mike", true), ("bob", true)))
val line = ssc.socketTextStream("master", 9999)

val users = line.map(x => {
val field = x.split(" ")
val name = field(0)
val date = field(1)

(name, date)
})

val whiteUsers = users.transform(user => {
user.leftOuterJoin(blackList).filter(t => {
if(t._2._2.isEmpty) true else false
})
})

whiteUsers.print

ssc.start()
ssc.awaitTermination()

2.4 Window操作

Spark Streaming还提供窗口计算,允许您在滑动数据窗口上指定transform

如图所示,每次窗口在源DStream上滑动时,窗口中的源RDD都被组合并操作,以生成窗口DStream的RDD

在这种特定情况下,该操作将应用于过去3个时间单位的数据,并按2个时间单位滑动

这表明任何窗口操作都需要指定两个参数

  • 窗口长度-窗口的持续时间
  • 滑动间隔-执行窗口操作的间隔

这两个参数必须是源DStream批处理间隔的倍数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val conf = new SparkConf()
.setAppName("network wordCount")
.setMaster("local[2]")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(3))

val line = ssc.socketTextStream("master", 9999)
val words = line.flatMap(_.split(" "))
val wordCount = words.map((_, 1))
val result = wordCount.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(6))
result.print

ssc.start()
ssc.awaitTermination()
Transformation 描述
window(windowLength, slideInterval) 由一个DStream对象调用,传入一个窗口长度参数,一个窗口滑动间隔参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream
countByWindow(windowLength, slideInterval) 返回指定长度窗口中的元素个数
reduceByWindow(func, windowLength, slideInterval) 类似于reduce操作,只不过这里不再是对整个调用DStream进行reduce操作,而是在调用DStream上首先取窗口函数的元素形成新的DStream,然后在窗口元素形成的DStream上进行reduce
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 调用该操作的DStream中的元素格式为(k, v),整个操作类似于reduceByKey,只不过对应的数据源不同,reduceByKeyAndWindow的数据源是基于该DStream的窗口长度中的所有数据。该操作也有一个可选的并发数参数
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 这个窗口操作和上一个的区别是多传入一个函数invFunc。前面的func作用和上一个reduceByKeyAndWindow相同,后面的invFunc是用于处理流出rdd的
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 类似于前面的countByValue操作,调用该操作的DStream数据格式为(K, v),返回的DStream格式为(K, Long)。统计当前时间窗口中元素值相同的元素的个数

2.5 Output操作

Output Operation 描述
print() 打印DStream中每批数据的前十个元素,这对于开发和调试很有用
saveAsTextFiles(prefix, [suffix]) 将此DStream的内容保存为文本文件,每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix-time_in_ms[.suffix]”
saveAsObjectFiles(prefix, [suffix]) 将此DStream的内容保存为java对象序列化后的SequenceFiles,每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix-time_in_ms[.suffix]”
saveAsHadoopFiles(prefix, [suffix]) 将此DStream的内容保存为hadoop文件,每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix-time_in_ms[.suffix]”
foreachRDD(func) DStream中的foreachRDD是一个非常强大函数,它允许你把数据发送给外部系统

巧妙使用foreachRDD

DStream中的foreachRDD是一个非常强大函数,它允许你把数据发送给外部系统

了解如何正确有效地使用这个原语是很重要的,要避免一些常见的错误

错误代码样例

1
2
3
4
5
6
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}

正确代码样例

1
2
3
4
5
6
7
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}

代码优化

1
2
3
4
5
6
7
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}

最终优化

1
2
3
4
5
6
7
8
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}

3.缓存/持久化

与RDD类似,DStream还允许开发人员将流的数据持久保存在内存中

也就是说,在DStream上使用persist()方法将自动在内存中保留该DStream的每个RDD。如果DStream中的数据将被多次计算(例如,对同一数据进行多个操作),这将非常有用。对于基于窗口的操作(如ReduceByWindow和ReduceByKeyAndWindow)和基于状态的操作(如UpdateStateByKey),会进行隐式持久化

因此,由基于窗口的操作生成的数据流自动保存在内存中,而不需要开发人员调用persist()

4.检查点

流式应用程序必须全天候运行,因此必须能够适应与应用程序逻辑无关的故障(例如,系统故障、JVM崩溃等)。为了实现这一点,Spark流需要将足够的信息检查到容错存储系统,以便它能够从故障中恢复

检查点有两种类型的数据

  • 元数据检查点

    将定义流计算的信息保存到容错存储中,如hdfs。这用于从运行流应用程序驱动程序的节点的故障中恢复

  • 数据检查点

    将生成的RDD保存到可靠的存储。在一些跨多个批处理组合数据的状态转换中,这是必需的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
val checkpointDirectory = "hdfs://master:9000/user/root/checkpoint"

def main(args: Array[String]): Unit = {
val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

ssc.start()
ssc.awaitTermination()
}

def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf()
.setAppName("checkpoint")
.setMaster("local[2]")

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(3))
val line = ssc.socketTextStream("master", 9999)
val words = line.flatMap(_.split(" "))
val wordCount = words.map((_, 1)).updateStateByKey(updateFunction _)
val result = wordCount.reduceByKey(_ + _)

result.print
ssc.checkpoint(checkpointDirectory)
ssc
}

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum + runningCount.getOrElse(0)
Some(newCount)
}

5.Kafka集成

  • 基于接收器的方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    val conf = new SparkConf()
    .setAppName("spark kafka")
    .setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaStream = KafkaUtils.createStream(ssc, "master:2181",
    "console-consumer-70996", Map("test" -> 1))
    kafkaStream.map(_._2).flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .print

    ssc.start()
    ssc.awaitTermination()
  • 直接方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    val conf = new SparkConf()
    .setAppName("spark kafka")
    .setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "master:9092"
    )
    val topicSet = Set("test")
    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicSet
    )

    directKafkaStream.map(_._2).flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .print

    ssc.start()
    ssc.awaitTermination()

6.性能优化

集群环境下的Spark Streaming,想要获得最佳的性能则需要进行一些参数和配置上的调整

从更高的角度来看,你需要考虑两件事

  • 通过有效地使用集群资源来减少每批数据的处理时间
  • 设置正确的批大小,以便在接收到批数据时可以尽快处理这些批数据

6.1 减少批量处理时间

数据接收的并行级别

通过网络接收数据(如kafka、flume、socket等)需要将数据反序列化并存储在spark中

如果数据接收成为系统的瓶颈,那么考虑并行数据接收

请注意,每个输入数据流都创建一个接收单个数据流的接收器(运行在worker节点上)

因此,可以通过创建多个输入数据流并将其配置为从源接收数据流的不同分区来实现接收多个数据流

例如,接收两个数据主题的单个Kafka输入数据流可以分成两个Kafka输入流,每个输入流只接收一个主题

这将运行两个接收器,允许并行接收数据,从而增加总吞吐量

另一个应该考虑的参数是接收器的阻塞间隔,它由配置参数spark.streaming.blockinterval确定

对于大多数接收器,接收到的数据在存储在Spark的内存中之前被合并成数据块

每个批中的块数决定了用于处理接收数据的任务数

每批接收程序的任务数大约为(批间隔/块间隔)。例如,200毫秒的块间隔将每2秒批创建10个任务。如果任务数量太少(即,少于每台计算机的核心数量),那么它将效率低下,因为所有可用的核心都不会用于处理数据。要增加给定批处理间隔的任务数,请减少块间隔。但是,建议的块间隔最小值约为50 ms,低于该值,任务启动开销可能是一个问题

6.2 数据处理中的并行级别

如果在计算的任何阶段使用的并行任务数量不够多,则可能会导致集群资源的利用不足

例如,对于分布式reduce操作(如reduceByKey和reduceByKeyAndWindow),默认的并行任务数由spark.default.parallelism配置属性控制。您可以将并行度级别作为参数传递或者设置spark.default.parallelism配置属性来更改默认值

6.3 数据序列化

通过调整序列化格式可以降低数据序列化的开销

在流的情况下,有两种类型的数据会被序列化

  • 输入数据

    默认情况下,接收器接收到的数据是以MEMORY_AND_DISK_SER_2的持久化方式存储在executor的内存中

    也就是说,数据被序列化为字节是为了减少GC开销,并被复制以容忍执行器失败

    此外,数据首先保存在内存中,只有当内存不足以容纳流计算所需的所有输入数据时,数据才会溢出到磁盘上。这种序列化显然有一些开销——接收者必须对接收到的数据进行反序列化,并使用Spark的序列化格式对其进行重新序列化

  • 流操作生成的持久化RDD

    由流计算生成的RDD可以保存在内存中。例如,窗口操作将数据保存在内存中,因为它们将被多次处理。但是,与storagelevel.memory_only的spark核心默认值不同,流计算生成的持久化RDD默认与storagelevel.memory_only_ser(即序列化)一起持久化,以最小化GC开销

    在这两种情况下,使用kryo序列化可以减少CPU和内存开销

6.4 内存优化

一般来说,由于通过接收器接收的数据存储的持久化级别为MEMORY_AND_DISK_SER_2,这可能会降低流应用程序的性能,因此建议根据流应用程序的要求提供足够的内存

另一方面,垃圾回收是内存调优的另一个点。对于需要低延迟的流应用程序,不希望有由JVM垃圾收集引起的长停顿

有几个参数可以帮助您调整内存使用和GC开销:

  • DStream持久化级别

    默认情况下,输入数据和RDD作为序列化字节进行持久化,这减少了内存使用和GC开销

    启用kryo序列化进一步减少了序列化的大小和内存使用

    通过压缩(参见spark configuration spark.rdd.compress),可以进一步减少内存使用量,代价是占用CPU时间

  • 清除旧数据

    默认情况下,由DStream转换生成的数据和RDD都会在被自动清除。Spark Streaming根据所使用的transform操作来决定何时清除数据

    例如,如果使用的是10分钟的窗口操作,那么火花流将保留大约最后10分钟的数据,并主动丢弃旧数据

    通过设置streamingcontext.remember,可以将数据保留更长的时间

  • CMS垃圾收集器

    强烈推荐使用并发标记清除算法的GC垃圾回收器,以保持GC相关暂停持续较低

    尽管已知并发GC会降低系统的整体处理吞吐量,但仍建议使用它来实现更一致的批处理时间

    请确保在spark-submit时配置了CMS GC(–driver-java-options)并且在executor上配置了spark.executor.extraJavaOptions

  • 其他

    • 使用Tachyon持久化RDD
    • 使用更多堆内存较小的executor