RDD 基础

 

RDD (Resilient Distributed Dataset) 是 Spark 对数据的核心抽象——弹性分布式数据集。

RDD 其实就是分布式的元素集合,在 Spark 中,对数据的所有操作不外乎就是创建 RDD、转换已有的 RDD 以及调用 RDD 操作进行求值。而这一切的背后,Spark 会自动将 RDD 中的数据分发到集群上,并将操作并行化执行。

RDD

Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象,也可以包含用户自定义的对象。

可以使用两种方法创建 RDD:

  • 读取一个外部数据集
  • 在驱动器程序里分发驱动器程序中的对象集合(比如 List 和 Set)
val lines = sc.textFile("README.md")

RDD 支持两种类型的操作:转化(transformation)和行动(action)。转化操作会由一个 RDD 生成一个新的 RDD。行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中。

转化操作和行动操作的区别还在于 Spark 计算 RDD 的方式不同:虽然可以在任何时候定义新的 RDD,但是 Spark 只会惰性计算这些 RDD,他们只有第一次在一个行动操作中才会真正计算。

默认情况下,Spark 的 RDD 会在每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 方法让 Spark 把这个 RDD 缓存下来,可以缓存到内存和磁盘中。

实际操作中,可能会经常用到 persist() 来吧数据的一部分读取到内存中,并反复查询这部分数据。

总的来说,么个 Spark 程序或 Shell 会话都大致按如下方式工作:

  1. 从外部数据创建出输入 RDD。
  2. 使用诸如 filter() 这类的转化操作对 RDD 进行转化。
  3. 告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 缓存。
  4. 使用诸如 count()first() 等行动操作触发一次并行计算,Spark 会对计算进行优化后再执行。

创建 RDD

创建 RDD 最简单的方式就是把程序中一个已经有的集合传给 SparkContextparallelize() 方法。多用于开发和原型测试。

更常用的方式是从外部存储中读取数据来创建 RDD。

val lines = sc.parallelize(List("pandas", "I like pandas"))
val lines2 = sc.textFile("/path/to/file")

或则

JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "I like pandas"));
JavaRDD<String> lines2 = sc.textFile("/path/to/file");

操作 RDD

常见的转化操作

例如对数据内容为 {1, 2, 3, 3} 的 RDD 执行基本的转化操作:

map()

将函数应用于 RDD 中的每个元素,将返回值构成的新 RDD

rdd.map(x => x + 1)

// {2, 3, 4, 4}

filter()

flatMap()

rdd.flatMap(x => x.to(3))
// {1,2 3, 2, 3, 3, 3}

伪集合操作

尽管 RDD 本身不是严格意义上的集合操作,但它也支持许多数学上的集合操作,不过要求这些操作的 RDD 是相同数据类型的。

  • union(other) : 返回一个包含两个 RDD 中所有元素的 RDD。

  • intersection(other) : 只返回两个 RDD 中都有的元素。运行时也会去掉所有重复的元素(单个 RDD 内的重复元素也会一起移除)。尽管与 union 的概念类似,但是 intersection 的性能要差很多,因为它需要公通过网络混洗数据来发现共有的元素。

  • subtract(other) 函数接收另一个 RDD 作为参数,返回一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD。和 intersection() 一样,它也需要数据混洗。

  • cartesian(other) 转化操作会返回所有可能的(a, b) 对(笛卡尔积),其中a 是源RDD 中的元素,而b 则来自另一个RDD。不过要特别注意的是,求大规模RDD 的笛卡儿积开销巨大。

勒内·笛卡尔,法语:René Descartes;拉丁名:Renatus Cartesius(瑞那图斯·卡提修斯)

行动操作

RDD 的一些行动操作会以普通集合或者值的形式将 RDD 的部分或全部数据返回驱动器程序中。

reduce()

很有可能会用到的常见行动操作是 reduce()。他接收一个函数作为参数,这个函数要操作啷个相同元素类型的 RDD 数据并返回一个同样类型的新元素。

val sum = rdd.recuce((x, y) => x + y)

fold()reduce() 类似,接收一个与 redice() 接收的函数签名相同的函数,再加上一个初始值来作为每一个分区第一次调用时的结果。

collect()

它会将整个RDD 的内容返回。collect() 通常在单元测试中使用,因为此时 RDD 的整个内容不会很大,可以放在内存中。使用 collect() 使得RDD 的值与预期结果之间的对比变得很容易。由于需要将数据复制到驱动器进程中,collect() 要求所有数据都必须能一同放入单台机器的内存中。

take(n)

返回 RDD 中的 n 个元素,并且尝试只访问尽量少的分区。因此该操作会得到一个不均衡的集合。需要注意的是,这些操作返回元素的顺序与你预期的可能不一样。

这些操作对于单元测试和快速调试都很有用,但是在处理大规模数据时会遇到瓶颈。

惰性求值

持久化

出于不同的目的,我们可以为 RDD 选择不同的持久化级别。在 Scala 和 Java 中,默认情况下 persist() 会把数据以序列化的形式缓存在JVM 的堆空间中。在 Python 中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在 JVM 堆空间中。当我们把数据写到磁盘或者堆外存储上时,也总是使用序列化后的数据。

表3-6: org.apache.spark.storage.StorageLevelpyspark.StorageLevel 中的持久化级别;如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份

级别 使用的空间 CPU 时间 是否在内存中 是否在磁盘上 备注
MEMORY_ONEY —-
MEMORY_ONEY_SER —-
MEMORY_AND_DISK 部分 部分 内存放不下则溢写到磁盘上
MEMORY_AND_DISK_SER 部分 部分 内存放不下则溢写到磁盘上
DISK_ONLY —-
import org.apache.spark.storage.StorageLevel

val result = input.map(x => x * x)
result.persis(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

注意:我们在第一次对这个 RDD 调用行动操作前就调用了 persist() 方法,persist() 调用本身不会触发强制求值。

如果要缓存的数据太多,内存中放不下,Spark 会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算。但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘。不论哪一种情况,都不必担心你的作业因为缓存了太多数据而被打断。不过,缓存不必要的数据会导致有用的数据被移出内存,带来更多重算的时间开销。

最后,RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的RDD 从缓 存中移除。

键值对操作

Spark 为包含键值对类型的 RDD 提供了一些专有的操作,这些 RDD 被称为 Pair RDD,Pair RDD 是很多程序的构成要素,因为它们提供了并行操作各个键或跨节点重新进行数据分组的操作接口。例如,Pair RDD 提供 reduceByKey() 方法,可以分别归约每个键对应的数据,还有 join() 方法,可以把两个 RDD 中键相同的元素组合到一起,合并为一个 RDD。我们通常从一个 RDD 中提取某些字段(例如代表事件时间、用户ID 或者其他标识符的字段),并使用这些字段作为 Pair RDD 操作中的键。


  @Test
  def pairRddTest() :Unit = {

    val lines: List[String] = List( "Hello world", "Hello China")
    val pairs: List[(String, Int)] = lines.flatMap(x => x.split(" "))
      .map(x => (x, 1))

    this.withSparkContext(sc => {
      val rdd:RDD[(String, Int)] = sc.parallelize(lines)
        .flatMap(x => x.split(" "))
        .map(x => (x, 1))

      val array = rdd.reduceByKey((x, y) => x + y)
        .collect()


      for ((x, y) <- array) {
        logger.debug(" {} ==> {}", x, y)
      }

    })
  }

Pair RDD 的转化操作

rdd = {(1, 2), (3, 4), (3, 6)}
other = 3

reduceByKey(func)

合并具有相同键的值。{(1, 2), (3, 10)}

groupByKey()

对具有相同键的值进行分组。{(1, [2]), (3, [4, 6])}

combinByKey(createCombiner, mergeValue, mergeCombiners, partitioner)

使用不同的返回类型合并具有相同 key 的值。是最为常用的基于 Key 进行聚合的函数。大多数基于 Key 聚合的函数都是用它实现的。和 aggreagte() 一样 combineByKey() 可以让用户返回与输入数据的类型不同的返回值。

mapValues(func)

对 Pair RDD 中的每个 value 应用一个函数而不改变 Key。

flatMapValues(func) :

对 Pair RDd 中的每个值用于一个返回迭代器的函数,然后对返回的每个元素都生成一个对应 Key 的 key-value 对记录。

keys

返回一个仅报刊 Key 的 RDD。

values :

返回一个仅包含 Value 的 RDD。

sortByKey() :

返回一个根据 Key 排序的 RDD。

subtractByKey(other) :

删除 RDD 中 Key 与 other RDD 中 Key 相同的元素。

join(other) :

对两个 RDD 进行内连接 {(3, (4,9)), (3, (6, 9))}

rightOuterJoin :

右外连接:

rdd.rightOuterJoin(other) {(3, (Some(4), 9)), (3, (Some(6), 9))}

leftOuterJoin :

左外连接:

rdd.leftOuterJoin(other) {(1, (2, NONE)), (3, (4, Some(9))), (3, (6, Some(9)))}

cogroup :

将两个 RDD 中有用相同键的数据分组:

rdd.cogroup(other) {(1, ([2], [])), (3, ([4, 6],[9]))}

数据存取与保存

文本文件


// scala
val input = sc.textFile("file:///file/path")
rdd.saveAsTextFile(outputFileDir)
// java
JavaRDD<String> input = sc.textFile("file:///path")

json 文件

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature

// 必须是顶级类
case class Person(name: String, lovesPandas: Boolean) 


// 将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None)
// 来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result = input.flatMap(record => {
	try {
		Some(mapper.readValue(record, classOf[Person]))
	} catch {
		case e: Exception => None
	}
})

result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_))
    .saveAsTextFile(outputFile)

Hive

要把Spark SQL 连接到已有的 Hive 上,你需要提供 Hive 的配置文件。你需要将 hive-site.xml 文件复制到Spark 的 ./conf/ 目录下。这样做好之后,再创建出 HiveContext 对象,也就是 Spark SQL 的入口,然后你就可以使用 Hive 查询语言(HQL)来对你的表进行查询,并以由行组成的RDD 的形式拿到返回数据,


import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // 字段0是name字段


如果你有记录间结构一致的 JSON 数据,Spark SQL 也可以自动推断出它们的结构信息,并将这些数据读取为记录,这样就可以使得提取字段的操作变得很简单。要读取 JSON 数据,首先需要和使用 Hive 一样创建一个 HiveContext。(不过在这种情况下我们不需要安装好 Hive,也就是说你也不需要 hive-site.xml 文件。)然后使用HiveContext.jsonFile 方法来从整个文件中获取由 Row 对象组成的RDD。除了使用整个 Row 对象,你也可以将 RDD 注册为一张表,然后从中选出特定的字段。例如,假设有一个包含推文的JSON 文件,格 式下。

{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}
// 在 Scala 中
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")

// 在Java 中使用Spark SQL 读取JSON 数据
SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);
tweets.registerTempTable("tweets");
SchemaRDD results = hiveCtx.sql("SELECT user.name, text FROM tweets");


数据库


def createConnection() = {
	Class.forName("com.mysql.jdbc.Driver").newInstance();
	DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}

def extractValues(r: ResultSet) = {
	(r.getInt(1), r.getString(2))
}

val data = new JdbcRDD(sc, 
	createConnection, 
	"SELECT * FROM panda WHERE ? <= id AND id <= ?",
	lowerBound = 1, 
	upperBound = 3, 
	numPartitions = 2, 
	mapRow = extractValues)
println(data.collect().toList)


JdbcRDD 接收这样几个参数。

  • 首先,要提供一个用于对数据库创建连接的函数。这个函数让每个节点在连接必要的配置后创建自己读取数据的连接。
  • 接下来,要提供一个可以读取一定范围内数据的查询,以及查询参数中 lowerBoundupperBound 的值。这些参数可以让 Spark 在不同机器上查询不同范围的数据,这样就不会因尝试在一个节点上读取所有数据而遭遇性能瓶颈。
  • 这个函数的最后一个参数是一个可以将输出结果从 java.sql.ResultSet(http://docs.oracle.com/javase/7/docs/api/java/sql/ResultSet.html)转为对操作数据有用的格式的函数。上面的例子中,我们会得到 (Int, String) 对。如果这个参数空缺,Spark 会自动将每行结果转为一个对象数组。

HBase

由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过 Hadoop 输入格式访问 HBase。这个输入格式会返回键值对数据,其中键的类型为 org.apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为 org.apache.hadoop.hbase.client.Result。Result 类包含多种根据列获取值的方法,在其API 文档(https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Result.html)中有所描述。

要将 Spark 用于 HBase,你需要使用正确的输入格式调用 SparkContext.newAPIHadoopRDD

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // 扫描哪张表
val rdd = sc.newAPIHadoopRDD(conf, 
	classOf[TableInputFormat], 
	classOf[ImmutableBytesWritable],
	classOf[Result])

TableInputFormat 包含多个可以用来优化对 HBase 的读取的设置项,比如将扫描限制到一部分列中,以及限制扫描的时间范围。你可以在 TableInputFormat 的API 文档(http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html) 中找到这些选项,并在 HBaseConfiguration 中设置它们,然后再把它传给 Spark。

« EOF »

If you like TeXt, don’t forget to give me a star :star2:.