SpringBoot - Redis的整合与使用详解3(使用Pipeline管道批量操作数据、读取数据)
作者:hangge | 2024-11-15 08:43
在之前的文章中,我介绍了 Spring Boot 如何与 Redis 进行基本的整合操作:
而在大规模数据操作时,逐条执行 Redis 命令会导致性能瓶颈,因此 Redis 提供了 Pipeline(管道)技术,可以一次性发送多条命令,减少网络通信次数,提升性能。本篇教程将详细讲解如何在 Spring Boot 中使用 Redis 的 Pipeline 进行批量操作数据。


(2)下面是分别使用两种方式初始化 10 万个对象数据:


三、使用 Pipeline 管道批量操作数据
1,Pipeline 介绍
(1)Redis 的 Pipeline 允许客户端将多条命令放入缓冲区并一次性发送给 Redis 服务端,减少了网络的往返次数,从而提高性能。这个功能就类似于 mysql 中的 batch 批处理。
(2)通过 Pipeline,可以在一次请求中执行多条 Redis 指令,而不等待每条指令的返回结果,这对于批量操作数据尤为有用。

2,批量设置数据样例
(1)下面代码我们分别使用普通方式一条一条添加和使用管道批量初始化 10 万条数据,然后对比二者的耗时。
@RestController
public class HelloController {
@Autowired
StringRedisTemplate stringRedisTemplate; //默认提供的用来操作字符串的 redis 操作实例
@RequestMapping("/test")
public void test() {
// 不使用管道
long start_time = System.currentTimeMillis();
ValueOperations<String, String> ops1 = stringRedisTemplate.opsForValue();
for(int i=0;i<100000;i++){
ops1.set("a"+i, "a"+i);
}
long end_time = System.currentTimeMillis();
System.out.println("不使用管道,耗时:"+(end_time-start_time));
// 使用管道
long startPipelineTime = System.currentTimeMillis();
List<Object> results = stringRedisTemplate.executePipelined((RedisConnection connection) -> {
StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
for (int i = 0; i < 100000; i++) {
stringRedisConn.set("b" + i, "b" + i);
}
return null; // 返回 null,以符合接口要求
});
long endPipelineTime = System.currentTimeMillis();
System.out.println("使用管道,耗时:" + (endPipelineTime - startPipelineTime));
}
}
- 运行结果如下,可以看出来,针对海量数据的初始化,管道可以显著提高性能。
- 执行 Redis 的 info 命令查看数据库中的数据条数,可以发现确实有 20 万条数据。

@RestController
public class HelloController {
@Autowired
RedisTemplate redisTemplate; //默认提供的用来操作对象的 redis 操作实例
@RequestMapping("/test")
public void test() {
// 不使用管道
long start_time = System.currentTimeMillis();
ValueOperations ops2 = redisTemplate.opsForValue();
for(int i=0;i<100000;i++){
User u1 = new User();
u1.setId(i);
u1.setName("hangge");
u1.setAge(100);
ops2.set("user:a:"+i, u1);
}
long end_time = System.currentTimeMillis();
System.out.println("不使用管道,耗时:"+(end_time-start_time));
// 使用管道
long startPipelineTime = System.currentTimeMillis();
redisTemplate.executePipelined((RedisConnection connection) -> {
for (int i = 0; i < 100000; i++) {
User u1 = new User();
u1.setId(i);
u1.setName("hangge");
u1.setAge(100);
byte[] key = redisTemplate.getStringSerializer().serialize("user:b:" + i);
byte[] value = redisTemplate.getValueSerializer().serialize(u1);
connection.set(key, value);
}
return null; // 返回 null,以符合接口要求
});
long endPipelineTime = System.currentTimeMillis();
System.out.println("使用管道,耗时:" + (endPipelineTime - startPipelineTime));
}
}
- 运行结果如下,可以看到使用管道同样性能提升巨大。

3,批量读取数据样例
(1)下面代码读取前面插入的 10 万条数据,对比不使用管道和使用管道的性能差异:
@RestController
public class HelloController {
@Autowired
StringRedisTemplate stringRedisTemplate; //默认提供的用来操作字符串的 redis 操作实例
@RequestMapping("/test")
public void test() {
// 不使用管道
long start_time = System.currentTimeMillis();
ValueOperations ops2 = stringRedisTemplate.opsForValue();
List<String> list1 = new ArrayList<>();
for(int i=0;i<100000;i++){
String u1 =(String) ops2.get("a"+i);
list1.add(u1);
}
long end_time = System.currentTimeMillis();
System.out.println("不使用管道,耗时:"+(end_time-start_time)
+ "毫秒,list1 的长度:" + list1.size());
// 使用管道
long startPipelineTime = System.currentTimeMillis();
List<Object> list2 = stringRedisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
StringRedisConnection stringRedisConnection
= (StringRedisConnection) redisConnection;
for(int i=0;i<100000;i++){
stringRedisConnection.get("a"+i);
}
return null;
}
});
long endPipelineTime = System.currentTimeMillis();
System.out.println("使用管道,耗时:" + (endPipelineTime - startPipelineTime)
+ "毫秒,list2 的长度:" + list2.size());
}
}
(2)下面代码读取前面插入的 10 个对象数据,对比不使用管道和使用管道的性能差异:
@RestController
public class HelloController {
@Autowired
RedisTemplate redisTemplate; //默认提供的用来操作对象的 redis 操作实例
@RequestMapping("/test")
public void test() {
// 不使用管道
long start_time = System.currentTimeMillis();
ValueOperations ops2 = redisTemplate.opsForValue();
List<User> list1 = new ArrayList<>();
for(int i=0;i<100000;i++){
User u1 =(User) ops2.get("user:a:"+i);
list1.add(u1);
}
long end_time = System.currentTimeMillis();
System.out.println("不使用管道,耗时:"+(end_time-start_time)
+ "毫秒,list1 的长度:" + list1.size());
// 使用管道
long startPipelineTime = System.currentTimeMillis();
List<User> list2 = redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
for(int i=0;i<100000;i++){
redisConnection.get(redisTemplate.getKeySerializer().serialize("user:a:" + i));
}
return null;
}
});
long endPipelineTime = System.currentTimeMillis();
System.out.println("使用管道,耗时:" + (endPipelineTime - startPipelineTime)
+ "毫秒,list2 的长度:" + list2.size());
}
}

全部评论(0)