HBase - 数据批量导入教程1(使用MapReduce)
作者:hangge | 2024-11-22 08:38
要实现数据批量导入到 HBase 通常有两种方法:利用 MapReduce 和利用 Bulkload。本文首先介绍前者,即利用 MapReduce 中封装好的方法。在 map 阶段,把数据封装成 Put 操作,直接将数据入库。
(2)然后将该文件传到 HDFS 中:
一、使用 MapReduce 实现数据批量导入 HBase
1,准备工作
(1)首先我们要准备输入的数据文件 import.dat,其内容如下:
注意:字段分隔符为制表符,建议在服务器上使用 vi 命令创建编辑。
a c1 name hangge a c1 age 88 b c1 name xiaoliu b c1 age 19 c c1 name lili c c1 age 33
(2)然后将该文件传到 HDFS 中:
hdfs dfs -put import.dat /
(3)接着我们在 HBase 中创建需要的目标表 batch1:
(2)并且 pom.xml 文件中还需要添加 Maven 的编译打包插件配置:
(2)执行成功之后,查询 HBase 中 batch1 表中的结果,可以看到数据已经导入成功。
create 'batch1','c1'
2,项目配置
(1)项目的 pom.xml 文件中需要添加 hadoop-client、hbase-client 和 hbase-mapreduce 的依赖:
注意:hbase-client 和 hbase-mapreduce 不能设置 provided,这两个依赖需要打进 jar 包里面,否则会提示找不到对应的类。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.3.0</version>
</dependency>
(2)并且 pom.xml 文件中还需要添加 Maven 的编译打包插件配置:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3,编写代码
我们编写一个 MapReduce 任务 BatchImportMR,其代码如下,在 map 阶段,把数据封装成 Put 操作,直接将数据入库。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
* 批量导入:
* 利用 MapReduce 中封装好的方法。
* 在 map 阶段,把数据封装成 Put 操作,直接将数据入库
*
* 注意:需要提前创建表 batch1
* create 'batch1','c1'
*/
public class BatchImportMR {
public static class BatchImportMapper extends Mapper<LongWritable, Text, NullWritable, Put>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] strs = value.toString().split("\t");
if(strs.length==4){
String rowkey = strs[0];
String columnFamily = strs[1];
String name = strs[2];
String val = strs[3];
Put put = new Put(rowkey.getBytes());
put.addColumn(columnFamily.getBytes(),name.getBytes(),val.getBytes());
context.write(NullWritable.get(),put);
}
}
}
public static void main(String[] args) throws Exception{
if(args.length!=2){
//如果传递的参数不够,程序直接退出
System.exit(100);
}
String inPath = args[0];
String outTableName = args[1];
//设置属性对应参数
Configuration conf = new Configuration();
conf.set("hbase.table.name",outTableName);
conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
//封装 Job
Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName);
job.setJarByClass(BatchImportMR.class);
//指定输入路径
FileInputFormat.setInputPaths(job,new Path(inPath));
//指定 map 相关的代码
job.setMapperClass(BatchImportMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Put.class);
TableMapReduceUtil.initTableReducerJob(outTableName,null,job);
TableMapReduceUtil.addDependencyJars(job);
//禁用 Reduce
job.setNumReduceTasks(0);
job.waitForCompletion(true);
}
}
4,对 MapReduce 任务打 Jar 包
(1)代码编写完毕后,执行打 Jar 包的操作:
(2)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

5,向集群提交 MapReduce 任务
(1)我们将前面生成的 Jar 包上传至 Hadoop 集群的任意一台机器上,或者 Hadoop 客户端机器上,并且执行如下命令向集群提交任务:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar BatchImportMR hdfs://node1:9000/import.dat batch1
(2)执行成功之后,查询 HBase 中 batch1 表中的结果,可以看到数据已经导入成功。
scan 'batch1'

全部评论(0)