Spark
1.Spark概述
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法
Spark 是在Scala语言中实现的,它将Scala用作其应用程序框架。与Hadoop不同,Spark和Scala能够紧密集成,其中的Scala可以像操作本地集合对象一样轻松地操作分布式数据集
尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对Hadoop的补充,可以在Hadoop文件系统中并行运行。可用来构建大型的、低延迟的数据分析应用程序
spark的特点
更快的速度
内存计算下,Spark比Hadoop快100倍
易用性
支持Java、Python、Scala、R多种实现
Spark 提供了80多个高级运算符
通用性
Spark 提供了大量的库,包括Spark Core、Spark SQL、Spark Streaming、MLlib、GraphX。 开发者可以在同一个应用程序中无缝组合使用这些库
支持多种资源管理器
Spark 支持 Hadoop YARN,Apache Mesos,及其自带的独立集群管理器
Spark生态系统
Spark集成
2.Spark入门
2.1 Spark安装
预备环境
- jdk
- scala
- hadoop
部署模式
环境搭建
文件解压
1
tar zxf spark-1.6.3-bin-hadoop2.6.tgz –C /opt/modules
配置SPARK_HOME
1
2
3vi ~/.bash_profile
export SPARK_HOME=/opt/modules/spark-1.6.3-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATHspark环境配置
1
2
3
4
5
6vi spark-env.sh
JAVA_HOME=/opt/modules/jdk1.8.0_191
SCALA_HOME=/opt/modules/scala-2.10.7
SPARK_MASTER_IP=master
SPARK_WORKER_MEMORY=1g
HADOOP_CONF_DIR=/opt/modules/hadoop-2.6.5/etc/hadoopslave修改
1
2A Spark Worker will be started on each of the machines listed below.
localhost将spark拷贝到slave节点
1
scp –r /opt/modules/spark-1.6.3-bin-hadoop2.6 root@slave1:/opt/modules
启动
1
sbin/start-all.sh
访问spark webui
1
http://ip:8080
2.2 基础概念
1 | At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures. |
翻译
1 | 从宏观的角度来看,每个spark应用程序都由一个驱动程序组成,该驱动程序运行用户的主要功能,并在集群上执行各种并行操作。Spark提供的主要抽象是一个弹性分布式数据集(RDD),它是跨集群节点分区的元素集合,可以并行操作。RDD是从Hadoop文件系统(或任何其他支持Hadoop的文件系统)中的文件或驱动程序中现有的scala集合开始创建的,并对其进行转换。用户还可能要求spark在内存中持久化RDD,从而允许在并行操作之间高效地重用RDD。最后,RDD可以自动从节点故障中恢复 |
2.3 快速入门
spark-shell实现wordCount
1
2val textFile = sc.textFile("README.md")
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)手动实现wordCount
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16object WorldCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("wordcount")
.setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile("hdfs://master:9000/user/root/README.md")
val wordsRDD = textFile.flatMap(_.split(" "))
val wordCountRDD = wordsRDD.map((_, 1))
wordCountRDD.collect.foreach(println)
sc.stop
}
}打包提交
spark-submit脚本
1
2
3
4
5./bin/spark-submit \
--class WordCount \
--executor-memory 2G \
--total-executor-cores 2 \
/opt/modules/spark-1.6.3-bin-hadoop2.6/scala/spark-wordcount.jar
3.Spark架构设计
Spark组件
1
2
3Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.翻译
1
2
3Spark应用程序在集群上作为独立的进程集合运行,由主程序(称为驱动程序)中的sparkContext对象与Spark集群进行交互协调
具体来说,要在集群上运行,可以通过SparkContext连接到几种类型的集群管理器(spark自己的独立集群管理器、meos或yarn),它们会为要应用程序分配资源。连接后,Spark会在集群中的节点上获取执行器,这些执行器是运行计算并为应用程序存储数据的进程。随后,会将应用程序代码发送给执行器。最后,SparkContext将任务发送给执行器以运行关于这个体系结构,有几个有用的东西需要注意
- 每个应用程序都有自己的执行器进程,这些进程在整个应用程序期间保持运行,并在多个线程中运行任务。这样做的好处是在调度端(每个驱动程序调度自己的任务)和执行端(来自不同应用程序的任务在不同的JVM中运行)将应用程序彼此隔离。但是,它还意味着,如果不将数据写入外部存储系统,就无法在不同的Spark应用程序(SparkContext实例)之间共享数据
- Spark对底层集群管理器不可知。要它能够获取执行器进程,并且这些进程彼此通信即可
- 驱动程序必须在其整个生命周期内监听并接收来自其执行器的传入连接
- 因为驱动程序在集群上调度任务,所以它应该在工作节点附近运行,最好在同一局域网上运行。如果您想远程向集群发送请求,最好打开驱动程序的RPC并让它从附近提交操作,而不是运行远离工作节点的驱动程序
集群相关术语说明
术语 描述 Driver program 运行应用程序的main函数并创建SparkContext的进程 Cluster manager 用于获取集群资源的外部服务 Master 进程,负责整个集群资源的调度、分配、监控等职责 Worker node 进程,负责存储RDD的某个或某些partition,启动其他进程或线程,对RDD的partition处理和计算 Executor 进程,运行任务,并将数据保存在内存或磁盘存储器中 Task 线程,对RDD的partition进行并行计算 Stage 每个作业被划分为更小的任务集,称为相互依赖的阶段(类似于map reduce中的map和reduce阶段)
4.RDD
1 | A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition, org.apache.spark.rdd.PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join; org.apache.spark.rdd.DoubleRDDFunctions contains operations available only on RDDs of Doubles; and org.apache.spark.rdd.SequenceFileRDDFunctions contains operations available on RDDs that can be saved as SequenceFiles. All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit. |
翻译
1 | 弹性分布式数据集(RDD),Spark中的基本抽象。表示可以并行操作的元素的不可变分区集合。此类包含所有RDD上可用的基本操作,如map、filter和persist。此外,org.apache.spark.rdd.PairRDDFunctions包含仅在键值对的RDD上可用的操作,如GroupByKey和Join。org.apache.spark.rdd.DoubleRDDFunctions包含仅在Double的RDD上可用的操作。org.apache.spark.rdd.SequenceFileRDDFunctions包含对Sequencewen文件的操作。通过隐式转换,所有操作在任何正确类型的RDD上自动可用 |
4.1 RDD创建
Spark围绕弹性分布式数据集(RDD)的概念展开,RDD是一个可以并行操作的容错元素集合
创建RDD有两种方法
- 并行化已有集合
- 引用外部存储系统中的数据集
并行化已有集合
1 | val data = Array(1, 2, 3, 4, 5) |
并行集合的一个重要参数是要将数据集拆分后的分区数。spark将为集群的每个分区运行一个任务。通常,您需要为集群中的每个CPU分配2-4个分区。通常,spark会根据集群自动设置分区数。但是,您也可以通过将其作为第二个参数传递给parallelize
函数
外部数据集
1 | val distFile = sc.textFile("data.txt") |
使用spark读取文件的一些注意事项
如果在本地文件系统上使用路径,则该文件也必须可以在工作节点上的同一路径上访问
Spark所有基于文件的输入方法,包括textfile、支持在目录上运行、压缩文件和通配符。
例如,可以使用textfile(“/my/directory”)、textfile(“/my/directory/.txt”)和textfile(“/my/directory/.gz”)
textfile方法还采用可选的第二个参数来控制文件的分区数。默认情况下,spark为文件的每个块创建一个分区(HDFS中的块默认为64MB),但您也可以通过传递较大的值来请求更多的分区。请注意,分区数不能少于块的数量
4.2 RDD操作
RDD支持两种类型的操作
transformations
从现有的数据集创建新的数据集
actions
在对数据集运行计算后将值返回给Driver program
Spark中所有的transformation操作都是懒惰的,因此它们并不会立即计算结果。相反,它们只是记住应用于某些基本数据集的转换。只有在执行action操作将结果返回到Driver program时,才会开始对transformation操作进行计算。这种设计使Spark能够更高效的运行
默认情况下,每次对每个已转换的RDD重新运行操作时,都会重新计算它。但是,您也可以使用persist(或cache)方法在内存中持久化RDD,在这种情况下,spark将保留集群中的元素,以便下次查询时更快地访问它。还支持在磁盘上持久化RDD,或跨多个节点复制RDD
考虑如下程序
1 | val lines = sc.textFile("data.txt") |
- 第一行定义了来自外部文件的基本RDD,此数据集未加载到内存中
- 第二行将内容的长度定义为RDD转换的结果,由于懒惰,不会立即计算行长度
- 最后,我们运行reduce,这是一个action操作。在这一点上,spark将计算分解为在不同的机器上运行的任务,并且每台机器同时运行其映射部分,并返回运行结果到driver program
如果你想再次使用lineLengths
,可将其进行持久化,避免重新计算
1 | lineLengths.persist() |
在进行reduce之前,这将导致在第一次计算之后将行lineLengths
保存到内存中
生命周期
Spark的难点之一是在集群中执行代码时了解变量和方法的范围和生命周期
代码如下
1 | var counter = 0 |
local vs cluster mode
上述代码可能无法达到预期的效果,为了执行任务,spark会将RDD的操作分解成任务,每个任务由对应的executor
来执行。在任务执行前,spark会计算任务任务的闭包,所谓闭包就是就是RDD在执行时需要使用到的变量与方法的集合,这些闭包内容会被序列化发送给每一个executor
发送给executor
的变量是driver program上的变量副本,当foreach函数引用counter时,它实际只用引用的是counter的变量副本,因此counter最终的值为0
为了确保在这些场景中定义良好的行为,应该使用累加器。Spark中的累加器专门用于提供一种机制,在集群中跨工作节点拆分执行时安全地更新变量
通常,闭包——类似于循环或本地定义的方法的构造,不应该用于改变某些全局状态
另一个常见的习惯用法是使用rdd.foreach(println)
或rdd.map(println)
打印出RDD的元素
在一台机器上,这将生成预期的输出并打印所有RDD元素。但在集群模式下,元素的打印会输出到executor
的stdout上,而不是driver program端的stdout
要打印驱动程序上的所有元素,可以使用collect()
方法先将RDD的数据带回到driver program,再进行打印
即rdd.collect().foreach(println)
但这将导致driver program内存耗尽,因为collect()
将整个RDD提取到一台机器上
通常建议使用rdd.take(100).foreach(println)
数据前几条数据
Transformations
转换 | 描述 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但每个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立的在RDD的每个分区上运行,因此在类型为T的RDD上运行时,func函数的类型必须是Iterator |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整形参数表示分片的索引值,因此在类型为T的RDD上运行时,func函数的类型必须(Int, Iterator |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后并返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后并返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后,返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)形式的RDD上调用,返回一个(K,Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)形式的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupBy类似,reduce任务的个数可以通过第二个参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 当调用(K,V)对的数据集时,返回(K,U)数据集,其中每个key的值使用给定的聚合函数和中性”零”值进行聚合。与groupbykey类似,reduce任务的数量可以通过可选的第二个参数进行配置 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)形式的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
join(otherDataset, [numTasks]) | 当调用(K,V)和(K,W)类型的数据集时,返回一个(K,(V,W))形式的数据集,支持leftouterjoin、rightouterjoin和fulloterjoin |
cogroup(otherDataset, [numTasks]) | 当调用(K,V)和(K,W)类型的数据集时,返回(K,(iterable |
cartesian(otherDataset) | 当调用T和U类型的数据集时,返回一个(T,U)类型的数据集 |
pipe(command, [envVars]) | 通过shell命令(例如perl或bash脚本)对RDD的每个分区进行管道连接。RDD元素写入进程的stdin,输出到其stdout的行作为字符串的RDD返回 |
coalesce(numPartitions) | 将RDD中的分区数减少到numPartitions。在过滤大型数据集后,可以更高效地运行操作 |
repartition(numPartitions) | 随机重组RDD中的数据,以创建更多或更少的分区,并在分区之间进行平衡,总是会产生shuffle操作 |
repartitionAndSortWithinPartitions(partitioner) | 根据给定的分区器对RDD重新分区,并在每个生成的分区内,按键对记录进行排序。这比调用重新分区然后在每个分区内进行排序更有效,因为它可以将排序向下推送到无序处理机器中 |
map
1 | val conf = new SparkConf() |
filter
1 | val conf = new SparkConf() |
flatMap
1 | val conf = new SparkConf() |
mapPartitions
1 | val conf = new SparkConf() |
mapPartitionsWithIndex
1 | val conf = new SparkConf() |
sample
1 | val conf = new SparkConf() |
groupByKey
1 | val conf = new SparkConf() |
reduceByKey
1 | val conf = new SparkConf() |
sortByKey
1 | val conf = new SparkConf() |
join
1 | val conf = new SparkConf() |
cogroup
1 | val conf = new SparkConf() |
union
1 | val conf = new SparkConf() |
intersection
1 | val conf = new SparkConf() |
distinct
1 | val conf = new SparkConf() |
cartesian
1 | val conf = new SparkConf() |
pipe
1 | val conf = new SparkConf() |
coalesce与repartition
1 | val conf = new SparkConf() |
aggregateByKey
1 | val conf = new SparkConf() |
Actions
动作 | 描述 |
---|---|
reduce(func) | 使用函数func聚合数据集的元素(函数func接受两个参数并返回一个参数)。函数应该是交换的和结合的,这样才能正确地并行计算 |
collect() | 在driver program中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似take(1)) |
take(n) | 返回一个由数据集前n个元素组成的数组 |
takeSample(withReplacement, num, [seed]) | 返回一个数组,该数组由从数据集中随机采样num个元素组成。可以选择是否采用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | 使用RDD的自然顺序或自定义比较器返回RDD的前n个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或其他文件系统 |
saveAsSequenceFile(path) (Java and Scala) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下 |
saveAsObjectFile(path) (Java and Scala) | 使用Java序列化将数据集的元素以一种简单的格式写入,然后可以使用 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,int)的map,表示每一个key对应元素的个数 |
foreach(func) | 对数据集的每个元素运行函数func |
4.3 Shuffle操作
1 | Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation |
翻译
1 | Spark中的某些操作会触发一个称为shuffle的事件。shuffle是spark重新分配数据的机制,因此它在分区之间的分组方式不同。这通常涉及到跨执行器和机器复制数据,使无序处理成为一个复杂且昂贵的操作 |
What happens when we do a groupBy
or a groupByKey
on a RDD? (Remember that our data is distributed on multiple nodes).
1 | val list = List((1, "one"),(2, "two"),(3, "three")) |
We typically have to move data from one node to another to be “grouped” with its “key”. Doing this is called Shuffling. We never call this shuffle method directly, but it happens behind to curtains for some other functions as above.
This might be very expensive for performance because of Latency!
翻译
1 | 通常,我们不得不将数据从一个节点移动到另一个节点,以便根据键进行"分组"。这种操作就叫做洗牌。我们并没有直接调用shuffle相关的方法,但它发生在行为幕后 |
Example
1 | // CFF是一家瑞士火车公司 |
Data Set
1 | val purchases = List( CFFPurchase(100, "Geneva", 22.25), |
目标:计算一个月内每个客户的旅行次数和花费金额
1 | val purchasesPerMonth = purchasesRdd |
How would this look on a cluster?
Lets say we have 3 nodes and our data is evenly distributed on it, so above operations look like this:
This shuffling is very expensive because of Latency.
Can we do a better job?
Perhaps we can reduce before we shuffle. This could greatly reduce the amount of data we send over network.
To do this we use reduceByKey
1 | val purchasesPerMonth = purchasesRdd.map( p => (p.customerId, (1, p.price)) ) // pair RDD |
What will this look like on the cluster?
Note: Here we shuffle considerable less amount of data, just by using reduceByKey
instead of doing a groupByKey
followed by map
Benefits of this approach:
- By reducing the dataset first, the amount of data sent over the network during the shuffle is greatly reduced. Thus performance gains are achieved!
4.4 宽依赖与窄依赖
Lineages
RDD也是一个DAG的任务集合,一个DAG代表了一个RDD的计算过程
每个DAG都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系
这种关系在Spark中叫做Lineages
Example
1 | val rdd = sc.textFile(...) |
RDD的依赖分为两种
- 窄依赖
- 宽依赖
窄依赖
父RDD的每个分区最多由子RDD的一个分区使用
1 | [parent RDD partition] ---> [child RDD partition] |
宽依赖
父RDD的每个分区被多个子RDD使用
1 | ---> [child RDD partition 1] |
- 通常具有窄依赖的算子
- map
- flatMap
- filter
- mapPartitions
- mapPartitionsWithIndex
- 通常具有宽依赖的算子
- cogroup
- join
- leftOuterJoin
- rightOuterJoin
- groupByKey
- reduceByKey
- distinct
- intersection
- repartition
- coalesce
4.5 RDD容错
Recomputing missing partitions is fast for narrow but slow for wide dependencies. So if above, a partition in G would have failed, it would have taken us more time to recompute that partition. So losing partitions that were derived from a transformation with wide dependencies, can be much slower.
5.RDD持久化
5.1 持久化策略
1 | One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use. |
翻译
1 | Spark中最重要的功能之一就是在内存中持久化(或缓存)数据集。当您持久化一个RDD时,每个节点将其计算的任何分区存储在内存中,并在该数据集(或从该数据集派生的数据集)上的其他操作中重用它们。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具 |
持久化级别
持久化级别 | 描述 |
---|---|
MEMORY_ONLY | 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化,默认的持久化策略 |
MEMORY_AND_DISK | 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,不会立刻输出到磁盘 |
MEMORY_ONLY_SER | RDD的每个partition会被序列化成一个字节数组,节省空间,读取时间更占CPU |
MEMORY_AND_DISK_SER | 序列化存储,超出部分写入磁盘文件中 |
DISK_ONLY | 使用未序列化的Java对象格式,将数据全部写入磁盘文件中 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上 |
OFF_HEAP (experimental) | RDD序列化存储在Tachyon |
如何选择持久化策略
- 优先使用MEMORY_ONLY
- 内存放不下就使用MEMORY_ONLY_SER,但会额外消耗CPU资源
- 容错恢复使用MEMORY_ONLY_2
- 尽量不使用DISK相关策略
5.2 数据移除
1 | Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method. |
翻译
1 | spark会自动监视每个节点上的缓存使用情况,并通过LRU算法移除老的数据分区 |
6.共享变量
1 | Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators. |
翻译
1 | 通常,当传递给Spark操作(如map或reduce)的函数在远程集群节点上执行时,它在函数中使用的所有变量的driver program中变量的副本。这些变量被复制到每台机器上,对远程机器上变量的任何更新都不会传播回driver program。 |
6.1 广播变量
1 | Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. |
翻译
1 | 广播变量允许程序员在每台计算机上缓存只读变量,而不是将其副本与任务一起发送。 |
广播变量是通过调用sparkContext.broadcast(v)
从变量v创建的。广播变量是围绕v
的包装器,它的值可以通过调用value方法来访问
代码演示说明
1 | scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) |
创建广播变量后,应在集群上运行的任何函数中使用它,而不是使用值v
此外,对象v
在广播后不应进行修改,以确保所有节点都获得广播变量的相同值
6.2 累加器
累加器是在Spark计算操作中变量值累加起来,可以被用来实现计数器、或者求和操作
代码演示说明
1 | scala> val accum = sc.accumulator(0, "My Accumulator") |
7.二次排序
1 | def main(args: Array[String]): Unit = { |
1 | class SecondSortByKey(val first: Int, val second: Int) extends Ordered[SecondSortByKey] with Serializable { |