Spark - SparkSQL使用详解6(HBase的读取与写入)
作者:hangge | 2023-11-29 09:06
我在之前的文章中介绍了如何读取 HBase 表中数据转换为 RDD,以及如何将 RDD 中的数据保存到 HBase 数据库中(点击查看)。本文接着介绍如何通过 SparkSQL 来实现读写 HBase 中的数据。
(2)使用 create 命令创建一个新表(这里表名称为 people,列簇名为 cf)
(3)接着执行如下命令插入 3 条数据(每条数据均有三列):
(4)使用 scan 'people' 查看表数据,确保添加成功:
(2)然后执行如下命令进行编译:
(3)编译完毕后,我们需要的两个 jar 包会保存到项目目录的 spark/hbase-spark/target/ 和 spark/hbase-spark-protocol-shaded/target/ 文件夹下:



(2)除了上面那种读取 HBase 数据的方式外,我们还有另一种方式:针对 people 表创建一个目录(catalog),然后将其用于读取数据:


(2)但当我们开启后运行程序有会发现一旦遇到过滤操作时,会发现程序报如下错误。这是由于 HBase 服务器缺少相关的 jar 包。
(4)然后重启 HBase 服务,再次执行任务应用可以发现就能成功过滤并查询到数据,不会报错了。
六、HBase 的读取与写入
1,准备测试数据
(1)首先我们启动 HBase 的 shell 命令行工具:
./bin/hbase shell
(2)使用 create 命令创建一个新表(这里表名称为 people,列簇名为 cf)
create 'people', 'cf'
(3)接着执行如下命令插入 3 条数据(每条数据均有三列):
put 'people', 1, 'cf:name', 'hangge' put 'people', 1, 'cf:gender', 'M' put 'people', 1, 'cf:age', 100 put 'people', 2, 'cf:name', 'lili' put 'people', 2, 'cf:gender', 'F' put 'people', 2, 'cf:age', 27 put 'people', 3, 'cf:name', 'liuyun' put 'people', 3, 'cf:gender', 'M' put 'people', 3, 'cf:age', 35
(4)使用 scan 'people' 查看表数据,确保添加成功:

2,准备 HBase 连接器
(1)由于我使用的 Spark 版本时 3.X 的,而 Maven 上面的 HBase 连接器(Apache Spark HBase connector)只支持 Spark 2.X 版本,因此我们需要下载源码自行编译。首先执行如下命令将 Apache HBase Connectors 项目检出到本地:
git clone https://github.com/apache/hbase-connectors.git
(2)然后执行如下命令进行编译:
注意:mvn 命令中的 spark、scala、hbase、hadoop 版本参数根据实际情况进行修改,必须与实际使用的一致。
mvn -Dspark.version=3.3.1 -Dscala.version=2.12.15 -Dscala.binary.version=2.12 -Dhbase.version=2.4.15 -Dhadoop-three.version=3.3.2 -DskipTests clean package

(4)打开我们的 Spark 应用项目,在 src 同级的目录下新建一个 lib 目录,然后将这个 jar 包放在 lib 目录下:

3,添加依赖
编辑我们 Spark 应用项目的 pom.xml 文件,添加 Spark、HBase、Hadoop、Scala、HBase Spark 连接器依赖。下面是完整的配置代码:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- 项目元数据 -->
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>MySpark</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 配置属性 -->
<properties>
<!-- 指定编译器版本 -->
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<!-- Spark 版本 -->
<spark.version>3.3.1</spark.version>
<!-- Scala 版本 -->
<scala.version>2.12</scala.version>
<!-- HBase 版本 -->
<hbase.version>2.5.5</hbase.version>
</properties>
<!-- 仓库配置 -->
<repositories>
<!-- 配置 Spring 仓库 -->
<repository>
<id>spring</id>
<url>https://maven.aliyun.com/repository/spring</url>
</repository>
</repositories>
<!-- 项目依赖 -->
<dependencies>
<!-- Spark 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- HBase 依赖 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- Hadoop 依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.2</version>
</dependency>
<!-- Scala 依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.15</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parser-combinators_2.12</artifactId>
<version>1.0.4</version>
</dependency>
<!-- HBase Spark 连接器依赖 -->
<dependency>
<groupId>org.apache.hbase.connectors.spark</groupId>
<artifactId>hbase-spark</artifactId>
<version>1.0.1</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/hbase-spark-1.0.1-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.hbase.connectors.spark</groupId>
<artifactId>hbase-spark-protocol-shaded</artifactId>
<version>1.0.1</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT.jar</systemPath>
</dependency>
</dependencies>
<!-- 构建配置 -->
<build>
<plugins>
<!-- Spring Boot Maven 插件配置 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- 包括系统范围的依赖 -->
<includeSystemScope>true</includeSystemScope>
</configuration>
</plugin>
</plugins>
</build>
</project>
4,读取 HBase 数据
(1)下面代码我们使用 Spark 读取 HBase 表中的数据,并将其转换为 DataFrame,然后就可以通过 Spark SQL 语法或者 DSL 语法对数据进行分析统计了。
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf}
import org.apache.hadoop.hbase.{HBaseConfiguration}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建 HBase 配置对象
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181")
// 创建 HBaseContext 对象,用于在 Spark 中操作 HBase
new HBaseContext(spark.sparkContext, conf)
// 指定要操作的 HBase 表名
val hbase_table = "people"
// 定义 HBase 列映射关系
val hbase_column_mapping = "id STRING :key, " +
"name STRING cf:name, " +
"gender STRING cf:gender, " +
"age STRING cf:age"
// 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame
val hbaseDF = spark.read
.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", hbase_column_mapping)
.option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化
.option("hbase.table", hbase_table)
.load()
// 打印 DataFrame 的模式信息
hbaseDF.printSchema()
// 显示DataFrame内容
hbaseDF.show()
// 使用DSL语法:统计男性和女性的人数
val genderCountsDF = hbaseDF.groupBy("gender").count()
println("性别统计:")
genderCountsDF.show()
// 使用SQL语法:查询年龄大于30的人数
hbaseDF.createOrReplaceTempView("people") // 首先要注册Dataframe为临时视图
val result2 = spark.sql("SELECT COUNT(*) FROM people WHERE age > 30")
println("年龄大于30岁的人员:")
result2.show()
//关闭 Spark
spark.stop()
}
}

import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建 HBase 配置对象
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181")
// 创建 HBaseContext 对象,用于在 Spark 中操作 HBase
val hbaseContext = new HBaseContext(spark.sparkContext, conf)
// 定义目录
val catalog = s"""{
|"table":{"namespace":"default", "name":"people"},
|"rowkey":"key",
|"columns":{
|"id":{"col":"key", "type":"string"},
|"name":{"cf":"cf", "col":"name", "type":"string"},
|"gender":{"cf":"cf", "col":"gender", "type":"string"},
|"age":{"cf":"cf", "col":"age", "type":"string"}
|}
|}""".stripMargin
// 使目录读取 HBase 表中的数据,转换为 DataFrame
val hbaseDF = spark.read
.options(Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.hadoop.hbase.spark")
.option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化
.load()
// 打印 DataFrame 的模式信息
hbaseDF.printSchema()
// 显示DataFrame内容
hbaseDF.show()
// 使用DSL语法:统计男性和女性的人数
val genderCountsDF = hbaseDF.groupBy("gender").count()
println("性别统计:")
genderCountsDF.show()
// 使用SQL语法:查询年龄大于30的人数
hbaseDF.createOrReplaceTempView("people") // 首先要注册Dataframe为临时视图
val result2 = spark.sql("SELECT COUNT(*) FROM people WHERE age > 30")
println("年龄大于30岁的人员:")
result2.show()
//关闭 Spark
spark.stop()
}
}

5,保存数据到 HBase
(1)下面代码我们首先读取 people 表的数据,并将性别值改成中文后保存更新到库中。然后统计出各性别的人数后,保存到 gender_count 表中。
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.sql.functions.when
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建 HBase 配置对象
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181")
// 创建 HBaseContext 对象,用于在 Spark 中操作 HBase
new HBaseContext(spark.sparkContext, conf)
// 指定要操作的 HBase 表名
val people_table = "people"
// 定义 HBase 列映射关系
val people_column_mapping = "id STRING :key, " +
"name STRING cf:name, " +
"gender STRING cf:gender, " +
"age STRING cf:age"
// 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame
val hbaseDF = spark.read
.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", people_column_mapping)
.option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化
.option("hbase.table", people_table)
.load()
// 将性别值改成中文并保存到库中
val hbaseDFWithChineseGender = hbaseDF.withColumn("gender",
when($"gender" === "M", "男性").otherwise("女性")
)
hbaseDFWithChineseGender.write
.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", people_column_mapping)
.option("hbase.table", people_table)
.save()
// 定义性别统计表的列映射关系
val gender_count_column_mapping = "gender STRING :key, " +
"count INT cf:count"
// 统计男性和女性的人数
val genderCountsDF = hbaseDFWithChineseGender.groupBy("gender").count()
// 将性别统计结果存储到另一张表中,使用HBase列映射关系
genderCountsDF.write
.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", gender_count_column_mapping)
.option("hbase.table", "gender_count")
.save()
//关闭 Spark
spark.stop()
}
}
(2)同样的,保存数据到 HBase 中除了上面那种方式外,还有另一种方式:针对 people、gender_count 表分别创建相应的目录(catalog),然后将其用于保存数据:

import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.sql.functions.when
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建 HBase 配置对象
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "192.168.60.9:2181")
// 创建 HBaseContext 对象,用于在 Spark 中操作 HBase
new HBaseContext(spark.sparkContext, conf)
// 定义people表的目录
val people_catalog = s"""{
|"table":{"namespace":"default", "name":"people"},
|"rowkey":"key",
|"columns":{
|"id":{"col":"key", "type":"string"},
|"name":{"cf":"cf", "col":"name", "type":"string"},
|"gender":{"cf":"cf", "col":"gender", "type":"string"},
|"age":{"cf":"cf", "col":"age", "type":"string"}
|}
|}""".stripMargin
// 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame
val hbaseDF = spark.read
.options(Map(HBaseTableCatalog.tableCatalog -> people_catalog))
.format("org.apache.hadoop.hbase.spark")
.option("hbase.spark.pushdown.columnfilter", false) // 禁用列过滤下推优化
.load()
// 将性别值改成中文并保存到库中
val hbaseDFWithChineseGender = hbaseDF.withColumn("gender",
when($"gender" === "M", "男性").otherwise("女性")
)
hbaseDFWithChineseGender.write
.options(Map(HBaseTableCatalog.tableCatalog -> people_catalog))
.format("org.apache.hadoop.hbase.spark")
.save()
// 定义gender_count表的目录
val gender_count_catalog = s"""{
|"table":{"namespace":"default", "name":"gender_count"},
|"rowkey":"key",
|"columns":{
|"gender":{"col":"key", "type":"string"},
|"count":{"cf":"cf", "col":"count", "type":"int"}
|}
|}""".stripMargin
// 统计男性和女性的人数
val genderCountsDF = hbaseDFWithChineseGender.groupBy("gender").count()
genderCountsDF.show()
// 将性别统计结果存储到另一张表中,使用HBase列映射关系
genderCountsDF.write
.options(Map(HBaseTableCatalog.tableCatalog -> gender_count_catalog))
.format("org.apache.hadoop.hbase.spark")
.save()
//关闭 Spark
spark.stop()
}
}

附:启用过滤器下推优化
1,什么是下推优化?
Hbase 提供了种类丰富的过滤器(filter)来提高数据处理的效率,用户可以通过内置或自定义的过滤器来对数据进行过滤,所有的过滤器都在服务端生效,这样可以保证过滤掉的数据不会被传送到客户端,从而减轻网络传输和客户端处理的压力。

2,如何开启下推优化
(1)在上面读取 HBase 数据的实例中,我们是禁用下推优化的,也就是说 HBase 数据其实是全部传输到客户端,然后再在客户端上进行过滤。如果需要启用下推优化,只需要将 hbase.spark.pushdown.columnfilter 设置为 true,或者直接取消这个设置(默认即为 true)
// 使用 Spark 读取 HBase 表中的数据,根据列映射关系转换为 DataFrame
val hbaseDF = spark.read
.format("org.apache.hadoop.hbase.spark")
.option("hbase.columns.mapping", hbase_column_mapping)
.option("hbase.spark.pushdown.columnfilter", true) // 开启过滤下推优化
.option("hbase.table", hbase_table)
.load()
(2)但当我们开启后运行程序有会发现一旦遇到过滤操作时,会发现程序报如下错误。这是由于 HBase 服务器缺少相关的 jar 包。
org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1612)
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:1157)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.newRegionScanner(RSRpcServices.java:3039)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:3369)
at org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:42278)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:413)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:133)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:338)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:318)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.spark.SparkSQLPushDownFilter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at org.apache.hadoop.hbase.util.DynamicClassLoader.tryRefreshClass(DynamicClassLoader.java:188)
at org.apache.hadoop.hbase.util.DynamicClassLoader.loadClass(DynamicClassLoader.java:151)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1603)
... 8 more
(3)要解决这个问题,我们只需要将前面编译的 HBase 连接器 jar 包(hbase-spark-xxx.jar、hbase-spark-protocol-shaded-xxx.jar)以及 scala-library-xxx.jar 包,复制到 HBase 安装目录下的 lib 文件夹中。
注意:scala-library 的 jar 包可以从本地 Maven 缓存中提取,比如在我的系统中就是 /Users/hangge/.m2/repository/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar
(4)然后重启 HBase 服务,再次执行任务应用可以发现就能成功过滤并查询到数据,不会报错了。
全部评论(0)