Elasticsearch、HBase - 仿百度搜索引擎项目案例实操2(案例代码、实现步骤)
作者:hangge | 2025-05-26 09:41
我在之前的文章中介绍了整个仿百度搜索引擎的功能和架构(点击查看),本文开始这个搜索引擎项目的数据存储、建立索引、以及页面检索这三块功能的开发。大致的开发步骤如下:


(2)然后将 article.json 上传到服务器的 Elasticsearch 目录下,然后执行如下命令创建索引库:article。
(3)最后确认一下索引库 article 的 mapping 信息:
(2)然后执行如下命令在 HBase 中创建表:article。
(2)接着我们创建一个操作 Redis 的工具类 RedisUtil,具体代码如下:
(3)最后,我们创建一个操作 Elasticsearch 的工具类 EsUtil,具体代码如下:
(2)接着时数据索引初始化类,执行时会在 Elasticsearch 中对 HBase 中的数据建立索引,具体代码如下:
(2)然后到 HBase 中验证数据,可以看到数据都完整保存进来了:
(2)可以看到返回结构如下,这里我们需要重点关注 _source 字段中是否包含 content 字段,如果包含此字段内容说明前面的 mapping 配置有问题,如果不包含此字段内容说明是正确的。

(2)如果确实需要查询历史以来所有的数据,在查询的时候可以通过索引库通配符实现所有数据查询,使用这个索引库通配符即可:article_*,这样可以查询所有以 article_ 开头的索引库。
- 调用接口获取数据导入 HBase 和 Redis(存储 Rowkey)。
- 通过 ES 对 HBase 中的数据建立索引。
- 对接 Web 项目,提供页面检索功能。
提示:由于这个搜索引擎项目主要演示如何进行数据的存储、索引、检索,因此就不包括具体的爬虫数据采集模块,而是直接使用一个本地的 json 文件作为数据来源。
一、准备工作
1,创建项目
这里我创建一个 SpringBoot 项目,然后编辑项目的 pom.xml 文件,添加需要的依赖:
<!-- SLF4J API,用于定义日志记录的标准接口,支持多种日志实现 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- SLF4J 的 Log4j 绑定,使 SLF4J API 能够使用 Log4j 作为具体日志实现 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- 阿里巴巴提供的高性能 JSON 序列化/反序列化库,用于处理 JSON 数据 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!-- HBase 客户端库,用于连接和操作 Apache HBase 分布式数据库 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.5.5</version>
</dependency>
<!-- Elasticsearch 的高级 REST 客户端库,用于与 Elasticsearch 集群交互 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.26</version>
</dependency>
<!-- Redis 的 Java 客户端 Jedis,用于与 Redis 数据库交互 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
2,准备原始数据
(1)为满足需求,文章数据需要包含如下 6 个字段:
- 文章 ID:需要建立索引,并且存储,这是 ES 中 ID 字段必须要具备的特性。
- 标题:因为查询的时候会用到,所以需要建立索引,并且在返回结果列表信息的时候需要直接从 ES 中返回,所以需要存储。
- 作者:查询用不到,所以不需要建立索引,但是需要在返回结果列表信息的时候一块返回,所以需要存储。
- 描述:查询会用到,返回的结果列表信息中也有这个字段内容,所以需要建立索引,并且存储。
- 正文:因为查询的时候会用到,所以需要建立索引,但是在返回结果列表信息的时候不需要返回这个字段,所以不需要存储。其实还有一点很重要的原因是因为这个字段内容太长了,如果在 ES 中存储,会额外占用很多的存储空间,最终会影响 ES 的性能。
- 时间:查询用不到,所以不需要建立索引,但是需要在返回结果列表信息的时候一块返回,所以需要存储。

(2)为方便演示,我这里直接在项目的 resources 文件夹下创建一个 news.json 文件存放原始数据。

(3)文件内容如下:
[
{
"id": "0e0cbf00-d63c-4c1f-aa2e-9f96a5826029",
"title": "人工智能的未来",
"author": "张三",
"describe": "探讨人工智能在未来社会中的应用前景。",
"content": "随着科技的进步,人工智能已经渗透到各个领域,包括医疗、教育、交通等。未来,人工智能将更深刻地改变我们的生活方式。",
"time": "2024-12-24 10:00:00"
},
{
"id": "0d5aeec8-3aa4-4fa5-b7eb-1ba0d0f2d715",
"title": "大数据在商业中的应用",
"author": "李四",
"describe": "介绍大数据如何帮助企业决策。",
"content": "通过分析消费者行为数据,企业可以更准确地预测市场需求,制定更有效的营销策略,从而提升竞争力。",
"time": "2024-12-24 11:00:00"
},
{
"id": "0cd0ece6-9d61-4fda-bbcd-a9d9f0d7ff70",
"title": "区块链技术的现状与挑战",
"author": "王五",
"describe": "分析区块链技术的应用及其面临的挑战。",
"content": "尽管区块链技术在金融和供应链领域取得了显著进展,但其扩展性和能源消耗问题仍需解决。",
"time": "2024-12-24 12:00:00"
},
{
"id": "0c2c9dca-b0db-42bc-aa6c-ad180501169d",
"title": "新能源技术的发展趋势",
"author": "赵六",
"describe": "解析新能源技术对环保的积极影响。",
"content": "太阳能、风能等新能源技术的迅猛发展,为应对全球气候变化提供了重要解决方案。这些技术将推动可持续发展。",
"time": "2024-12-24 13:00:00"
},
{
"id": "09e8bd66-17e0-4e28-b804-a67df7312f90",
"title": "元宇宙的潜力与争议",
"author": "钱七",
"describe": "讨论元宇宙对人类社会的影响。",
"content": "元宇宙作为虚拟世界的新形态,提供了丰富的交互体验。然而,过度沉迷可能带来社会和心理问题,这需要引起关注。",
"time": "2024-12-24 14:00:00"
},
{
"id": "08210f1a-f553-4d3c-a09f-420dc8e6fe8c",
"title": "自动驾驶技术的应用场景",
"author": "周九",
"describe": "探讨自动驾驶技术的实际应用和未来方向。",
"content": "自动驾驶技术已经在物流和乘用车领域取得了初步成果。未来,随着算法和硬件的优化,自动驾驶将成为日常生活的一部分。",
"time": "2024-12-24 16:00:00"
},
{
"id": "06d75712-1fac-4a19-927f-f4c13ceba899",
"title": "中国航天的新成就",
"author": "郑十一",
"describe": "回顾中国航天近年来的重大突破。",
"content": "中国航天在载人航天、探月工程和火星探测方面都取得了历史性成就,展示了强大的科技实力和创新能力。",
"time": "2024-12-24 18:00:00"
}
]
3,建立 Elasticsearch 索引库
(1)由于字段比较多,我们把 settings 和 mapping 信息写到一个文件中,使用起来比较方便。创建一个文件:article.json,内容如下:
(1)dynamic 参数有 4 个选项值:
- true 是默认的,表示开启动态映射
- false 表示忽略没有定义的字段,
- strict 表示遇到未知字段时抛出异常
- runtime 表示遇到未知字段时将它作为运行时字段,运行时字段是在 ES7.11 版本中增加的,运行时字段不会被索引,但是可以从 _source 中获取运行时字段内容,所以 runtime 可以适合公共字段已知,并且想兼容未知扩展字段的场景。
(2)dynamic 具体选择哪个参数,就需要根据需求来定了,在这里不希望在 ES 中保存未知字段,所以使用 strict。
{
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1
},
"mappings": {
"dynamic": "strict",
"_source": {
"excludes": [
"content"
]
},
"properties": {
"title": {
"type": "text",
"analyzer": "ik_max_word"
},
"author": {
"type": "text",
"index": false
},
"describe": {
"type": "text",
"analyzer": "ik_max_word"
},
"content": {
"type": "text",
"analyzer": "ik_max_word"
},
"time": {
"type": "date",
"index": false,
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
}
(2)然后将 article.json 上传到服务器的 Elasticsearch 目录下,然后执行如下命令创建索引库:article。
curl -H "Content-Type: application/json" -XPUT 'http://192.168.121.128:9200/article' -d @article.json
(3)最后确认一下索引库 article 的 mapping 信息:
curl -XGET 'http://192.168.121.128:9200/article/_mapping?pretty'
4,建立 HBase 表
(1)执行如下命令启动 HBase 的 shell 命令行工具:
./bin/hbase shell
(2)然后执行如下命令在 HBase 中创建表:article。
create 'article','info'
二、编写代码
1,实体类
该项目只有一个文章实体类 Article,具体代码如下:
**
* 文章实体类
*
*/
public class Article {
private String id;
private String title;
private String describe;
private String content;
private String author;
private String time;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public String getDescribe() {
return describe;
}
public void setDescribe(String describe) {
this.describe = describe;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
}
2,工具类
(1)首先我们创建一个操作 HBase 的工具类 HBaseUtil,具体代码如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* HBase工具类
*/
public class HBaseUtil {
private HBaseUtil(){}
private static Connection conn = getConn();
private static Connection getConn(){
//获取hbase链接
Configuration conf = new Configuration();
//指定hbase使用的zk地址
//注意:需要在执行hbase hava代码的机器上配置zk和hbase集群的主机名和ip的映射关系
conf.set("hbase.zookeeper.quorum","192.168.121.128");
conf.set("hbase.zookeeper.property.clientPort","2181");
//指定hbase在hdfs上的根目录
//conf.set("hbase.rootdir","hdfs://192.168.121.128:9000/hbase");
//创建HBase数据库链接
Connection co = null;
try{
co = ConnectionFactory.createConnection(conf);
}catch (IOException e){
System.out.println("获取链接失败:"+e.getMessage());
}
return co;
}
/**
* 对外提供的方法
* @return
*/
public static Connection getInstance(){
return conn;
}
/**
* 创建表
* @param tableName
* @param cfs
*/
public static void createTable(String tableName,String... cfs) throws Exception {
Admin admin = conn.getAdmin();
ArrayList<ColumnFamilyDescriptor> cfArr = new ArrayList<ColumnFamilyDescriptor>();
for (String cf : cfs) {
ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder
.newBuilder(Bytes.toBytes(cf))
.build();
cfArr.add(cfDesc);
}
TableDescriptor tableDesc = TableDescriptorBuilder
.newBuilder(TableName.valueOf(tableName))
.setColumnFamilies(cfArr)
.build();
admin.createTable(tableDesc);
admin.close();
}
/**
* 添加一个单元格(列)的数据
* @param tableName
* @param rowKey
* @param columnFamily
* @param column
* @param value
* @throws Exception
*/
public static void put2HBaseCell(String tableName,String rowKey,String columnFamily,
String column,String value) throws Exception {
Table table = conn.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
table.put(put);
table.close();
}
/**
* 向hbase中添加一批数据
* @param tableName
* @param list
* @throws Exception
*/
public static void put2HBaseList(String tableName, List<Put> list) throws Exception{
Table table = conn.getTable(TableName.valueOf(tableName));
table.put(list);
table.close();
}
/**
* 根据Rowkey获取数据
* @param tableName
* @param rowKey
* @return
* @throws IOException
*/
public static Map<String,String> getFromHBase(String tableName,String rowKey)throws IOException{
Table table = conn.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);
List<Cell> cells = result.listCells();
HashMap<String, String> resMap = new HashMap<String, String>();
for (Cell cell: cells) {
//列
byte[] column_bytes = CellUtil.cloneQualifier(cell);
//值
byte[] value_bytes = CellUtil.cloneValue(cell);
resMap.put(new String(column_bytes),new String(value_bytes));
}
return resMap;
}
}
(2)接着我们创建一个操作 Redis 的工具类 RedisUtil,具体代码如下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* 基于Redis连接池提取Redis工具类
*/
public class RedisUtil {
//私有化构造函数,禁止new
private RedisUtil(){}
private static JedisPool jedisPool = null;
//获取连接
public static synchronized Jedis getJedis(){
if(jedisPool==null){
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxIdle(10);
poolConfig.setMaxTotal(100);
poolConfig.setMaxWaitMillis(2000);
poolConfig.setTestOnBorrow(true);
jedisPool = new JedisPool(poolConfig, "192.168.121.128", 6379,1000, "123");
}
return jedisPool.getResource();
}
//向连接池返回连接
public static void returnResource(Jedis jedis){
jedis.close();
}
}
(3)最后,我们创建一个操作 Elasticsearch 的工具类 EsUtil,具体代码如下:
import com.example.demo.Article;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
/**
* ES工具类
*/
public class EsUtil {
private EsUtil(){}
private static RestHighLevelClient client;
static{
//获取RestClient连接
//注意:高级别客户端其实是对低级别客户端代码进行封装,所以连接池用的是低级别客户端中的连接池
client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.121.128",9200,"http"),
new HttpHost("192.168.121.129",9200,"http"),
new HttpHost("192.168.121.130",9200,"http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultIOReactorConfig(
IOReactorConfig.custom()
//设置线程池中线程的数量,默认是1个,
//建议设置为和客户端机器可用CPU数量一致
.setIoThreadCount(1)
.build());
}
}));
}
/**
* 获取客户端
* @return
*/
public static RestHighLevelClient getRestClient(){
return client;
}
/**
* 关闭客户端
* 注意:调用高级别客户端close方法时,会将低级别客户端创建的连接池整个关闭,导致client无法继续使用
* 所以正常是用不到这个close方法的,只有在程序结束的时候才需要调用
* @throws IOException
*/
public static void closeRestClient()throws IOException {
client.close();
}
/**
* 建立索引
* @param index
* @param id
* @param map
* @throws IOException
*/
public static void addIndex(String index, String id, Map<String,String> map)
throws IOException{
IndexRequest request = new IndexRequest(index);
request.id(id);
request.source(map);
//执行
client.index(request, RequestOptions.DEFAULT);
}
/**
* 全文检索功能
* @param key
* @param index
* @param start
* @param row
* @return
* @throws IOException
*/
public static Map<String, Object> search(String key, String index, int start, int row)
throws IOException {
SearchRequest searchRequest = new SearchRequest();
//指定索引库,支持指定一个或者多个,也支持通配符
searchRequest.indices(index);
//指定searchType
searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH);
//组装查询条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//如果传递了搜索参数,则拼接查询条件
if(StringUtils.isNotBlank(key)){
searchSourceBuilder.query(QueryBuilders.multiMatchQuery(key,"title","describe","content"));
}
//分页
searchSourceBuilder.from(start);
searchSourceBuilder.size(row);
//高亮
//设置高亮字段
HighlightBuilder highlightBuilder = new HighlightBuilder()
.field("title")
.field("describe");//支持多个高亮字段
//设置高亮字段的前缀和后缀内容
highlightBuilder.preTags("<font color='red'>");
highlightBuilder.postTags("</font>");
searchSourceBuilder.highlighter(highlightBuilder);
//指定查询条件
searchRequest.source(searchSourceBuilder);
//执行查询操作
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
//存储返回给页面的数据
Map<String, Object> map = new HashMap<String, Object>();
//获取查询返回的结果
SearchHits hits = searchResponse.getHits();
//获取数据总量
long numHits = hits.getTotalHits().value;
map.put("count",numHits);
ArrayList<Article> arrayList = new ArrayList<>();
//获取具体内容
SearchHit[] searchHits = hits.getHits();
//迭代解析具体内容
for (SearchHit hit: searchHits) {
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String id = hit.getId();
String title = sourceAsMap.get("title").toString();
String author = sourceAsMap.get("author").toString();
String describe = sourceAsMap.get("describe").toString();
String time = sourceAsMap.get("time").toString();
//获取高亮字段内容
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
//获取title字段的高亮内容
HighlightField highlightField = highlightFields.get("title");
if(highlightField!=null){
Text[] fragments = highlightField.getFragments();
title = "";
for (Text text : fragments) {
title += text;
}
}
//获取describe字段的高亮内容
HighlightField highlightField2 = highlightFields.get("describe");
if(highlightField2!=null){
Text[] fragments = highlightField2.fragments();
describe = "";
for (Text text : fragments) {
describe += text;
}
}
//把文章信息封装到Article对象中
Article article = new Article();
article.setId(id);
article.setTitle(title);
article.setAuthor(author);
article.setDescribe(describe);
article.setTime(time);
//最后再把拼装好的article添加到list对象汇总
arrayList.add(article);
}
map.put("dataList",arrayList);
return map;
}
}
3,核心类
(1)首先是数据导入类 DataImport,执行时会从资源文件夹下读取 news.json 数据文件,解析数据并入库 HBase 和 Redis(Rowkey),具体代码如下:
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.example.demo.utils.HBaseUtil;
import com.example.demo.utils.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
/**
* 从资源文件夹下读取 news.json 数据文件,入库 HBase 和 Redis(Rowkey)
* 注意:HBase 建表语句 create 'article','info'
*/
public class DataImport {
private final static Logger logger = LoggerFactory.getLogger(DataImport.class);
public static void main(String[] args) {
// 从 resources 文件夹读取 news.json 文件
try (InputStream inputStream = DataImport.class.getClassLoader().
getResourceAsStream("news.json");
Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8.name())) {
// 读取文件内容为字符串
StringBuilder jsonContent = new StringBuilder();
while (scanner.hasNextLine()) {
jsonContent.append(scanner.nextLine());
}
// 将文件内容解析为 JSON 对象
JSONArray resArr = JSONObject.parseArray(jsonContent.toString());
for (int i = 0; i < resArr.size(); i++) {
JSONObject jsonObj = resArr.getJSONObject(i);
// 文章 ID 作为 HBase 的 Rowkey 和 ES 的 ID
String id = jsonObj.getString("id");
String title = jsonObj.getString("title");
String author = jsonObj.getString("author");
String describe = jsonObj.getString("describe");
String content = jsonObj.getString("content");
String time = jsonObj.getString("time");
Jedis jedis = null;
try {
// 将数据入库到 HBase
String tableName = "article";
String cf = "info";
HBaseUtil.put2HBaseCell(tableName, id, cf, "title", title);
HBaseUtil.put2HBaseCell(tableName, id, cf, "author", author);
HBaseUtil.put2HBaseCell(tableName, id, cf, "describe", describe);
HBaseUtil.put2HBaseCell(tableName, id, cf, "content", content);
HBaseUtil.put2HBaseCell(tableName, id, cf, "time", time);
// 将 Rowkey 保存到 Redis 中
jedis = RedisUtil.getJedis();
jedis.lpush("l_article_ids", id);
} catch (Exception e) {
// 注意:由于HBase的put操作属于幂等操作,多次操作对最终结果没影响,无需额外处理
logger.error("数据添加失败:" + e.getMessage());
} finally {
// 向连接池返回连接
if (jedis != null) {
RedisUtil.returnResource(jedis);
}
}
}
} catch (Exception e) {
logger.error("读取 news.json 文件失败:" + e.getMessage());
}
}
}
(2)接着时数据索引初始化类,执行时会在 Elasticsearch 中对 HBase 中的数据建立索引,具体代码如下:
import com.example.demo.utils.EsUtil;
import com.example.demo.utils.HBaseUtil;
import com.example.demo.utils.RedisUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import java.util.List;
import java.util.Map;
/**
* 在ES中对HBase中的数据建立索引
*/
public class DataIndex {
private final static Logger logger = LoggerFactory.getLogger(DataIndex.class);
public static void main(String[] args) {
List<String> rowKeyList = null;
Jedis jedis = null;
try {
//1:首先从Redis的列表中获取Rowkey
jedis = RedisUtil.getJedis();
//brpop如果获取到了数据,返回的list里面有两列,第一列是key的名称,第二列是具体的数据
rowKeyList = jedis.brpop(3, "l_article_ids");
while (rowKeyList != null) {
String rowKey = rowKeyList.get(1);
//2:根据Rowkey到HBase中获取数据的详细信息
Map<String, String> map = HBaseUtil.getFromHBase("article", rowKey);
//3:在ES中对数据建立索引
EsUtil.addIndex("article",rowKey,map);
//循环从Redis的列表中获取Rowkey
rowKeyList = jedis.brpop(3, "l_article_ids");
}
}catch (Exception e){
logger.error("数据建立索引失败:"+e.getMessage());
//在这里可以考虑把获取出来的rowKey再push到Redis中,这样可以保证数据不丢
if(rowKeyList!=null){
jedis.rpush("l_article_ids",rowKeyList.get(1));
}
}finally {
//向连接池返回连接
if(jedis!=null){
RedisUtil.returnResource(jedis);
}
//注意:确认ES连接不再使用了再关闭连接,否则会导致client无法继续使用
try{
EsUtil.closeRestClient();
}catch (Exception e){
logger.error("ES连接关闭失败:"+e.getMessage());
}
}
}
}
4,数据检索接口
这里我创建一个名为 ArticleController 的 Controller,对外提供两个 API 接口:搜索文章和查看文章详情。具体代码如下:
import com.example.demo.Article;
import com.example.demo.utils.HBaseUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;
import com.example.demo.utils.EsUtil;
@Controller
@RequestMapping("article")
public class ArticleController {
static final Logger logger = LoggerFactory.getLogger(ArticleController.class);
/**
* 搜索
* @param modelMap
* @param skey
* @param request
* @param start
* @param row
* @return
*/
@ResponseBody
@RequestMapping(value="/search")
public Map<String,Object> serachArticle(
@RequestParam(value="skey",required = false) String skey,
HttpServletRequest request,
@RequestParam(value = "start", defaultValue = "0") Integer start,
@RequestParam(value = "row", defaultValue = "10") Integer row){
Map<String,Object> map = new HashMap<String, Object>();
Long count = 0L;
try {
map = EsUtil.search(skey,"article",start, row);
count = (Long)map.get("count");
} catch (Exception e) {
logger.error("查询索引错误!{}",e);
e.printStackTrace();
}
map.put("start", start);
map.put("row", row);
return map;
}
/**
* 查看文章详细信息
* @return
*/
@ResponseBody
@RequestMapping("/detailArticleById/{id}")
public Article detailArticleById(@PathVariable(value="id") String id){
try{
Map<String, String> map = HBaseUtil.getFromHBase("article", id);
Article article = new Article();
article.setId(id);
article.setTitle(map.get("title"));
article.setAuthor(map.get("author"));
article.setDescribe(map.get("describe"));
article.setContent(map.get("content"));
article.setTime(map.get("time"));
return article;
}catch (Exception e){
logger.error("HBase数据查询异常:"+e.getMessage());
}
return null;
}
}
三、运行测试
1,数据入库
(1)首先执行项目中的 DataImport 代码,将数据导入到 HBase 和 Redis。
注意:DataImport 代码执行需要消耗一段时间。
(2)然后到 HBase 中验证数据,可以看到数据都完整保存进来了:
./bin/hbase shell count 'article' scan 'article'
(3)接着到 Redis 中验证数据,可以看到所有的文章 id 也都保存下来了。
redis-cli -a 123 lrange l_article_ids 0 -1
2,建立索引
(1)执行项目中的 DataIndex 代码,在 ES 中建立索引。执行完毕后到 ES 中验证数据:
curl -XGET 'http://192.168.121.128:9200/article/_search?pretty'
(2)可以看到返回结构如下,这里我们需要重点关注 _source 字段中是否包含 content 字段,如果包含此字段内容说明前面的 mapping 配置有问题,如果不包含此字段内容说明是正确的。

3,检索查询
(1)启动项目,我们首先测试搜索功能是否正常,关键字是否高亮。可以看到一共查到了 3 条数据,按条件返回最前面的两条数据。
提示:由于搜索数据是直接从 Elasticsearch 中返回,所以不包含正文数据。
- 再测试下翻页是否正常:

(2)接着测试详情查看接口,根据文章 ID 从 HBase 返回文章的全部数据,包括正文内容。
附:单索引库查询效率降低的问题
1,问题描述
爬虫程序每天都会到互联网上采集新的文章数据,如果项目运行了半年、1 年,所有的数据都存储到 Elasticsearch 的一个索引库里面,这样会导致查询效率降低。
2,解决办法
(1)可以考虑按周或者按月创建索引库,通过索引库别名关联最近半年内的索引库,实现默认查询最近半年内的数据。
- 索引库的命名可以按照一定的规律,假设是按月建立索引库,则索引库的名称大致是这样的:
article_202401 article_202402 article_202403 ......
(2)如果确实需要查询历史以来所有的数据,在查询的时候可以通过索引库通配符实现所有数据查询,使用这个索引库通配符即可:article_*,这样可以查询所有以 article_ 开头的索引库。
全部评论(0)