返回 导航

大数据

hangge.com

Elasticsearch、HBase - 仿百度搜索引擎项目案例实操2(案例代码、实现步骤)

作者:hangge | 2025-05-26 09:41
    我在之前的文章中介绍了整个仿百度搜索引擎的功能和架构(点击查看),本文开始这个搜索引擎项目的数据存储、建立索引、以及页面检索这三块功能的开发。大致的开发步骤如下:
  • 调用接口获取数据导入 HBaseRedis(存储 Rowkey)。
  • 通过 ESHBase 中的数据建立索引。
  • 对接 Web 项目,提供页面检索功能。
提示:由于这个搜索引擎项目主要演示如何进行数据的存储、索引、检索,因此就不包括具体的爬虫数据采集模块,而是直接使用一个本地的 json 文件作为数据来源。

一、准备工作

1,创建项目

这里我创建一个 SpringBoot 项目,然后编辑项目的 pom.xml 文件,添加需要的依赖:
<!-- SLF4J API,用于定义日志记录的标准接口,支持多种日志实现 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
</dependency>
<!-- SLF4J 的 Log4j 绑定,使 SLF4J API 能够使用 Log4j 作为具体日志实现 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- 阿里巴巴提供的高性能 JSON 序列化/反序列化库,用于处理 JSON 数据 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>
<!-- HBase 客户端库,用于连接和操作 Apache HBase 分布式数据库 -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.5.5</version>
</dependency>
<!-- Elasticsearch 的高级 REST 客户端库,用于与 Elasticsearch 集群交互 -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.26</version>
</dependency>
<!-- Redis 的 Java 客户端 Jedis,用于与 Redis 数据库交互 -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

2,准备原始数据

(1)为满足需求,文章数据需要包含如下 6 个字段:
  • 文章 ID:需要建立索引,并且存储,这是 ESID 字段必须要具备的特性。
  • 标题:因为查询的时候会用到,所以需要建立索引,并且在返回结果列表信息的时候需要直接从 ES 中返回,所以需要存储。
  • 作者:查询用不到,所以不需要建立索引,但是需要在返回结果列表信息的时候一块返回,所以需要存储。
  • 描述:查询会用到,返回的结果列表信息中也有这个字段内容,所以需要建立索引,并且存储。
  • 正文:因为查询的时候会用到,所以需要建立索引,但是在返回结果列表信息的时候不需要返回这个字段,所以不需要存储。其实还有一点很重要的原因是因为这个字段内容太长了,如果在 ES 中存储,会额外占用很多的存储空间,最终会影响 ES 的性能。
  • 时间:查询用不到,所以不需要建立索引,但是需要在返回结果列表信息的时候一块返回,所以需要存储。

(2)为方便演示,我这里直接在项目的 resources 文件夹下创建一个 news.json 文件存放原始数据。

(3)文件内容如下:
[
  {
    "id": "0e0cbf00-d63c-4c1f-aa2e-9f96a5826029",
    "title": "人工智能的未来",
    "author": "张三",
    "describe": "探讨人工智能在未来社会中的应用前景。",
    "content": "随着科技的进步,人工智能已经渗透到各个领域,包括医疗、教育、交通等。未来,人工智能将更深刻地改变我们的生活方式。",
    "time": "2024-12-24 10:00:00"
  },
  {
    "id": "0d5aeec8-3aa4-4fa5-b7eb-1ba0d0f2d715",
    "title": "大数据在商业中的应用",
    "author": "李四",
    "describe": "介绍大数据如何帮助企业决策。",
    "content": "通过分析消费者行为数据,企业可以更准确地预测市场需求,制定更有效的营销策略,从而提升竞争力。",
    "time": "2024-12-24 11:00:00"
  },
  {
    "id": "0cd0ece6-9d61-4fda-bbcd-a9d9f0d7ff70",
    "title": "区块链技术的现状与挑战",
    "author": "王五",
    "describe": "分析区块链技术的应用及其面临的挑战。",
    "content": "尽管区块链技术在金融和供应链领域取得了显著进展,但其扩展性和能源消耗问题仍需解决。",
    "time": "2024-12-24 12:00:00"
  },
  {
    "id": "0c2c9dca-b0db-42bc-aa6c-ad180501169d",
    "title": "新能源技术的发展趋势",
    "author": "赵六",
    "describe": "解析新能源技术对环保的积极影响。",
    "content": "太阳能、风能等新能源技术的迅猛发展,为应对全球气候变化提供了重要解决方案。这些技术将推动可持续发展。",
    "time": "2024-12-24 13:00:00"
  },
  {
    "id": "09e8bd66-17e0-4e28-b804-a67df7312f90",
    "title": "元宇宙的潜力与争议",
    "author": "钱七",
    "describe": "讨论元宇宙对人类社会的影响。",
    "content": "元宇宙作为虚拟世界的新形态,提供了丰富的交互体验。然而,过度沉迷可能带来社会和心理问题,这需要引起关注。",
    "time": "2024-12-24 14:00:00"
  },
  {
    "id": "08210f1a-f553-4d3c-a09f-420dc8e6fe8c",
    "title": "自动驾驶技术的应用场景",
    "author": "周九",
    "describe": "探讨自动驾驶技术的实际应用和未来方向。",
    "content": "自动驾驶技术已经在物流和乘用车领域取得了初步成果。未来,随着算法和硬件的优化,自动驾驶将成为日常生活的一部分。",
    "time": "2024-12-24 16:00:00"
  },
  {
    "id": "06d75712-1fac-4a19-927f-f4c13ceba899",
    "title": "中国航天的新成就",
    "author": "郑十一",
    "describe": "回顾中国航天近年来的重大突破。",
    "content": "中国航天在载人航天、探月工程和火星探测方面都取得了历史性成就,展示了强大的科技实力和创新能力。",
    "time": "2024-12-24 18:00:00"
  }
]

3,建立 Elasticsearch 索引库

(1)由于字段比较多,我们把 settingsmapping 信息写到一个文件中,使用起来比较方便。创建一个文件:article.json,内容如下:
(1)dynamic 参数有 4 个选项值:
  • true 是默认的,表示开启动态映射
  • false 表示忽略没有定义的字段,
  • strict 表示遇到未知字段时抛出异常
  • runtime 表示遇到未知字段时将它作为运行时字段,运行时字段是在 ES7.11 版本中增加的,运行时字段不会被索引,但是可以从 _source 中获取运行时字段内容,所以 runtime 可以适合公共字段已知,并且想兼容未知扩展字段的场景。
(2)dynamic 具体选择哪个参数,就需要根据需求来定了,在这里不希望在 ES 中保存未知字段,所以使用 strict
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  },
  "mappings": {
    "dynamic": "strict",
    "_source": {
      "excludes": [
        "content"
      ]
    },
    "properties": {
      "title": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "author": {
        "type": "text",
        "index": false
      },
      "describe": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "content": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "time": {
        "type": "date",
        "index": false,
        "format": "yyyy-MM-dd HH:mm:ss"
      }
    }
  }
}

(2)然后将 article.json 上传到服务器的 Elasticsearch 目录下,然后执行如下命令创建索引库:article
curl -H "Content-Type: application/json" -XPUT 'http://192.168.121.128:9200/article' -d @article.json

(3)最后确认一下索引库 articlemapping 信息:
curl -XGET 'http://192.168.121.128:9200/article/_mapping?pretty'

4,建立 HBase 表

(1)执行如下命令启动 HBaseshell 命令行工具:
./bin/hbase shell

(2)然后执行如下命令在 HBase 中创建表:article
create 'article','info'

二、编写代码

1,实体类

该项目只有一个文章实体类 Article,具体代码如下:
**
 * 文章实体类
 *
 */
public class Article {
	
	private String id;
	
	private String title;
	
	private String describe;
	
	private String content;
	
	private String author;
	
	private String time;

	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getTitle() {
		return title;
	}
	public void setTitle(String title) {
		this.title = title;
	}

	public String getAuthor() {
		return author;
	}
	public void setAuthor(String author) {
		this.author = author;
	}
	public String getDescribe() {
		return describe;
	}
	public void setDescribe(String describe) {
		this.describe = describe;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}
	public String getTime() {
		return time;
	}
	public void setTime(String time) {
		this.time = time;
	}
}

2,工具类

(1)首先我们创建一个操作 HBase 的工具类 HBaseUtil,具体代码如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * HBase工具类
 */
public class HBaseUtil {
    private HBaseUtil(){}

    private static Connection conn = getConn();

    private static Connection getConn(){
        //获取hbase链接
        Configuration conf = new Configuration();
        //指定hbase使用的zk地址
        //注意:需要在执行hbase hava代码的机器上配置zk和hbase集群的主机名和ip的映射关系
        conf.set("hbase.zookeeper.quorum","192.168.121.128");
        conf.set("hbase.zookeeper.property.clientPort","2181");
        //指定hbase在hdfs上的根目录
        //conf.set("hbase.rootdir","hdfs://192.168.121.128:9000/hbase");
        //创建HBase数据库链接
        Connection co = null;
        try{
            co = ConnectionFactory.createConnection(conf);
        }catch (IOException e){
            System.out.println("获取链接失败:"+e.getMessage());
        }
        return co;
    }

    /**
     * 对外提供的方法
     * @return
     */
    public static Connection getInstance(){
        return conn;
    }

    /**
     * 创建表
     * @param tableName
     * @param cfs
     */
    public static void createTable(String tableName,String... cfs) throws Exception {
        Admin admin = conn.getAdmin();
        ArrayList<ColumnFamilyDescriptor> cfArr = new ArrayList<ColumnFamilyDescriptor>();
        for (String cf : cfs) {
            ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes(cf))
                    .build();
            cfArr.add(cfDesc);
        }
        TableDescriptor tableDesc = TableDescriptorBuilder
                .newBuilder(TableName.valueOf(tableName))
                .setColumnFamilies(cfArr)
                .build();
        admin.createTable(tableDesc);
        admin.close();
    }

    /**
     * 添加一个单元格(列)的数据
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param column
     * @param value
     * @throws Exception
     */
    public static void put2HBaseCell(String tableName,String rowKey,String columnFamily,
                                     String column,String value) throws Exception {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
        table.put(put);
        table.close();
    }

    /**
     * 向hbase中添加一批数据
     * @param tableName
     * @param list
     * @throws Exception
     */
    public static void put2HBaseList(String tableName, List<Put> list) throws Exception{
        Table table = conn.getTable(TableName.valueOf(tableName));
        table.put(list);
        table.close();
    }

    /**
     * 根据Rowkey获取数据
     * @param tableName
     * @param rowKey
     * @return
     * @throws IOException
     */
    public static Map<String,String> getFromHBase(String tableName,String rowKey)throws IOException{
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        List<Cell> cells = result.listCells();
        HashMap<String, String> resMap = new HashMap<String, String>();
        for (Cell cell: cells) {
            //列
            byte[] column_bytes = CellUtil.cloneQualifier(cell);
            //值
            byte[] value_bytes = CellUtil.cloneValue(cell);
            resMap.put(new String(column_bytes),new String(value_bytes));
        }
        return resMap;
    }
}

(2)接着我们创建一个操作 Redis 的工具类 RedisUtil,具体代码如下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * 基于Redis连接池提取Redis工具类
 */
public class RedisUtil {
    //私有化构造函数,禁止new
    private RedisUtil(){}

    private static JedisPool jedisPool = null;

    //获取连接
    public static synchronized Jedis getJedis(){
        if(jedisPool==null){
            JedisPoolConfig poolConfig = new JedisPoolConfig();
            poolConfig.setMaxIdle(10);
            poolConfig.setMaxTotal(100);
            poolConfig.setMaxWaitMillis(2000);
            poolConfig.setTestOnBorrow(true);
            jedisPool = new JedisPool(poolConfig, "192.168.121.128", 6379,1000, "123");
        }
        return jedisPool.getResource();
    }

    //向连接池返回连接
    public static void returnResource(Jedis jedis){
        jedis.close();
    }
}

(3)最后,我们创建一个操作 Elasticsearch 的工具类 EsUtil,具体代码如下:
import com.example.demo.Article;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/**
 * ES工具类
 */
public class EsUtil {
    private EsUtil(){}
    private static RestHighLevelClient client;
    static{
        //获取RestClient连接
        //注意:高级别客户端其实是对低级别客户端代码进行封装,所以连接池用的是低级别客户端中的连接池
        client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("192.168.121.128",9200,"http"),
                        new HttpHost("192.168.121.129",9200,"http"),
                        new HttpHost("192.168.121.130",9200,"http"))
                        .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                            public HttpAsyncClientBuilder customizeHttpClient(
                                    HttpAsyncClientBuilder httpClientBuilder) {
                                return httpClientBuilder.setDefaultIOReactorConfig(
                                        IOReactorConfig.custom()
                                                //设置线程池中线程的数量,默认是1个,
                                                //建议设置为和客户端机器可用CPU数量一致
                                                .setIoThreadCount(1)
                                                .build());
                            }
                        }));
    }

    /**
     * 获取客户端
     * @return
     */
    public static RestHighLevelClient getRestClient(){
        return client;
    }

    /**
     * 关闭客户端
     * 注意:调用高级别客户端close方法时,会将低级别客户端创建的连接池整个关闭,导致client无法继续使用
     * 所以正常是用不到这个close方法的,只有在程序结束的时候才需要调用
     * @throws IOException
     */
    public static void closeRestClient()throws IOException {
        client.close();
    }

    /**
     * 建立索引
     * @param index
     * @param id
     * @param map
     * @throws IOException
     */
    public static void addIndex(String index, String id, Map<String,String> map)
            throws IOException{
        IndexRequest request = new IndexRequest(index);
        request.id(id);
        request.source(map);
        //执行
        client.index(request, RequestOptions.DEFAULT);
    }

    /**
     * 全文检索功能
     * @param key
     * @param index
     * @param start
     * @param row
     * @return
     * @throws IOException
     */
    public static Map<String, Object> search(String key, String index, int start, int row)
            throws IOException {
        SearchRequest searchRequest = new SearchRequest();
        //指定索引库,支持指定一个或者多个,也支持通配符
        searchRequest.indices(index);

        //指定searchType
        searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH);

        //组装查询条件
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        //如果传递了搜索参数,则拼接查询条件
        if(StringUtils.isNotBlank(key)){
            searchSourceBuilder.query(QueryBuilders.multiMatchQuery(key,"title","describe","content"));
        }
        //分页
        searchSourceBuilder.from(start);
        searchSourceBuilder.size(row);

        //高亮
        //设置高亮字段
        HighlightBuilder highlightBuilder =  new HighlightBuilder()
                .field("title")
                .field("describe");//支持多个高亮字段
        //设置高亮字段的前缀和后缀内容
        highlightBuilder.preTags("<font color='red'>");
        highlightBuilder.postTags("</font>");
        searchSourceBuilder.highlighter(highlightBuilder);

        //指定查询条件
        searchRequest.source(searchSourceBuilder);

        //执行查询操作
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        //存储返回给页面的数据
        Map<String, Object> map = new HashMap<String, Object>();
        //获取查询返回的结果
        SearchHits hits = searchResponse.getHits();
        //获取数据总量
        long numHits = hits.getTotalHits().value;
        map.put("count",numHits);
        ArrayList<Article> arrayList = new ArrayList<>();
        //获取具体内容
        SearchHit[] searchHits = hits.getHits();
        //迭代解析具体内容
        for (SearchHit hit: searchHits) {
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            String id = hit.getId();
            String title = sourceAsMap.get("title").toString();
            String author = sourceAsMap.get("author").toString();
            String describe = sourceAsMap.get("describe").toString();
            String time = sourceAsMap.get("time").toString();

            //获取高亮字段内容
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            //获取title字段的高亮内容
            HighlightField highlightField = highlightFields.get("title");
            if(highlightField!=null){
                Text[] fragments = highlightField.getFragments();
                title = "";
                for (Text text : fragments) {
                    title += text;
                }
            }
            //获取describe字段的高亮内容
            HighlightField highlightField2 = highlightFields.get("describe");
            if(highlightField2!=null){
                Text[] fragments = highlightField2.fragments();
                describe = "";
                for (Text text : fragments) {
                    describe += text;
                }
            }
            //把文章信息封装到Article对象中
            Article article = new Article();
            article.setId(id);
            article.setTitle(title);
            article.setAuthor(author);
            article.setDescribe(describe);
            article.setTime(time);
            //最后再把拼装好的article添加到list对象汇总
            arrayList.add(article);
        }
        map.put("dataList",arrayList);
        return map;
    }
}

3,核心类

(1)首先是数据导入类 DataImport,执行时会从资源文件夹下读取 news.json 数据文件,解析数据并入库 HBaseRedisRowkey),具体代码如下:
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.example.demo.utils.HBaseUtil;
import com.example.demo.utils.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
 * 从资源文件夹下读取 news.json 数据文件,入库 HBase 和 Redis(Rowkey)
 * 注意:HBase 建表语句 create 'article','info'
 */
public class DataImport {
    private final static Logger logger = LoggerFactory.getLogger(DataImport.class);

    public static void main(String[] args) {
        // 从 resources 文件夹读取 news.json 文件
        try (InputStream inputStream = DataImport.class.getClassLoader().
                getResourceAsStream("news.json");
             Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8.name())) {

            // 读取文件内容为字符串
            StringBuilder jsonContent = new StringBuilder();
            while (scanner.hasNextLine()) {
                jsonContent.append(scanner.nextLine());
            }

            // 将文件内容解析为 JSON 对象
            JSONArray resArr = JSONObject.parseArray(jsonContent.toString());
            for (int i = 0; i < resArr.size(); i++) {
                JSONObject jsonObj = resArr.getJSONObject(i);

                // 文章 ID 作为 HBase 的 Rowkey 和 ES 的 ID
                String id = jsonObj.getString("id");
                String title = jsonObj.getString("title");
                String author = jsonObj.getString("author");
                String describe = jsonObj.getString("describe");
                String content = jsonObj.getString("content");
                String time = jsonObj.getString("time");

                Jedis jedis = null;
                try {
                    // 将数据入库到 HBase
                    String tableName = "article";
                    String cf = "info";
                    HBaseUtil.put2HBaseCell(tableName, id, cf, "title", title);
                    HBaseUtil.put2HBaseCell(tableName, id, cf, "author", author);
                    HBaseUtil.put2HBaseCell(tableName, id, cf, "describe", describe);
                    HBaseUtil.put2HBaseCell(tableName, id, cf, "content", content);
                    HBaseUtil.put2HBaseCell(tableName, id, cf, "time", time);

                    // 将 Rowkey 保存到 Redis 中
                    jedis = RedisUtil.getJedis();
                    jedis.lpush("l_article_ids", id);
                } catch (Exception e) {
                    // 注意:由于HBase的put操作属于幂等操作,多次操作对最终结果没影响,无需额外处理
                    logger.error("数据添加失败:" + e.getMessage());
                } finally {
                    // 向连接池返回连接
                    if (jedis != null) {
                        RedisUtil.returnResource(jedis);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("读取 news.json 文件失败:" + e.getMessage());
        }
    }
}

(2)接着时数据索引初始化类,执行时会在 Elasticsearch 中对 HBase 中的数据建立索引,具体代码如下:
import com.example.demo.utils.EsUtil;
import com.example.demo.utils.HBaseUtil;
import com.example.demo.utils.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.Map;

/**
 * 在ES中对HBase中的数据建立索引
 */
public class DataIndex {
    private final static Logger logger = LoggerFactory.getLogger(DataIndex.class);
    public static void main(String[] args) {
        List<String> rowKeyList = null;
        Jedis jedis = null;
        try {
            //1:首先从Redis的列表中获取Rowkey
            jedis = RedisUtil.getJedis();
            //brpop如果获取到了数据,返回的list里面有两列,第一列是key的名称,第二列是具体的数据
            rowKeyList = jedis.brpop(3, "l_article_ids");
            while (rowKeyList != null) {
                String rowKey = rowKeyList.get(1);
                //2:根据Rowkey到HBase中获取数据的详细信息
                Map<String, String> map = HBaseUtil.getFromHBase("article", rowKey);
                //3:在ES中对数据建立索引
                EsUtil.addIndex("article",rowKey,map);

                //循环从Redis的列表中获取Rowkey
                rowKeyList = jedis.brpop(3, "l_article_ids");
            }
        }catch (Exception e){
            logger.error("数据建立索引失败:"+e.getMessage());
            //在这里可以考虑把获取出来的rowKey再push到Redis中,这样可以保证数据不丢
            if(rowKeyList!=null){
                jedis.rpush("l_article_ids",rowKeyList.get(1));
            }
        }finally {
            //向连接池返回连接
            if(jedis!=null){
                RedisUtil.returnResource(jedis);
            }
            //注意:确认ES连接不再使用了再关闭连接,否则会导致client无法继续使用
            try{
                EsUtil.closeRestClient();
            }catch (Exception e){
                logger.error("ES连接关闭失败:"+e.getMessage());
            }
        }
    }
}

4,数据检索接口

这里我创建一个名为 ArticleControllerController,对外提供两个 API 接口:搜索文章和查看文章详情。具体代码如下:
import com.example.demo.Article;
import com.example.demo.utils.HBaseUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;
import com.example.demo.utils.EsUtil;

@Controller
@RequestMapping("article")
public class ArticleController {
    static final Logger logger = LoggerFactory.getLogger(ArticleController.class);

    /**
     * 搜索
     * @param modelMap
     * @param skey
     * @param request
     * @param start
     * @param row
     * @return
     */
    @ResponseBody
    @RequestMapping(value="/search")
    public Map<String,Object> serachArticle(
                                @RequestParam(value="skey",required = false) String skey,
                                HttpServletRequest request,
                                @RequestParam(value = "start", defaultValue = "0") Integer start,
                                @RequestParam(value = "row", defaultValue = "10") Integer row){
        Map<String,Object> map = new HashMap<String, Object>();
        Long count = 0L;
        try {
            map = EsUtil.search(skey,"article",start, row);
            count = (Long)map.get("count");
        } catch (Exception e) {
            logger.error("查询索引错误!{}",e);
            e.printStackTrace();
        }
        map.put("start", start);
        map.put("row", row);
        return map;
    }

    /**
     * 查看文章详细信息
     * @return
     */
    @ResponseBody
    @RequestMapping("/detailArticleById/{id}")
    public Article detailArticleById(@PathVariable(value="id") String id){
        try{
            Map<String, String> map = HBaseUtil.getFromHBase("article", id);
            Article article = new Article();
            article.setId(id);
            article.setTitle(map.get("title"));
            article.setAuthor(map.get("author"));
            article.setDescribe(map.get("describe"));
            article.setContent(map.get("content"));
            article.setTime(map.get("time"));
            return article;
        }catch (Exception e){
            logger.error("HBase数据查询异常:"+e.getMessage());
        }
        return null;
    }
}

三、运行测试

1,数据入库

(1)首先执行项目中的 DataImport 代码,将数据导入到 HBaseRedis
注意DataImport 代码执行需要消耗一段时间。

(2)然后到 HBase 中验证数据,可以看到数据都完整保存进来了:
./bin/hbase shell
count 'article'
scan 'article'

(3)接着到 Redis 中验证数据,可以看到所有的文章 id 也都保存下来了。
redis-cli -a 123
lrange l_article_ids 0 -1

2,建立索引

(1)执行项目中的 DataIndex 代码,在 ES 中建立索引。执行完毕后到 ES 中验证数据:
curl -XGET 'http://192.168.121.128:9200/article/_search?pretty'

(2)可以看到返回结构如下,这里我们需要重点关注 _source 字段中是否包含 content 字段,如果包含此字段内容说明前面的 mapping 配置有问题,如果不包含此字段内容说明是正确的。

3,检索查询

(1)启动项目,我们首先测试搜索功能是否正常,关键字是否高亮。可以看到一共查到了 3 条数据,按条件返回最前面的两条数据。
提示:由于搜索数据是直接从 Elasticsearch 中返回,所以不包含正文数据。
  • 再测试下翻页是否正常:

(2)接着测试详情查看接口,根据文章 IDHBase 返回文章的全部数据,包括正文内容。

附:单索引库查询效率降低的问题

1,问题描述

    爬虫程序每天都会到互联网上采集新的文章数据,如果项目运行了半年、1 年,所有的数据都存储到 Elasticsearch 的一个索引库里面,这样会导致查询效率降低。

2,解决办法

(1)可以考虑按周或者按月创建索引库,通过索引库别名关联最近半年内的索引库,实现默认查询最近半年内的数据。
  • 索引库的命名可以按照一定的规律,假设是按月建立索引库,则索引库的名称大致是这样的:
article_202401
article_202402
article_202403
......

(2)如果确实需要查询历史以来所有的数据,在查询的时候可以通过索引库通配符实现所有数据查询,使用这个索引库通配符即可:article_*,这样可以查询所有以 article_ 开头的索引库。
评论

全部评论(0)

回到顶部