返回 导航

大数据

hangge.com

HBase - 数据批量导入教程2(使用BulkLoad)

作者:hangge | 2024-11-23 08:45
    我在之前的文章中介绍了如何使用 MapReduce 实现数据批量导入到 HBase点击查看),本文接着介绍另一种方法:利用 Bulkload。该方法首先使用 MapReduce 直接生成 HFile 文件,然后再通过 BulkloadHFile 文件直接加载到表中。

二、使用 BulkLoad 实现数据批量导入 HBase

1,准备工作

(1)首先我们要准备输入的数据文件 import.dat,其内容如下:
注意:字段分隔符为制表符,建议在服务器上使用 vi 命令创建编辑。
a       c1      name    hangge
a       c1      age     88
b       c1      name    xiaoliu
b       c1      age     19
c       c1      name    lili
c       c1      age     33

(2)然后将该文件传到 HDFS 中:
 hdfs dfs -put import.dat /

(3)接着我们在 HBase 中创建需要的目标表 batch1
create 'batch2','c1'

2,生成 HFile 文件

(1)首先我们需要创建一个 MapReduce 任务来生成 HFile 文件,该项目的 pom.xml 文件中需要添加 hadoop-clienthbase-clienthbase-mapreduce 的依赖:
注意hbase-clienthbase-mapreduce 不能设置 provided,这两个依赖需要打进 jar 包里面,否则会提示找不到对应的类。
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-mapreduce</artifactId>
    <version>2.3.0</version>
</dependency>

(2)并且 pom.xml 文件中还需要添加 Maven 的编译打包插件配置:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(3)接着我们编写一个 MapReduce 任务 BatchImportBulkLoad,其代码如下,map 阶段,把数据封装成 put 操作,将数据生成 HBase 的底层存储文件 HFile
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 利用 BulkLoad 批量导入
 * 在 map 阶段,把数据封装成 put 操作,将数据生成 HBase 的底层存储文件 HFile
 */
public class BatchImportBulkLoad {
    public static class BulkLoadMapper extends Mapper<LongWritable, Text, 
            ImmutableBytesWritable, Put>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if(strs.length==4){
                String rowkey = strs[0];
                String columnFamily = strs[1];
                String name = strs[2];
                String val = strs[3];
                ImmutableBytesWritable rowkeyWritable 
                        = new ImmutableBytesWritable(rowkey.getBytes());
                Put put = new Put(rowkey.getBytes());
                put.addColumn(columnFamily.getBytes(),name.getBytes(),val.getBytes());
                context.write(rowkeyWritable,put);
            }
        }
    }

    public static void main(String[] args) throws Exception{
        if(args.length!=3){
            //如果传递的参数不够,程序直接退出
            System.exit(100);
        }

        String inPath = args[0];
        String outPath = args[1];
        String outTableName = args[2];

        //设置属性对应参数
        Configuration conf = new Configuration();
        conf.set("hbase.table.name",outTableName);
        conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");

        //封装 Job
        Job job = Job.getInstance(conf, "Batch Import HBase Table:" + outTableName);
        job.setJarByClass(BatchImportBulkLoad.class);

        //指定输入路径
        FileInputFormat.setInputPaths(job,new Path(inPath));

        //指定输出路径[如果输出路径存在,就将其删除]
        FileSystem fs = FileSystem.get(conf);
        Path output = new Path(outPath);
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job, output);

        //指定 map 相关的代码
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //禁用 Reduce
        job.setNumReduceTasks(0);

        Connection connection = ConnectionFactory.createConnection(conf);
        TableName tableName = TableName.valueOf(outTableName);
        HFileOutputFormat2.configureIncrementalLoad(job,connection.getTable(tableName),
                connection.getRegionLocator(tableName));

        job.waitForCompletion(true);
    }
}

(4)代码编写完毕后,执行打 Jar 包的操作:

(5)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

(6)我们将前面生成的 Jar 包上传至 Hadoop 集群的任意一台机器上,或者 Hadoop 客户端机器上,并且执行如下命令向集群提交任务:
hadoop jar hadoop-0.0.1-SNAPSHOT-jar-with-dependencies.jar BatchImportBulkLoad hdfs://node1:9000/import.dat hdfs://node1:9000/hbase_out batch2

(7)执行成功之后,可以看到 HDFS 上的 HFile 文件已经成功生成:

3,加载 HFile 文件

(1)在 HBase 客户端节点上执行下面命令,把 HFile 数据转移到表对应的 region 中。
hbase org.apache.hadoop.hbase.tool.BulkLoadHFilesTool hdfs://node1:9000/hbase_out batch2

(2)执行成功之后,查询 HBasebatch2 表中的结果,可以看到数据已经导入成功。
scan 'batch2'
评论

全部评论(0)

回到顶部