Spark - RDD使用详解10(HBase的读取与写入)
作者:hangge | 2023-11-07 08:23
十一、HBase 的读取与写入
1,准备工作
(1)首先我们需要在项目的 pom.xml 文件中添加 HBase 相关依赖项。
(2)接着我们编写一段代码测试对 HBase 的数据读写操作(与 Spark 无关):
<!-- hbase-client 依赖-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.5.5</version>
</dependency>
<!-- HBase MapReduce 模块的依赖-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.5.5</version>
</dependency>
(2)接着我们编写一段代码测试对 HBase 的数据读写操作(与 Spark 无关):
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Get, Admin,
TableDescriptorBuilder, ColumnFamilyDescriptorBuilder}
import org.apache.hadoop.hbase.util.Bytes
object Hello {
def main(args: Array[String]): Unit = {
// 配置 HBase 连接信息
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "192.168.60.9")
conf.set("hbase.zookeeper.property.clientPort", "2181")
// 建立 HBase 连接
val connection: Connection = ConnectionFactory.createConnection(conf)
val admin: Admin = connection.getAdmin
val tableName = TableName.valueOf("my_table")
val table = connection.getTable(tableName)
try {
// 自动创建表(如果不存在)
if (!admin.tableExists(tableName)) {
val tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).build())
.build()
admin.createTable(tableDescriptor)
println("成功创建表 my_table。")
}
// 写入数据到 HBase
val put = new Put(Bytes.toBytes("row1"))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes("hangge.com"))
table.put(put)
println("成功写入数据到 HBase。")
// 从 HBase 读取数据
val get = new Get(Bytes.toBytes("row1"))
val result = table.get(get)
val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1")))
println(s"从 HBase 读取数据: $value")
} finally {
// 关闭连接
table.close()
connection.close()
}
}
}
(3)程序运行后控制台输出的内容如下,说明 HBase 可以正常读写了:

2,将 RDD 的数据写入到 HBase 表中
(1)下面代码我们从一个包含记录的 RDD 构建 Put 对象,然后使用 saveAsNewAPIHadoopDataset 这个 Hadoop API 方法将这些 Put 对象写入到 HBase 表中。
import org.apache.hadoop.hbase.{HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)
// 配置 HBase 连接信息
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "192.168.60.9")
conf.set("hbase.zookeeper.property.clientPort", "2181")
// 生成作业
val jobConf = new JobConf(conf, this.getClass)
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "my_table")
val job = Job.getInstance(jobConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
// 构建记录
val dataRDD: RDD[String] = sc.makeRDD(Array("1,hangge,M,100", "2,lili,M,27", "3,liuyun,F,35"))
// 数据进行转换
val rdd: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map(_.split(',')).map {
arr: Array[String] => {
//行健的值
val put = new Put(Bytes.toBytes(arr(0)))
//name列的值
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
//gender列的值
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))
//age列的值
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt))
(new ImmutableBytesWritable, put)
}
}
// 使用 saveAsNewAPIHadoopDataset 方法将RDD数据写入到 HBase 表
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
println("成功将RDD数据写入到 HBase。")
//关闭 Spark
sc.stop()
}
}
(2)代码执行完毕后,可以看到数据以及成功添加到 HBase 数据库中:

3,读取 HBase 表中数据转换为RDD
(1)下面代码将前面存放在 HBase 表中的数据读取出来转换为 RDD,并进行转换,统计出各性别的人数。
注意第 12 行代码:自从 Spark 3.2 版本以来,默认情况下 spark.hadoopRDD.ignoreEmptySplits 被设置为 true,这意味着 Spark 将不会为空的输入分片创建空分区。如果想要恢复到 Spark 3.2 之前的行为,可以将 spark.hadoopRDD.ignoreEmptySplits 设置为 false。否则会发现 newAPIHadoopRDD 方法得到的 RDD 里面不包含任何数据(程序也不会报错)
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
.set("spark.hadoopRDD.ignoreEmptySplits", "false")
// 创建 Spark 上下文环境对象(连接对象)
val sc: SparkContext = new SparkContext(sparkConf)
// 配置 HBase 连接信息
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "192.168.60.9")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set(TableInputFormat.INPUT_TABLE, "my_table")
// 读取 HBase 数据并将其转换为 RDD
val hbaseRDD = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
// 对 RDD 进行转换和统计
val genderCountRDD = hbaseRDD.map {
case (_, result) =>
val gender = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("gender")))
(gender, 1)
}.reduceByKey(_ + _)
// 打印结果
genderCountRDD.collect().foreach {
case (gender, count) => println(s"性别: $gender, 数量: $count")
}
//关闭 Spark
sc.stop()
}
}
(2)代码执行后控制台输出如下内容:
全部评论(0)