Spark 学习笔记02-共享变量

 

共享变量是一种可以在 Spark 任务中使用的特殊类型的变量。Spark 有两种类型的共享变量:累加器(accumulator)广播变量(broadcast variable)

累加器用来对信息进行聚合,广播变量用来高效分发较大的对象。

当任务需要很长时间进行配置,譬如需要创建数据库连接或者随机数生成器时,在多个数据元素间共享一次配置就会比较有效率。由于需要用到远程呼号查询数据库,所以会讨论如何基于分区进行操作以重用数据库连接的配置工作。

除了 Spark 直接支持的语言外,系统还可以调用用别的语言写出来的程序。本章会介绍如何使用与 Spark 语言无关的方法 pipe() 来与其他程序通过标准输入和标准输出进行交互。我们会使用 pipe() 方法来访问 R 语言的库,以计算业余电台操作者每次联系的距离。

最后,和操作键值对类似,Spark 也有专门用来操作数值数据的方法。下面通过用业余电台呼叫日志计算出来的距离移除异常值的示例,展示如何使用这些方法。

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

累加器

第一种共享变量,即累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法。

累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。例如,假设我们在从文件中读取呼号列表对应的日志,同时也想知道输入文件中有多少空行(也许不希望在有效输入中看到很多这样的行)

val sc = new SparkContext(sparkConf)
val file = sc.textFile("my.log")

/* 创建 Accumulator[Int] 并初始化为 0 * /
val blankLines = sc.accumulator(0)

val callSings = file.flatMap(line => {
	if (line == "") {
		blankLines += 1
	}
	line.split(" ")
})

callSings.saveAsTextFile("output.txt")
println("Blank lines: "+ blankLines.value)

注意,只有在运行 saveAsTextFile() 行动操作后才能看到正确的计数,因为行动操作前的转化操作 flatMap() 是惰性的,所以作为计算副产品的累加器只有在惰性的转化操作 flatMap()saveAsTextFile() 行动操作强制触发时才会开始求值。

当然,也可以使用 reduce() 这样的行动操作将整个 RDD 中的值都聚合到驱动器中。只是我们有时希望使用一种更简单的方法来对那些与 RDD 本身的范围和粒度不一样的值进行聚合。聚合可以发生在 RDD 进行转化操作的过程中。在前面的例子中,我们使用累加器在读取数据时对错误进行计数,而没有分别使用 filter()reduce()

总结起来,累加器的用法如下所示。

  • 通过在驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型。
  • Spark 闭包里的执行器代码可以使用累加器的 += 方法(在Java 中是 add)增加累加器的值。
  • 驱动器程序可以调用累加器的 value 属性(在Java 中使用 value()getValue())来访问累加器的值。

注意,工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。在这种模式下,累加器的实现可以更加高效,不需要对每次更新操作进行复杂的通信。

这里展示的计数在很多时候都非常方便,比如有多个值需要跟踪时,或者当某个值需要在并行程序的多个地方增长时(比如你可能需要对程序中调用JSON 解析库的次数进行计数)。例如,我们一般预期数据的一小部分是毁坏的,或者允许后端有一定的失败数次。为了防止产生含有过多错误的垃圾输出,可以使用累加器对有效记录和无效记录分别进行计数。累加器的值只有在驱动器程序中可以访问,所以检查也应当在驱动器程序中完成。

累加器与容错性

Spark 会自动重新执行失败的或较慢的任务来应对有错误的或者比较慢的机器。例如,如果对某分区执行 map() 操作的节点失败了,Spark 会在另一个节点上重新运行该任务。即使该节点没有崩溃,而只是处理速度比别的节点慢很多,Spark 也可以抢占式地在另一个节点上启动一个“投机”(speculative)型的任务副本,如果该任务更早结束就可以直接获取结果。即使没有节点失败,Spark 有时也需要重新运行任务来获取缓存中被移除出内存的数据。因此最终结果就是同一个函数可能对同一个数据运行了多次,这取决于集群发生了什么。

这种情况下累加器要怎么处理呢?实际结果是:对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。

对于在 RDD 转化操作中使用的累加器,就不能保证有这种情况了。转化操作中累加器可能会发生不止一次更新。举个例子,当一个被缓存下来但是没有经常使用的RDD 在第一次从LRU 缓存中被移除并又被重新用到时,这种非预期的多次更新就会发生。这会强制 RDD 根据其谱系进行重算,而副作用就是这也会使得谱系中的转化操作里的累加器进行更新,并再次发送到驱动器中。在转化操作中,累加器通常只用于调试目的。

尽管将来版本的 Spark 可能会把这一行为改成只更新一次累加器的结果,但当前版本(1.2.0)确实会进行多次更新,因此转化操作中的累加器最好只在调试时使用。

自定义累加器

到目前为止,我们学习了如何使用加法操作 Spark 的一种累加器类型整型(Accumulator[Int])。Spark 还直接支持 Double、Long 和 Float 型的累加器。除此以外,Spark 也引入了自定义累加器和聚合操作的 API(比如找到要累加的值中的最大值,而不是把这些值加起来)。自定义累加器需要扩展 AccumulatorParam,这在 Spark API 文档(http://spark.apache.org/docs/latest/api/scala/index.html#package)中有所介绍。只要该操作同时满足交换律和结合律,就可以使用任意操作来代替数值上的加法。比如除了跟踪总和,还可以跟踪数据的最大值。

如果对于任意的 a 和 b,有 a op b = b op a,就说明操作 op 满足交换律。 如果对于任意的 a、b 和 c,有 (a op b) op c = a op (b op c),就说明操作 op 满足结合律。 例如,sum 和max 既满足交换律又满足结合律,是Spark 累加器中的常用操作。

广播变量

Spark 的第二种共享变量类型是广播变量,它可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。

前面提过,Spark 会自动把闭包中所有引用到的变量发送到工作节点上。虽然这很方便,但也很低效。原因有二:首先,默认的任务发射机制是专门为小任务进行优化的;其次,事实上你可能会在多个并行操作中使用同一个变量,但是Spark 会为每个操作分别发送。

举个例子,假设要写一个Spark 程序,通过呼号的前缀来查询对应的国家。虽然前缀的长度不一,但由于每个国家都使用各自的业余呼号前缀,所以这种方法还是可行的。如果用Spark 直接实现,则代码就如下所示。

# 查询RDD contactCounts中的呼号的对应位置。将呼号前缀
# 读取为国家代码来进行查询
signPrefixes = loadCallSignTable()

def processSignCount(sign_count, signPrefixes):
	country = lookupCountry(sign_count[0], signPrefixes)
	count = sign_count[1]
	return (country, count)

countryContactCounts = (contactCounts
		.map(processSignCount)
		.reduceByKey((lambda x, y: x+ y)))

这个程序可以运行,但是如果表更大(比如表中不是呼号而是IP 地址),signPrefixes 很容易就会达到数MB 大小,从主节点为每个任务发送一个这样的数组就会代价巨大。而且,如果之后还要再次使用 signPrefixes 这个对象(可能还要在file2.txt 上运行同样的代码),则还需要向每个节点再发送一遍。

我们可以把 signPrefixes 变为广播变量来解决这一问题。广播变量其实就是类型为 spark.broadcast.Broadcast[T] 的一个对象,其中存放着类型为 T 的值。可以在任务中通过对 Broadcast 对象调用value 来获取该对象的值。这个值只会被发送到各节点一次,使用的是一种高效的类似 BitTorrent 的通信机制。


// 查询RDD contactCounts中的呼号的对应位置。将呼号前缀
// 读取为国家代码来进行查询

val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map {
	case (sign, count) => val country = lookupInArray(sign, signPrefixes.value)
		(country, count)
}.reduceByKey((x, y) => x + y)

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

如以上示例所示,使用广播变量的过程很简单。

  • (1) 通过对一个类型T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。任何可序列化的类型都可以这么实现。
  • (2) 通过 value 属性访问该对象的值(在Java 中为 value() 方法)。
  • (3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

满足只读要求的最容易的使用方式是广播基本类型的值或者引用不可变对象。在这样的情况下,你没有办法修改广播变量的值,除了在驱动器程序的代码中可以修改。但是,有时传一个可变对象可能更为方便与高效。如果你这样做的话,就需要自己维护只读的条件。就像对 Array[String] 类型的呼号前缀表所做的那样,必须确保从节点上运行的代码不会尝试去做诸如 val theArray = broadcastArray.value; theArray(0) = newValue 这样的事情。当在工作节点上执行时,这一行将 newValue 赋给数组的第一个元素,但是只对该工作节点本地的这个数组的副本有效,而不会改变任何其他工作节点上通过 broadcastArray.value 所读取到的内容。

广播的优化

当广播一个比较大的值时,选择既快又好的序列化格式是很重要的,因为如果序列化对象的时间很长或者传送花费的时间太久,这段时间很容易就成为性能瓶颈。尤其是,Spark 的 Scala 和 Java API 中默认使用的序列化库为 Java 序列化库,因此它对于除基本类型的数组以外的任何对象都比较低效。你可以使用 spark.serializer 属性选择另一个序列化库来优化序列化过程(第8 章中会讨论如何使用 Kryo 这种更快的序列化库),也可以为你的数据类型实现自己的序列化方式(对 Java 对象使用 java.io.Externalizable 接口实现序列化,或使用 reduce() 方法为 Python 的 pickle 库定义自定义的序列化)。

基于分区进行操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作,都是我们应当尽量避免为每个元素都配置一次的工作。Spark 提供基于分区的 mapforeach,让你的部分代码只对 RDD 的每个分区运行一次,这样可以帮助降低这些操作的代价。

回到呼号的示例程序中来,我们有一个在线的业余电台呼号数据库,可以用这个数据库查询日志中记录过的联系人呼号列表。通过使用基于分区的操作,可以在每个分区内共享一个数据库连接池,来避免建立太多连接,同时还可以重用JSON 解析器。如例6-10 至例6-12 所示,使用 mapPartitions 函数获得输入RDD 的每个分区中的元素迭代器,而需要返回的是执行结果的序列的迭代器。

在Scala 中使用共享连接池与JSON 解析器

val contactsContactLists = validSigns.distinct()
	.mapPartitions {
        signs => 
        val mapper = createMapper()
        val client = new HttpClient()
        client.start()
        // 创建 http 请求
        signs.map {
            sign => createExchangeForSign(sign)
        // 获取响应
        }.map {
            case (sign, exchange) => (sign, readExchangeCallLog(mapper, exchange))
        // 删除空的呼叫日志
        }.filter (x => x._2 != null)
	}

当基于分区操作 RDD 时,Spark 会为函数提供该分区中的元素的迭代器。返回值方面,也返回一个迭代器。除 mapPartitions() 外,Spark 还有一些别的基于分区的操作符,列在了表6-1 中。

mapPartitions() :

调用提供: 该分区中元素的迭代器。

返回: 返回的元素的迭代器

对 RDD[T] 的函数签名:f: (Iterator[T]) -> Iterator[U]

mapPartitionsWithIndex() :

调用提供: 分区序号,以及每个分区中的元素的迭代器。

返回:

对 RDD[T] 的函数签名:f:(Int, Iterator[T]) -> Iterator[U]

foreachPartitions() :

调用提供: 元素迭代器

返回: 无

对 RDD[T] 的函数签名: f:(Iterator[T]) -> Unit

除了避免重复的配置工作,也可以使用 mapPartitions() 避免创建对象的开销。有时需要创建一个对象来将不同类型的数据聚合起来。回忆一下第 3 章中,当计算平均值时,一种方法是将数值 RDD 转为二元组 RDD,以在归约过程中追踪所处理的元素个数。现在,可以为每个分区只创建一次二元组,而不用为每个元素都执行这个操作,参见例6-13 和例6-14。

与外部程序间的管道

有三种可用的语言供你选择,这可能已经满足了你用来编写 Spark 应用的几乎所有需求。但是,如果 Scala、Java 以及 Python 都不能实现你需要的功能,那么 Spark 也为这种情况提供了一种通用机制,可以将数据通过管道传给用其他语言编写的程序,比如 R 语言脚本。

Spark 在RDD 上提供 pipe() 方法。Spark 的 pipe() 方法可以让我们使用任意一种语言实现 Spark 作业中的部分逻辑,只要它能读写 Unix 标准流就行。通过 pipe(),你可以将 RDD 中的各元素从标准输入流中以字符串形式读出,并对这些元素执行任何你需要的操作,然后把结果以字符串的形式写入标准输出——这个过程就是RDD 的转化操作过程。这种接口和编程模型有较大的局限性,但是有时候这恰恰是你想要的,比如在 mapfilter 操作中使用某些语言原生的函数。

有时候,由于你已经写好并测试好了一些很复杂的软件,所以会希望把 RDD 中的内容通过管道交给这些外部程序或者脚本来进行处理并重用。很多数据科学家都用 R 写好的代码,可以通过 pipe() 与 R 程序进行交互。

在例6-15 中,我们使用一个 R 语言的库来计算所有联系人的距离。程序会把 RDD 的每个元素都以换行符作为分隔符写出去,而那个 R 程序输出的每一行都是字符串,用来构成结果 RDD 中的元素。为了让R 程序能够比较简单地解析输入,我们会把数据以 mylat, mylon, theirlat, theirlon 的格式重新组织。这里使用逗号作为分隔符。

« EOF »

If you like TeXt, don’t forget to give me a star :star2:.