CPM
kudu介绍
1.1 背景介绍
在KUDU之前,大数据主要以两种方式存储:
(1)静态数据:
以 HDFS 引擎作为存储引擎,适用于高吞吐量的离线大数据分析场景。这类存储的局限性是数据无法进行随机的读写。
(2)动态数据:
以 HBase、Cassandra 作为存储引擎,适用于大数据随机读写场景。这类存储的局限性是批量读取吞吐量远不如 HDFS,不适用于批量数据分析的场景。
从上面分析可知,这两种数据在存储方式上完全不同,进而导致使用场景完全不同,但在真实的场景中,边界可能没有那么清晰,面对既需要随机读写,又需要批量分析的大数据场景,该如何选择呢?这个场景中,单种存储引擎无法满足业务需求,我们需要通过多种大数据工具组合来满足这一需求。
如上图所示,数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,我们定时(通常是 T+1 或者 T+H)将 HBase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如:HDFS)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的场景,但他有如下缺点:
- (1)架构复杂。从架构上看,数据在 HBase、消息队列、HDFS 间流转,涉及环节太多,运维成本很高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。
- (2)时效性低。数据从 HBase 导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时效性上不是很高。
- (3)难以应对后续的更新。真实场景中,总会有数据是延迟到达的。如果这些数据之前已经从 HBase 导出到 HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后重写一遍,但这代价又很高。
为了解决上述架构的这些问题,KUDU 应运而生。KUDU 的定位是 Fast Analytics on Fast Data,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎。
从上图可以看出,Kudu 是一个折中的产品,在 HDFS 和 HBase 这两个偏科生中平衡了随机读写和批量分析的性能。从 Kudu 的诞生可以说明一个观点:底层的技术发展很多时候都是上层的业务推动的,脱离业务的技术很可能是空中楼阁。
1.2 kudu是什么
Apache Kudu 是由 Cloudera 开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。它是一个融合 HDFS 和 HBase 的功能的新组件,具备介于两者之间的新存储组件。
Kudu 支持水平扩展,并且与 Cloudera Impala 和 Apache Spark 等当前流行的大数据查询和分析工具结合紧密。
1.3 kudu的应用场景
- Strong performance for both scan and random access to help customers simplify complex hybrid architectures(适用于那些既有随机访问,也有批量数据扫描的复合场景)
- High CPU efficiency in order to maximize the return on investment that our customers are making in modern processors(高计算量的场景)
- High IO efficiency in order to leverage modern persistent storage(使用了高性能的存储设备,包括使用更多的内存)
- The ability to update data in place, to avoid extraneous processing and data movement(支持数据更新,避免数据反复迁移)
- The ability to support active-active replicated clusters that span multiple data centers in geographically distant locations(支持跨地域的实时数据备份和查询)
国内使用的kudu一些案例可以查看《构建近实时分析系统.pdf》文档。
2、Kudu的架构
与 HDFS 和 HBase 相似,Kudu 使用单个的 Master 节点,用来管理集群的元数据,并且使用任意数量的Tablet Server(可对比理解 HBase 中的 Region Server 角色)节点用来存储实际数据。可以部署多个 Master 节点来提高容错性。一个 table 表的数据,被分割成 1 个或多个 Tablet,Tablet 被部署在 Tablet Server 来提供数据读写服务。
下面是一些基本概念:
Master
集群中的老大,负责集群管理、元数据管理等功能
Tablet Server
集群中的小弟,负责数据存储,并提供数据读写服务
一个 tablet server 存储了table表的tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。
只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。
Table(表)
一张 talbe 是数据存储在 Kudu 的 tablet server 中。表具有 schema 和全局有序的 primary key(主键)。table 被分成称为 tablets 的 segments。
Tablet
一个 tablet 是一张 table 连续的 segment,tablet 是 kudu 表的水平分区,类似于 google Bigtable 的tablet,或者 HBase 的 region。每个 tablet 存储着一定连续range的数据(key),且 tablet 两两间的range 不会重叠。一张表的所有 tablet 包含了这张表的所有 key 空间。与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的 tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是 leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server 之间达成一致性。
java 代码操作
请参考 test-kudu-1 项目
kudu 的分区
为了提供可扩展性,Kudu 表被划分为称为 tablets 的单元,并分布在许多 tablet servers 上。行总是属于单个tablet 。将行分配给 tablet 的方法由在表创建期间设置的表的分区决定。 kudu 提供了 3 种分区方式。
Range Partitioning ( 范围分区 )
范围分区可以根据存入数据的数据量,均衡的存储到各个机器上,防止机器出现负载不均衡现象.
public void partitioningByRange() {
List<ColumnSchema> schemas = Arrays.asList(
newKeyColumn("company_id", Type.INT32),
newColumn("work_id", Type.INT32),
newColumn("name", Type.STRING),
newColumn("gender", Type.STRING),
newColumn("photo", Type.STRING)
);
Schema schema = new Schema(schemas);
CreateTableOptions tableOptions = new CreateTableOptions();
// 设置副本数
tableOptions.setNumReplicas(1);
// 设置范围分区的规则
// 设置按照哪个字段进行分区
List<String> parcols = Collections.singletonList("company_id");
tableOptions.setRangePartitionColumns(parcols);
/*
* range
* 0 <= value < 10
* 10 <= value < 20
* 20 <= value < 30
* ........
* 80 <= value < 90
*/
int count = 0;
for (int i = 0; i < 10; i++) {
// 范围开始
PartialRow lower = schema.newPartialRow();
lower.addInt("company_id", count);
System.out.println("count => "+ count);
// 范围结束
PartialRow upper = schema.newPartialRow();
count += 10;
upper.addInt("company_id", count);
System.out.println("count => "+ count);
// 设置每一个分区的范围
tableOptions.addRangePartition(lower, upper);
}
try (KuduClient client = this.getKuduClient()) {
client.createTable("employee", schema, tableOptions);
} catch (KuduException e) {
logger.error("", e);
}
}
Hash Partitioning ( 哈希分区 )
哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一; 哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet 之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。
public void partitioningByHash() {
List<ColumnSchema> schemas = Arrays.asList(
newKeyColumn("company_id", Type.INT32),
newColumn("work_id", Type.INT32),
newColumn("name", Type.STRING),
newColumn("gender", Type.STRING),
newColumn("photo", Type.STRING)
);
Schema schema = new Schema(schemas);
CreateTableOptions tableOptions = new CreateTableOptions();
// 设置副本数
tableOptions.setNumReplicas(1);
// 设置范围分区的规则
// 设置按照哪个字段进行分区
List<String> parcols = Collections.singletonList("company_id");
tableOptions.addHashPartitions(parcols, 6);
try (KuduClient client = this.getKuduClient()) {
client.createTable("employee2", schema, tableOptions);
} catch (KuduException e) {
logger.error("", e);
}
}
Multilevel Partitioning ( 多级分区 )
Kudu 允许一个表在单个表上组合多级分区。当正确使用时,多级分区可以保留各个分区类型的优点,同时减少每个分区的缺点.
/**
* 哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免tablet无限增长问题,
*/
public void partitioningByMultilevel() {
List<ColumnSchema> schemas = Arrays.asList(
newKeyColumn("company_id", Type.INT32),
newColumn("work_id", Type.INT32),
newColumn("name", Type.STRING),
newColumn("gender", Type.STRING),
newColumn("photo", Type.STRING)
);
Schema schema = new Schema(schemas);
CreateTableOptions tableOptions = new CreateTableOptions();
// 设置副本数
tableOptions.setNumReplicas(1);
// 设置范围分区的规则
// Hash 分区
List<String> parcols = Collections.singletonList("company_id");
tableOptions.addHashPartitions(parcols, 6);
// Range 分区
int count = 0;
for (int i = 0; i < 10; i++) {
PartialRow lower = schema.newPartialRow();
lower.addInt("company_id", count);
// 范围结束
PartialRow upper = schema.newPartialRow();
count += 10;
upper.addInt("company_id", count);
// 设置每一个分区的范围
tableOptions.addRangePartition(lower, upper);
}
try (KuduClient client = this.getKuduClient()) {
client.createTable("employee3", schema, tableOptions);
} catch (KuduException e) {
logger.error("", e);
}
}
Spark 代码操作
Spark 与 Kudu 集成支持:
- DDL操作(创建/删除)
- 本地 Kudu RDD
- Native Kudu 数据源,用于 DataFrame 集成
- 从 kudu 读取数据
- 从 Kudu 执行 insert/update/upsert/delete
- 谓词下推
- Kudu 和 Spark SQL 之间的模式映射
到目前为止,我们已经听说过几个上下文,例如 SparkContext
, SQLContext
, HiveContext
, SparkSession
,现在,我们将使用 Kudu 引入一个 KuduContext
。这是可在 Spark 应用程序中广播的主要可序列化对象。此类代表在 Spark 执行程序中与 Kudu Java 客户端进行交互。
KuduContext 提供执行 DDL 操作所需的方法,与本机 Kudu RDD 的接口,对数据执行更新/插入/删除,将数据类型从 Kudu 转换为 Spark 等。
Maven 依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<log4j2.version>2.17.0</log4j2.version>
<hbase.version>2.2.3</hbase.version>
<kudu.version>1.12.0.7.1.4.0-203</kudu.version>
<scala.version>2.11.12</scala.version>
<spark.version>2.4.5</spark.version>
<oracle.driver.version>12.1.0.2</oracle.driver.version>
<docker.image.prefix>bba-rtm</docker.image.prefix>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.11</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Kudu CDP -->
<!-- <scope>provided</scope>-->
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
KuduContext 操作 kudu
定义kudu的表需要分成5个步骤:
- 1:提供表名
- 2:提供 schema
- 3:提供主键
- 4:定义重要选项;例如:定义分区的 schema
- 5:调用 create Table api
package com.bba.rtm.demo
import org.apache.kudu.client.{CreateTableOptions, ListTablesResponse}
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
import scala.collection.mutable
object KuduSparkDemo {
private val logger: Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
val masterServers = "cdpcluster-2.futuremove.cn:7051"
def main(args: Array[String]): Unit = {
// 构建 SparkConf
val sparkConf: SparkConf = new SparkConf().
setAppName(this.getClass.getSimpleName).
setMaster("local[2]")
// 构建 SparkSession
val sparkSession: SparkSession = SparkSession.
builder().
config(sparkConf).
getOrCreate()
// 获取 SparkContext
val sparkContext: SparkContext = sparkSession.sparkContext
sparkContext.setLogLevel("warn")
try {
// 构建 KuduContext
val kuduContext: KuduContext = new KuduContext(masterServers, sparkContext)
// createTable(kuduContext)
deleteTable(kuduContext)
} finally {
sparkContext.stop()
sparkSession.close()
logger.info("-- end --")
}
}
private def createTable(kuduContext: KuduContext): Unit = {
// 1.1 定义表名
val tableName = "spark_kudu_1"
// 1.2 定义 Schema (使用 StructType )
val schema: StructType = StructType(
// column-name, data-type, nullable
StructField("user_id", StringType, false) ::
StructField("event_time", LongType, false) ::
StructField("name", StringType, false) ::
StructField("age", IntegerType, false) ::
StructField("gender", StringType, false) :: Nil)
// 1.3 定义表的主键
val primaryKey = Seq("user_id", "event_time")
// 1.4 定义分区的 Schema
val options = new CreateTableOptions
// 1.4.1 设置分区
options.setRangePartitionColumns(List("user_id").asJava)
// 1.4.2 设置副本
options.setNumReplicas(1)
// 1.4 创建表
if (!kuduContext.tableExists(tableName)) {
kuduContext.createTable(tableName, schema, primaryKey, options)
}
}
private def deleteTable(kuduContext: KuduContext): Unit = {
// 1.1 定义表名
val list: ListTablesResponse = kuduContext.syncClient.getTablesList
val scala: mutable.Buffer[String] = list.getTablesList.asScala
scala.foreach(t => {
if (kuduContext.tableExists(t)) {
kuduContext.deleteTable(t)
println(s"delete table -> $t")
}
})
}
}
dataFrame 操作 kudu
DML操作
Kudu 支持许多 DM L类型的操作,其中一些操作包含在 Spark on Kudu 集成. 包括:
- INSERT - 将 DataFrame 的行插入 Kudu 表。请注意,虽然 API 完全支持 INSERT,但不鼓励在 Spark 中使用它。 使用 INSERT 是有风险的,因为 Spark 任务可能需要重新执行,这意味着可能要求再次插入已插入的行。这样做会导致失败,因为如果行已经存在,INSERT 将不允许插入行(导致失败)。相反,我们鼓励使用下面描述 的 INSERT_IGNORE。
@scala.deprecated("Use KuduContext.insertRows(data, tableName, new KuduWriteOptions(ignoreDuplicateRowErrors = true))", "1.8.0")
- INSERT-IGNORE - 将 DataFrame 的行插入 Kudu 表。如果表存在,则忽略插入动作。
- DELETE - 从 Kudu 表中删除 DataFrame 中的行。
- UPSERT - 如果存在,则在 Kudu 表中更新 DataFrame 中的行,否则执行插入操作。
- UPDATE - 更新 Dataframe 中的行。
package com.bba.rtm.demo
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.{KuduContext, KuduWriteOptions}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters.seqAsJavaListConverter
object KuduSparkDataFrameDemo {
case class People(id: Int, name: String, age: Int)
val master1 = "cdpcluster-2.futuremove.cn:7051"
val kuduMasters: String = Seq(master1).mkString(",")
val tableName = "kudu_test_people"
def main(args: Array[String]): Unit = {
// SparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
// SparkSession
val sparkSession: SparkSession = SparkSession.builder()
.config(sparkConf)
.getOrCreate
// SparkContext
val sparkContext: SparkContext = sparkSession.sparkContext
sparkContext.setLogLevel("WARN")
// KuduContext
val kuduContext: KuduContext = new KuduContext(kuduMasters, sparkContext)
// 创建表
createKuduTable(kuduContext, tableName)
// 插入数据
insertKuduTable(sparkSession, kuduContext, tableName)
showData(sparkSession, tableName)
// 如果存在就是更新,否则就是插入
upsertData(sparkSession, kuduContext, tableName)
showData(sparkSession, tableName)
// 如果存在就是更新,否则就是报错
// updateData(sparkSession, kuduContext, tableName)
// 删除数据
deleteData1(sparkSession, kuduContext, tableName)
showData(sparkSession, tableName)
// SparkSession stop
sparkSession.stop()
}
/**
* 创建表
*
* @param kuduContext KuduContext
* @param tableName String
*/
def createKuduTable(kuduContext: KuduContext, tableName: String): Unit = {
if (!kuduContext.tableExists(tableName)) {
// Schema (使用 StructType)
val schema: StructType = StructType(
StructField("id", IntegerType, nullable = false) ::
StructField("name", StringType, nullable = false) ::
StructField("age", IntegerType, nullable = false) :: Nil)
// 主键
val tablePrimaryKey = List("id")
// CreateTableOptions
val options: CreateTableOptions = new CreateTableOptions()
.setRangePartitionColumns(List("id").asJava)
.setNumReplicas(1)
kuduContext.createTable(tableName, schema, tablePrimaryKey, options)
}
}
/**
* 插入数据
*
* @param sparkSession SparkSession
* @param kuduContext KuduContext
* @param tableName String
*/
def insertKuduTable(sparkSession: SparkSession,
kuduContext: KuduContext,
tableName: String): Unit = {
val data = List(
People(1, "程超", 20),
People(2, "韩冬", 35),
People(3, "高继原", 40)
)
val peopleRDD: RDD[People] = sparkSession.sparkContext.parallelize(data)
import sparkSession.implicits._
val peopleDF: DataFrame = peopleRDD.toDF()
// kuduContext.insertRows(peopleDF, tableName)
kuduContext.insertRows(peopleDF, tableName, new KuduWriteOptions(ignoreDuplicateRowErrors = true))
// kuduContext.insertIgnoreRows(peopleDF, tableName)
}
def showData(sparkSession: SparkSession,
tableName: String): Unit = {
val kuduOptions: Map[String, String] = Map(
"kudu.master" -> kuduMasters,
"kudu.table" -> tableName
)
sparkSession.read.options(kuduOptions).format("kudu").load().show(20)
}
/**
* 如果存在就是更新,否则就是插入
*
* @param sparkSession SparkSession
* @param kuduContext KuduContext
* @param tableName String
*/
def upsertData(sparkSession: SparkSession, kuduContext: KuduContext,
tableName: String): Unit = {
val data: List[People] = List(
People(1, "Cheng, Chao", 50), People(4, "李四", 40)
)
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.sparkContext
.parallelize(data)
.toDF
// 如果存在就是更新,否则就是插入
kuduContext.upsertRows(dataFrame, tableName)
}
/**
* 如果存在就是更新,否则就是报错
*
* @param sparkSession SparkSession
* @param kuduContext KuduContext
* @param tableName String
*/
def updateData(sparkSession: SparkSession, kuduContext: KuduContext,
tableName: String): Unit = {
val dataList: List[People] = List(
People(5, "程超", 49)
)
import sparkSession.implicits._
val data: DataFrame = sparkSession.sparkContext
.parallelize(dataList)
.toDF
// 如果存在就是更新,否则就是报错
kuduContext.updateRows(data, tableName)
}
def deleteData1(sparkSession: SparkSession, kuduContext: KuduContext, tableName: String): Unit = {
val kuduOptions: Map[String, String] = Map(
"kudu.master" -> kuduMasters,
"kudu.table" -> tableName
)
val dataFrame: DataFrame = sparkSession.read.options(kuduOptions)
.format("kudu")
.load()
dataFrame.createTempView("temp")
// 获取年龄等于 40 的所有用户的 ID
val result: DataFrame = sparkSession.sql(
"select id from temp where age = 40"
)
// result.show()
kuduContext.deleteRows(result, tableName)
}
}
运行结果
+---+-----------+---+
| id| name|age|
+---+-----------+---+
| 1|Cheng, Chao| 50|
| 2| 韩冬| 35|
| 3| 高继原| 40|
| 4| 李四| 40|
+---+-----------+---+
+---+-----------+---+
| id| name|age|
+---+-----------+---+
| 1|Cheng, Chao| 50|
| 2| 韩冬| 35|
| 3| 高继原| 40|
| 4| 李四| 40|
+---+-----------+---+
+---+-----------+---+
| id| name|age|
+---+-----------+---+
| 1|Cheng, Chao| 50|
| 2| 韩冬| 35|
+---+-----------+---+
DataFrameApi读取kudu表中的数据
虽然我们可以通过上面显示的 KuduContext
执行大量操作,但我们还可以直接从默认数据源本身调用读/写 API。要设置读取,我们需要为 Kudu 表指定选项,命名我们要读取的表以及为表提供服务的 Kudu 集群的 Kudu 主服务器列表。
def showData(sparkSession: SparkSession,
tableName: String): Unit = {
val kuduOptions: Map[String, String] = Map(
"kudu.master" -> kuduMasters,
"kudu.table" -> tableName
)
sparkSession.read.options(kuduOptions).format("kudu").load().show(20)
}
DataFrameApi写数据到kudu表中
在通过 DataFrame API 编写时,目前只支持一种模式 “append”。尚未实现的 “overwrite” 模式。
// overwrite
// append
def dataFrame2Kudu(sparkSession: SparkSession, tableName: String,
saveMode: String): Unit = {
val kuduOptions: Map[String, String] = Map(
"kudu.master" -> kuduMasters,
"kudu.table" -> tableName
)
val dataList: List[People] = List(
People(7, "Jim", 30),
People(8, "Joe Senders", 32)
)
import sparkSession.implicits._
val dataFrame:DataFrame = sparkSession.sparkContext
.parallelize(dataList)
.toDF
dataFrame.write.options(kuduOptions)
.mode("append")
.format("kudu")
.save
}
def main(args: Array[String]): Unit = {
// SparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
// SparkSession
val sparkSession: SparkSession = SparkSession.builder()
.config(sparkConf)
.getOrCreate
// SparkContext
val sparkContext: SparkContext = sparkSession.sparkContext
sparkContext.setLogLevel("WARN")
// KuduContext
val kuduContext: KuduContext = new KuduContext(kuduMasters, sparkContext)
// 创建表
createKuduTable(kuduContext, tableName)
// 插入数据
insertKuduTable(sparkSession, kuduContext, tableName)
showData(sparkSession, tableName)
// 如果存在就是更新,否则就是插入
upsertData(sparkSession, kuduContext, tableName)
showData(sparkSession, tableName)
// 如果存在就是更新,否则就是报错
// updateData(sparkSession, kuduContext, tableName)
// 删除数据
deleteData1(sparkSession, kuduContext, tableName)
showData(sparkSession, tableName)
dataFrame2Kudu(sparkSession, tableName, "append")
showData(sparkSession, tableName)
// Currently, only Append is supported
dataFrame2Kudu(sparkSession, tableName, "overwrite")
showData(sparkSession, tableName)
// SparkSession stop
sparkSession.stop()
}
使用 SparkSql 操作 Kudu 表
可以选择使用 Spark SQL 直接使用 INSERT 语句写入 Kudu 表;与 append
类似 INSERT
语句实际上将默认使用 UPSERT
语义处理;
def sparkSql2Kudu(sparkSession: SparkSession, tableName: String): Unit = {
val kuduOptions: Map[String, String] = createKuduOptions(tableName)
val dataList: List[People] = List(
People(10, "曲强", 28),
People(11, "冯海东", 36)
)
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.sparkContext
.parallelize(dataList)
.toDF()
// 把 dataFrame 注册成一张表
dataFrame.createTempView("temp1")
// 获取 Kudu 中的数据, 然后注册成一张表
sparkSession.read
.options(kuduOptions)
.format("kudu")
.load()
.createTempView("temp2")
// 使用 SparkSQL 的 insert 操作
sparkSession.sql("insert into table temp2 select * from temp1")
sparkSession.sql("select * from temp2")
.show()
}
Kudu Native RDD
Spark 与 Kudu 的集成同时提供了 kudu RDD.
def kuduRDDDemo(sparkSession: SparkSession,
kuduContext: KuduContext,
tableName: String): Unit = {
val sparkContext: SparkContext = sparkSession.sparkContext
val kuduRDD: RDD[Row] = kuduContext.kuduRDD(sparkContext, tableName, Seq("name", "age"))
// 操作此 RDD
val result: RDD[(String, Int)] = kuduRDD.map {
case Row(name: String, age: Int) => (name, age)
}
result.foreach(println)
}
kudu 集成 impala
Impala 基本介绍
Impala 是 Cloudera 提供的一款高效率的 sql 查询工具,提供实时的查询效果,官方测试性能比 hive 快 10到 10 0倍,其 sql 查询比 sparkSQL 还要更加快速,号称是当前大数据领域最快的查询 sql 工具.
Impala 是参照谷歌的新三篇论文(Caffeine–网络搜索引擎、Pregel–分布式图计算、Dremel–交互式分析工具)当中的Dremel实现而来,其中旧三篇论文分别是(BigTable,GFS,MapReduce)分别对应 HBase 和 HDFS以及 MapReduce。
Impala 是基于 hive 并使用内存进行计算,兼顾数据仓库,具有实时,批处理,多并发等优点
Kudu 与 Apache Impala (孵化)紧密集成, Impala 天然就支持兼容 kudu,允许开发人员使用 Impala 的SQL 语法从 Kudu 的 tablets 插入,查询,更新和删除数据;
Impala 的架构以及查询计划
Impalad : 基本是每个DataNode上都会启动一个Impalad进程,Impalad主要扮演两个角色:
- Coordinator:
- 负责接收客户端发来的查询,解析查询,构建查询计划
- 把查询子任务分发给很多Executor,收集Executor返回的结果,组合后返回给客户端
- 对于客户端发送来的DDL,提交给Catalogd处理
- Executor:
- 执行查询子任务,将子任务结果返回给Coordinator
Catalogd : 整个集群只有一个Catalogd,负责所有元数据的更新和获取
StateStored :
- 整个集群只有一个Statestored,作为集群的订阅中心,负责集群不同组件的信息同步
- 跟踪集群中的Impalad的健康状态及位置信息,由statestored进程表示,它通过创建多个线程来处理Impalad的注册订阅和与各Impalad保持心跳连接,各Impalad都会缓存一份State Store中的信息,当State Store离线后(Impalad发现State Store处于离线时,会进入recovery模式,反复注册,当State Store重新加入集群后,自动恢复正常,更新缓存数据)因为Impalad有State Store的缓存仍然可以工作,但会因为有些Impalad失效了,而已缓存数据无法更新,导致把执行计划分配给了失效的Impalad,导致查询失败。
使用impala操作kudu整合
1、需要先启动 HDFS 、Hive、Kudu、Impala 2、使用 impala 的 shell 控制台执行命令 impala-shell
使用 impala-shell
命令启动 Impala Shell 。默认情况下,impala-shell 尝试连接到 localhost 端口 21000 上的 Impala 守护程序。要连接到其他主机,请使用该 -i <host:port>
选项。要自动连接到特定的 Impala 数据库,请使用该 -d <database>
选项。
例如,如果您的所有 Kudu 表都位于数据库中的 Impala 中 impala_kudu,则 -d impala_kudu
可以使用此数据库。
退出 Impala Shell 使用 exit;
或者 quit;
;
一个例子:
impala-shell -i cdpcluster-2.futuremove.cn:21000
Starting Impala Shell without Kerberos authentication
Opened TCP connection to cdpcluster-2.futuremove.cn:21000
Connected to cdpcluster-2.futuremove.cn:21000
Server version: impalad version 3.4.0-SNAPSHOT RELEASE (build 3b41fdeabdfda540f1fb45c429ce2878ee22f2b7)
***********************************************************************************
Welcome to the Impala shell.
(Impala Shell v3.4.0-SNAPSHOT (3b41fde) built on Tue Oct 6 01:57:52 UTC 2020)
To see live updates on a query's progress, run 'set LIVE_SUMMARY=1;'.
***********************************************************************************
[cdpcluster-2.futuremove.cn:21000] default>
[cdpcluster-2.futuremove.cn:21000] default> show tables;
Query: show tables
+----------+
| name |
+----------+
| employee |
+----------+
Fetched 1 row(s) in 0.01s
[cdpcluster-2.futuremove.cn:21000] default> describe employee ;
Query: describe employee
+---------+--------+---------+
| name | type | comment |
+---------+--------+---------+
| id | int | |
| name | string | |
| age | int | |
| address | string | |
| salary | bigint | |
+---------+--------+---------+
Fetched 5 row(s) in 0.01s
使用 Impala 创建新的 Kudu 表时,可以将该表创建为内部表或外部表。
内部表由 Impala 管理,当您从 Impala 中删除时,数据和表确实被删除。当您使用 Impala 创建新表时,它通常是内部表。
使用impala创建内部表
CREATE TABLE my_first_table (
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'cdpcluster-2.futuremove.cn:7051',
'kudu.table_name' = 'my_first_table'
);
ERROR: AnalysisException: Not allowed to set 'kudu.table_name' manually for synchronized Kudu tables.
## Again..
CREATE TABLE my_first_table (
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'cdpcluster-2.futuremove.cn:7051'
);
## output ...
Query: CREATE TABLE my_first_table (
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'cdpcluster-2.futuremove.cn:7051'
)
+-------------------------+
| summary |
+-------------------------+
| Table has been created. |
+-------------------------+
Fetched 1 row(s) in 1.70s
在 CREATE TABLE 语句中,必须首先列出构成主键的列。
此时创建的表是内部表,从impala删除表的时候,在底层存储的kudu也会删除表。
drop table if exists my_first_table;
外部表
外部表(创建者 CREATE EXTERNAL TABLE
)不受 Impala 管理,并且删除此表不会将表从其源位置(此处为 Kudu)丢弃。相反,它只会去除 Impala 和 Kudu 之间的映射。这是 Kudu 提供的用于将现有表映射到 Impala 的语法。
首先使用 java 创建一个 Kudu 表, 使用 Impala 创建外部表 , 将 Kudu 的表映射到 Impala 上:
在impala-shell执行
CREATE EXTERNAL TABLE `kudu_test_people` STORED AS KUDU
TBLPROPERTIES(
'kudu.table_name' = 'kudu_test_people',
'kudu.master_addresses' = 'cdpcluster-2.futuremove.cn:7051'
) ;
## output ...
Query: CREATE EXTERNAL TABLE `kudu_test_people` STORED AS KUDU
TBLPROPERTIES(
'kudu.table_name' = 'kudu_test_people',
'kudu.master_addresses' = 'cdpcluster-2.futuremove.cn:7051'
)
+-------------------------+
| summary |
+-------------------------+
| Table has been created. |
+-------------------------+
Fetched 1 row(s) in 0.05s
使用impala对kudu进行DML操作
Impala 允许使用标准 SQL 语句将数据插入 Kudu 。
CREATE TABLE my_first_table (
id BIGINT,
name STRING,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'cdpcluster-2.futuremove.cn:7051'
) ;
-- 插入单行记录
INSERT INTO my_first_table VALUES (50, "zhangsan");
-- 查看数据
select * from my_first_table ;
-- 使用单个语句插入三行
INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim");
-- 批量插入Batch Insert
-- 从 Impala 和 Kudu 的角度来看,
-- 通常表现最好的方法通常是使用 Impala 中的 SELECT FROM 语句导入数据
INSERT INTO my_first_table
SELECT * FROM temp1;
-- 更新数据
UPDATE my_first_table SET name="程超" WHERE id =1 ;
-- 删除数据
DELETE FROM my_first_table WHERE id =2;
更改表属性
开发人员可以通过更改表的属性来更改 Impala 与给定 Kudu 表相关的元数据。这些属性包括
- 表名
- Kudu 主地址列表
- 表是否由 Impala (内部)或外部管理。
-- Rename an Impala Mapping Table ( 重命名 Impala 映射表 )
ALTER TABLE my_first_table RENAME TO my_table;
-- Rename the underlying Kudu table for an internal table ( 重新命名内部表的基础 Kudu 表 )
-- 如果表是内部表,则可以通过更改 kudu.table_name 属性重命名底层的 Kudu 表
ALTER TABLE kudu_student SET TBLPROPERTIES('kudu.table_name' = 'new_student');
-- Remapping an external table to a different Kudu table ( 将外部表重新映射到不同的 Kudu 表 )
-- 如果用户在使用过程中发现其他应用程序重新命名了 kudu 表,那么此时的外部表需要重新映射到 kudu上
-- 创建一个外部表:
CREATE EXTERNAL TABLE external_table
STORED AS KUDU
TBLPROPERTIES (
'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051',
'kudu.table_name' = 'person'
);
-- 重新映射外部表,指向不同的kudu表:
ALTER TABLE external_table
SET TBLPROPERTIES('kudu.table_name' = 'hashTable')
-- 上面的操作是:将 external_table 映射的 PERSON 表重新指向 hashTable 表
-- Change the Kudu Master Address ( 更改 Kudu Master 地址 )
ALTER TABLE my_table
SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-new-master.example.com:7051');
-- Change an Internally-Managed Table to External ( 将内部管理的表更改为外部 )
ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE');
Impala 使用 Java 语言操作 Kudu
对于 Impala 而言,开发人员是可以通过 JDBC 连接 Impala 的,有了 JDBC,开发人员可以通过 Impala 来间接操作 kudu;
将 Kudu 映射为 Impala 的表:
CREATE EXTERNAL TABLE `vin_test_2` STORED AS KUDU
TBLPROPERTIES(
'kudu.table_name' = 'vin_test_2',
'kudu.master_addresses' = 'cdpcluster-2.futuremove.cn:7051')
maven 的依赖
<!--spring 对 jdbc 支持(默认 HiKariCP 连接池) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.cloudera</groupId>
<artifactId>ImpalaJDBC41</artifactId>
<version>2.6.12.1013</version>
</dependency>
application.yml
server:
servlet:
context-path: /test-kudu1
port: 10087
spring:
config:
activate:
on-profile: dev_huafu
datasource:
username:
password:
url: 'jdbc:impala://cdpcluster-2.futuremove.cn:21050/default;auth=noSasl'
driver-class-name: 'com.cloudera.impala.jdbc41.Driver'
JdbcTemplateConfig
@Configuration
public class JdbcTemplateConfig {
@Autowired
private DataSource dataSource;
@Bean
public JdbcTemplate jdbcTemplate() {
return new JdbcTemplate(dataSource);
}
}
DAO Class
@Component
public class KuduDemo {
private static final Logger logger = LoggerFactory.getLogger(KuduDemo.class);
private static final String JDBC_DRIVER = "com.cloudera.impala.jdbc41.Driver";
private static final String CONNECTION_URL = "jdbc:impala://cdpcluster-2.futuremove.cn:21050/default;auth=noSasl";
@Autowired
private DataSource dataSource;
@Autowired
private JdbcTemplate jdbcTemplate;
public void testUseConnection(Consumer<Connection> cncs,
Consumer<Exception> excs) {
try {
Class.forName(JDBC_DRIVER);
try (Connection conn = DriverManager.getConnection(CONNECTION_URL)) {
logger.info("conn is closed ? -> {}", conn.isClosed());
cncs.accept(conn);
} catch (Exception e2) {
excs.accept(e2);
}
} catch (Exception e1) {
excs.accept(e1);
}
}
public long countByVin(String vin) {
String sql = "select count(*) from vin_test_2 where vin = ? ";
try {
List<Long> res = jdbcTemplate.queryForList(sql, Long.class, vin);
if (!res.isEmpty()) {
return res.get(0);
}
} catch (DataAccessException dae) {
dae.printStackTrace();
}
return 0L;
}
public long maxByMillage(String vin) {
String sql = "select max(mileage) from vin_test_2 where vin = ? ";
try {
List<Long> res = jdbcTemplate.queryForList(sql, Long.class, vin);
if ( !res.isEmpty()) {
return res.get(0);
}
} catch (DataAccessException dae) {
dae.printStackTrace();
}
return 0L;
}
}
Kudu 原理
表与schema
Kudu 设计是面向结构化存储的,因此,Kudu 的表需要用户在建表时定义它的 Schema 信息,这些 Schema 信息包含:列定义(含类型),Primary Key 定义(用户指定的若干个列的有序组合)。数据的唯一性,依赖于用户所提供的 Primary Key 中的 Column 组合的值的唯一性。
Kudu 提供了 Alter 命令来增删列,但位于 Primary Key 中的列是不允许删除的。 Kudu 当前并不支持二级索引。
从用户角度来看,Kudu 是一种存储结构化数据表的存储系统。在一个 Kudu 集群中可以定义任意数量的 table,每个 table 都需要预先定义好 schema。每个 table 的列数是确定的,每一列都需要有名字和类型,每个表中可以把其中一列或多列定义为主键。这么看来,Kudu 更像关系型数据库,而不是像 HBase、Cassandra和 MongoDB 这些 NoSQL 数据库。不过 Kudu 目前还不能像关系型数据一样支持二级索引。
Kudu 使用确定的列类型,而不是类似于 NoSQL 的 “everything is byte”。这可以带来两点好处:
- 确定的列类型使 Kudu 可以进行类型特有的编码。
- 可以提供 SQL-like 元数据给其他上层查询工具,比如BI工具。
kudu的底层数据模型
Kudu 的底层数据文件的存储,未采用 HDFS 这样的较高抽象层次的分布式文件系统,而是自行开发了一套可基于 Table/Tablet/Replica 视图级别的底层存储系统。
这套实现基于如下的几个设计目标:
- 可提供快速的列式查询
- 可支持快速的随机更新
- 可提供更为稳定的查询性能保障
一张表会分成若干个 tablet,每个 tablet 包括 MetaData 元信息及若干个 RowSet,RowSet 包含一个 MemRowSet 及若干个 DiskRowSet,DiskRowSet 中包含一个 BloomFile、Ad_hoc Index、BaseData、DeltaMem 及若干 RedoFile 和U ndoFile(UndoFile一般情况下只有一个)。
- MemRowSet: 用于新数据 insert 及已在 MemRowSet 中的数据的更新,一个 MemRowSet 写满后会将数据刷到磁盘形成若干个 DiskRowSet。每次到达 32M 生成一个 DiskRowSet。
- DiskRowSet: 用于老数据的变更(mutation),后台定期对 DiskRowSet 做 compaction,以删除没用的数据及合并历史数据,减少查询过程中的 IO 开销。
- BloomFile: 根据一个 DiskRowSet 中的 key 生成一个 bloom filter,用于快速模糊定位某个 key 是否在 DiskRowSet 中存在。
- Ad_hocIndex: 是主键的索引,用于定位 key 在 DiskRowSet 中的具体哪个偏移位置。
- BaseData: 是 MemRowSet flush 下来的数据,按列存储,按主键有序。
- UndoFile: 是基于 BaseData 之前时间的历史数据,通过在 BaseData上apply UndoFile中 的记录,可以获得历史数据。
- RedoFile: 是基于 BaseData 之后时间的变更(mutation)记录,通过在 BaseData 上 apply RedoFile 中的记录,可获得较新的数据。
- DeltaMem: 用于 DiskRowSet 中数据的变更 mutation,先写到内存中,写满后 flush 到磁盘形成 RedoFile。
- MemRowSets: 可以对比理解成 HBase 中的 MemStore, 而 DiskRowSets 可理解成 HBase 中的 HFile。MemRowSets 中的数据按照行试图进行存储,数据结构为 B-Tree。
- MemRowSets 中的数据被 Flush 到磁盘之后,形成 DiskRowSets。
- DisRowSets 中的数据,按照 32MB 大小为单位,按序划分为一个个的 DiskRowSet。 DiskRowSet中 的数据按照 Column 进行组织,与 Parquet 类似。
这是 Kudu 可支持一些分析性查询的基础。每一个 Column 的数据被存储在一个相邻的数据区域,而这个数据区域进一步被细分成一个个的小的 Page 单元,与 HBase File 中的 Block 类似,对每一个 Column Page 可采用一些 Encoding 算法,以及一些通用的 Compression 算法。 既然可对 Column Page 可采用 Encoding 以及 Compression 算法,那么,对单条记录的更改就会比较困难了。
前面提到了Kudu可支持单条记录级别的更新/删除,是如何做到的?
与 HBase 类似,也是通过增加一条新的记录来描述这次更新/删除操作的。DiskRowSet 是不可修改了,那么 KUDU 要如何应对数据的更新呢?在KUDU中,把 DiskRowSet 分为了两部分:base data、delta stores。base data 负责存储基础数据,delta stores 负责存储 base data 中的变更数据.
如上图所示,数据从 MemRowSet 刷到磁盘后就形成了一份 DiskRowSet(只包含 base data),每份 DiskRowSet 在内存中都会有一个对应的 DeltaMemStore,负责记录此 DiskRowSet 后续的数据变更(更新、删除)。DeltaMemStore 内部维护一个 B-树索引,映射到每个 row_offset 对应的数据变更。DeltaMemStore 数据增长到一定程度后转化成二进制文件存储到磁盘,形成一个 DeltaFile,随着 base data 对应数据的不断变更,DeltaFile 逐渐增长。
Tablet的发现过程
当创建 Kudu 客户端时,其会从主服务器上获取 tablet 位置信息,然后直接与服务于该 tablet 的服务器进行交谈。
为了优化读取和写入路径,客户端将保留该信息的本地缓存,以防止他们在每个请求时需要查询主机的 tablet 位置信息。
随着时间的推移,客户端的缓存可能会变得过时,并且当写入被发送到不再是 tablet 领导者的 tablet 服务器时,则将被拒绝。然后客户端将通过查询主服务器发现新领导者的位置来更新其缓存
kudu的写流程
如上图,当 Client 请求写数据时,先根据主键从 Master Server 中获取要访问的目标 Tablets,然后到依次对应的 Table 获取数据。
因为 KUDU 表存在主键约束,所以需要进行主键是否已经存在的判断,这里就涉及到之前说的索引结构对读写的优化了。一个 Tablet 中存在很多个 RowSets,为了提升性能,我们要尽可能地减少要扫描的RowSets数量。
首先,我们先通过每个 RowSet 中记录的主键的(最大最小)范围,过滤掉一批不存在目标主键的 RowSets,然后在根据RowSet中的布隆过滤器,过滤掉确定不存在目标主键的 RowSets,最后再通过 RowSets 中的 B-树索引,精确定位目标主键是否存在。
如果主键已经存在,则报错(主键重复),否则就进行写数据(写 MemRowSet)。
kudu的读流程
如上图,数据读取过程大致如下:先根据要扫描数据的主键范围,定位到目标的 Tablets,然后读取 Tablets 中的 RowSets。
在读取每个 RowSet 时,先根据主键过滤要 scan 范围,然后加载范围内的 base data,再找到对应的 delta stores,应用所有变更,最后 union 上 MemRowSet 中的内容,返回数据给 Client。
kudu的更新流程
数据更新的核心是定位到待更新数据的位置,这块与写入的时候类似,就不展开了,等定位到具体位置后,然后将变更写到对应的 delta store 中。
参考(照抄然后验证)
If you like TeXt, don’t forget to give me a star.