Spark 学习笔记03-运行

 

Spark 运行时架构

在分布式环境下,Spark 集群采用的是主/从结构。在一个 Spark 集群中,有一个节点负责中央协调,调度各个分布式工作节点。这个中央协调节点被称为驱动器(Driver)节点,与之对应的工作节点被称为执行器(executor)节点。驱动器节点可以和大量的执行器节点进行通信,它们也都作为独立的 Java 进程运行。驱动器节点和所有的执行器节点一起被称为一个 Spark 应用(application)。

Spark 应用通过一个叫作集群管理器(Cluster Manager)的外部服务在集群中的机器上启动。Spark 自带的集群管理器被称为独立集群管理器。Spark 也能运行在 Hadoop YARN 和 Apache Mesos 这两大开源集群管理器上。

驱动器节点

Spark 驱动器是执行你的程序中的 main() 方法的进程。它执行用户编写的用来创建 SparkContext、创建 RDD,以及进行 RDD 的转化操作和行动操作的代码。

其实,当启 动Spark shell 时,就启动了一个 Spark 驱动器程序(相信你还记得,Spark shell 总是会预先加载一个叫作 sc 的SparkContext 对象)。驱动器程序一旦终止,Spark 应用也就结束了。

驱动器程序在Spark 应用中有下述两个职责。

把用户程序转为任务

Spark 驱动器程序负责把用户程序转为多个物理执行的单元,这些单元也被称为任务(task)。从上层来看,所有的 Spark 程序都遵循同样的结构:程序从输入数据创建一系 列 RDD,再使用转化操作派生出新的 RDD,最后使用行动操作收集或存储结果 RDD 中的数据。

Spark 程序其实是隐式地创建出了一个由操作组成的逻辑上的有向无环图(Directed Acyclic Graph,简称 DAG)。当驱动器程序运行时,它会把这个逻辑图转为物理执行计划。

Spark 会对逻辑执行计划作一些优化,比如将连续的映射转为流水线化执行,将多个操作合并到一个步骤中等。这样 Spark 就把逻辑计划转为一系列步骤(stage)。而每个步骤又由多个任务组成。这些任务会被打包并送到集群中。任务是 Spark 中最小的工作单元,用户程序通常要启动成百上千的独立任务。

为执行器节点调度任务

有了物理执行计划之后,Spark 驱动器程序必须在各执行器进程间协调任务的调度。执行器进程启动后,会向驱动器进程注册自己。因此,驱动器进程始终对应用中所有的执行器节点有完整的记录。每个执行器节点代表一个能够处理任务和存储 RDD 数据的进程。

Spark 驱动器程序会根据当前的执行器节点集合,尝试把所有任务基于数据所在位置分配给合适的执行器进程。当任务执行时,执行器进程会把缓存数据存储起来,而驱动器进程同样会跟踪这些缓存数据的位置,并且利用这些位置信息来调度以后的任务,以尽量减少数据的网络传输。

驱动器程序会将一些 Spark 应用的运行时的信息通过网页界面呈现出来,默认在端口 4040 上。比如,在本地模式下,访问 http://localhost:4040 就可以看到这个网页了。

执行器节点

Spark 执行器节点是一种工作进程,负责在 Spark 作业中运行任务,任务间相互独立。

Spark 应用启动时,执行器节点就被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有执行器节点发生了异常或崩溃,Spark 应用也可以继续执行。执行器进程有两大作用:

  • 第一,它们负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;
  • 第二,它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在执行器进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

集群管理器

到目前为止,我们已经介绍了驱动器节点和执行器节点的抽象概念。那么,驱动器节点和执行器节点是如何启动的呢? Spark 依赖于集群管理器来启动执行器节点,而在某些特殊情况下,也依赖集群管理器来启动驱动器节点。集群管理器是 Spark 中的可插拔式组件。这样,除了 Spark 自带的独立集群管理器,Spark 也可以运行在其他外部集群管理器上,比如 YARN 和 Mesos。

小结

回顾在集群上运行Spark 应用的详细过程,可把本节的主要概念作如下总结。

  • (1) 用户通过spark-submit 脚本提交应用。
  • (2) spark-submit 脚本启动驱动器程序,调用用户定义的 main() 方法。
  • (3) 驱动器程序与集群管理器通信,申请资源以启动执行器节点。
  • (4) 集群管理器为驱动器程序启动执行器节点。
  • (5) 驱动器进程执行用户应用中的操作。根据程序中所定义的对 RDD 的转化操作和行动操作,驱动器节点把工作以任务的形式发送到执行器进程。
  • (6) 任务在执行器程序中进行计算并保存结果。
  • (7) 如果驱动器程序的main() 方法退出,或者调用了SparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源。

使用spark-submit部署应用

spark-submit 的 –master 标记可以接收的值

  • spark://host:port 连接到指定端口的 Spark 独立集群上。默认情况下 Spark 独立主节点使用7077 端口
  • mesos://host:port 连接到指定端口的 Mesos 集群上。默认情况下 Mesos 主节点监听5050 端口
  • yarn 连接到一个 YARN 集群。当在YARN 上运行时,需要设置环境变量 HADOOP_CONF_DIR 指向 Hadoop 配置目录,以获取集群信息
  • local 运行本地模式,使用单核
  • local[N] 运行本地模式,使用 N 个核心
  • local[*] 运行本地模式,使用尽可能多的核心

spark-submit的一些常见标记

  • --master 表示要连接的集群管理器。这个标记可接收的选项见表7-1
  • --deploy-mode选择在本地(客户端“client”)启动驱动器程序,还是在集群中的一台工作节点机器(集群“cluster”)上启动。在客户端模式下,spark-submit 会将驱动器程序运行在spark-submit 被调用的这台机器上。在集群模式下,驱动器程序会被传输并执行于集群的一个工作节点上。默认是本地模式
  • --class 运行 Java 或 Scala 程序时应用的主类
  • --name 应用的显示名,会显示在 Spark 的网页用户界面中
  • --jars 需要上传并放到应用的 CLASSPATH 中的 JAR 包的列表。如果应用依赖于少量第三方的JAR 包,可以把它们放在这个参数里
  • --files 需要放到应用工作目录中的文件的列表。这个参数一般用来放需要分发到各节点的数据文件
  • --py-files 需要添加到PYTHONPATH 中的文件的列表。其中可以包含.py、.egg 以及.zip 文件
  • --executor-memory 执行器进程使用的内存量,以字节为单位。可以使用后缀指定更大的单位,比如“512m”(512 MB)或“15g”(15 GB)
  • --driver-memory 驱动器进程使用的内存量,以字节为单位。可以使用后缀指定更大的单位,比如“512m”(512 MB)或“15g”(15 GB)

spark-submit 还允许通过 --conf prop=value 标记设置任意的 SparkConf 配置选项,也可以使用 --properties-File 指定一个包含键值对的属性文件。

# 使用独立集群模式提交Java应用
$ ./bin/spark-submit \
--master spark://hostname:7077 \
--deploy-mode cluster \
--class com.databricks.examples.SparkExample \
--name "Example Program" \
--jars dep1.jar,dep2.jar,dep3.jar \
--total-executor-cores 300 \
--executor-memory 10g \
myApp.jar "options" "to your application" "go here"
# 使用YARN客户端模式提交Python应用
$ export HADOOP_CONF_DIR=/opt/hadoop/conf
$ ./bin/spark-submit \
--master yarn \
--py-files somelib-1.2.egg,otherlib-4.4.zip,other-file.py \
--deploy-mode client \
--name "Example Program" \
--queue exampleQueue \
--num-executors 40 \
--executor-memory 10g \
my_script.py "options" "to your application" "go here"

打包代码与依赖

如果你的程序引入了任何既不在 org.apache.spark 包内也不属于语言运行时的库的依赖,你就需要确保所有的依赖在该 Spark 应用运行时都能被找到。

对于 Python 用户而言,有多种安装第三方库的方法。由于 PySpark 使用工作节点机器上已有的 Python 环境,你可以通过标准的 Python 包管理器(比如 pip 和 easy_install)直接在集群中的所有机器上安装所依赖的库,或者把依赖手动安装到 Python 安装目录下的sitepackages/ 目录中。你也可以使用 spark-submit 的 --py-Files 参数提交独立的库,这样它们也会被添加到 Python 解释器的路径中。如果你没有在集群上安装包的权限,可以手动添加依赖库,这也很方便,但是要防范与已经安装在集群上的那些包发生冲突。

当你提交应用时,绝不要把Spark 本身放在提交的依赖中。spark-submit 会自动确保Spark 在你的程序的运行路径中。

Java 和 Scala 用户也可以通过 spark-submit 的 --jars 标记提交独立的 JAR 包依赖。当只有一两个库的简单依赖,并且这些库本身不依赖于其他库时,这种方法比较合适。但是一般 Java 和 Scala 的工程会依赖很多库。当你向 Spark 提交应用时,你必须把应用的整个依赖传递图中的所有依赖都传给集群。你不仅要传递你直接依赖的库,还要传递这些库的依赖,以及它们的依赖的依赖,等等。手动维护和提交全部的依赖 JAR 包是很笨的方法。事实上,常规的做法是使用构建工具,生成单个大 JAR 包,包含应用的所有的传递依赖。这 通常被称为超级(uber)JAR 或者组合(assembly) JAR,大多数 Java 或 Scala 的构建工具都支持生成这样的工件。

Java 和 Scala 中使用最广泛的构建工具是 Maven 和 sbt。它们都可以用于这两种语言,不过 Maven 通常用于 Java 工程,而 sbt 则一般用于Scala 工程。在这里,我们会分别针对使用这两种工具构建 Spark 应用给出相应的例子。你可以把这里的示例当作自己构建Spark 工程的模板。

Maven



    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.8</arg>
                    </args>
                </configuration>
            </plugin>
         
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- 用来创建超级JAR包的Maven shade插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>

                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.myscala002.DemoApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

sbt

import AssemblyKeys._
name := "Simple Project"
version := "1.0"
organization := "com.databricks"
scalaVersion := "2.10.3"
libraryDependencies ++= Seq(
    // Spark依赖
    "org.apache.spark" % "spark-core_2.10" % "1.2.0" % "provided",
    // 第三方库
    "net.sf.jopt-simple" % "jopt-simple" % "4.3",
    "joda-time" % "joda-time" % "2.0"
)

// 这条语句打开了assembly插件的功能
assemblySettings

// 配置assembly插件所使用的JAR
jarName in assembly := "my-project-assembly.jar"

// 一个用来把Scala本身排除在组合JAR包之外的特殊选项,因为Spark
// 已经包含了Scala
assemblyOption in assembly :=
    (assemblyOption in assembly).value.copy(includeScala = false)

Spark 应用内与应用间调度

刚才的例子中只有一个用户向集群提交作业。而在现实中,许多集群是在多个用户间共享 的。共享的环境会带来调度方面的挑战:如果两个用户都启动了希望使用整个集群所有资 源的应用,该如何处理呢? Spark 有一系列调度策略来保障资源不会被过度使用,还允许 工作负载设置优先级。 在调度多用户集群时,Spark 主要依赖集群管理器来在Spark 应用间共享资源。当Spark 应 用向集群管理器申请执行器节点时,应用收到的执行器节点个数可能比它申请的更多或者 更少,这取决于集群的可用性与争用。许多集群管理器支持队列,可以为队列定义不同优 先级或容量限制,这样Spark 就可以把作业提交到相应的队列中。请查看你所使用的集群 管理器的文档获取详细信息。 Spark 应用有一种特殊情况,就是那些长期运行(long lived)的应用。这意味着这些应用 从不主动退出。Spark SQL 中的JDBC 服务器就是一个长期运行的Spark 应用。当JDBC 服务器启动后,它会从集群管理器获得一系列执行器节点,然后就成为用户提交SQL 查询 的永久入口。由于这个应用本身就是为多用户调度工作的,所以它需要一种细粒度的调度 机制来强制共享资源。Spark 提供了一种用来配置应用内调度策略的机制。Spark 内部的公 平调度器(Fair Scheduler)会让长期运行的应用定义调度任务的优先级队列。本书对这部 分内容未作深入探讨,若想了解详情,请参考公平调度器的官方文档:(http://spark.apache. org/docs/latest/job-scheduling.html)。

« EOF »

If you like TeXt, don’t forget to give me a star :star2:.