Redis - 5种基本类型、4种特殊类型数据结构详解(附:常用命令)
作者:hangge | 2024-12-02 10:00
一、Redis 的数据结构类型概述
(1)Redis 有以下这五种基本类型:
- String(字符串)
- Hash(哈希)
- List(列表)
- Set(集合)
- zset(有序集合)
(2)以及四种特殊的数据结构类型:
- BitMap(位图,2.2 版新增)
- HyperLogLog(基数统计,2.8 版新增)
- GEO(地理位置,3.2 版新增)
- Stream(支持多播的可持久化消息队列,5.0 版新增)
二、String(字符串)
1,基本介绍
(1)String 是最基本的 key-value 结构,key 是唯一标识,value 是具体的值。value 其实不仅是字符串, 也可以是数字(整数或浮点数),value 最多可以容纳的数据长度是 512M。
(2)应用场景:缓存数据、共享 session、分布式锁、计数器、限流。
2,常用命令
set key value # 设置 key-value 类型的值 get key # 根据 key 获得对应的 value exists key # 判断某个 key 是否存在 strlen key # 返回 key 所储存的字符串值的长度 del name # 删除某个 key 对应的值 mset key1 value1 key2 value2 # 批量设置 key-value 类型的值 mget key1 key2 # 批量获取多个 key 对应的 value
3,应用场景使用样例
(1)缓存数据:将经常访问的数据存储为字符串,以便快速读取。
(2)计数器:将字符串作为计数器的值,用于记录点击量、用户数量等。
(3)分布式锁:使用字符串作为锁的值,实现分布式环境下的互斥操作。
# 直接缓存整个对象的 JSON set user:1 '{"name":"hangge", "age":18}' # 采用将 key 进行分离为 user:ID:属性,采用 MSET 存储,用 MGET 获取各属性值 mset user:1:name hangge user:1:age 18 user:2:name google user:2:age 20
# 设置 key-value 类型的值 set number 0 # 将 key 中储存的数字值增一 incr number # 将 key 中存储的数字值加 10 incrby number 10 # 将 key 中储存的数字值减一 decr number # 将 key 中存储的数字值键 10 decrby number 10
- SET 命令有个 NX 参数可以实现“key 不存在才插入”,可以用它来实现分布式锁。一般而言,还会对分布式锁加上过期时间,避免客户端发生异常而无法释放锁。并且使用 SET 命令设置锁变量值时,每个客户端设置的值是一个唯一值,用于标识客户端:
- 如果 key 不存在,则显示插入成功,可以用来表示加锁成功
- 如果 key 存在,则会显示插入失败,可以用来表示加锁失败
# lock_key 就是 key 键; # unique_value 是客户端生成的唯一的标识; # NX 代表只在 lock_key 不存在时,才对 lock_key 进行设置操作; # PX 10000 表示设置 lock_key 的过期时间为 10s,这是为了避免客户端发生异常而无法释放锁。 SET lock_key unique_value NX PX 10000
- 而解锁的过程就是将 lock_key 键删除,但不能乱删,要保证执行操作的客户端就是加锁的客户端。所以,解锁的时候,我们要先判断锁的 unique_value 是否为加锁客户端,是的话,才将 lock_key 键删除。
- 因此解锁是有两个操作,这时就需要 Lua 脚本来保证解锁的原子性,因为 Redis 在执行 Lua 脚本时,可以以原子性的方式执行,保证了锁释放操作的原子性。
// 释放锁时,先比较 unique_value 是否相等,避免锁的误释放 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
Redis 集群情况下保证分布式锁的可靠性解决方案:
- 为了保证集群环境下分布式锁的可靠性,Redis 官方已经设计了一个分布式锁算法 Redlock(红锁)。
- 它是基于多个 Redis 节点的分布式锁,即使有节点发生了故障,锁变量仍然是存在的,客户端还是可以完成锁操作。官方推荐是至少部署 5 个 Redis 节点,而且都是主节点,它们之间没有任何关系,都是一个个孤立的节点。
- Redlock 算法的基本思路,是让客户端和多个独立的 Redis 节点依次请求申请加锁,如果客户端能够和半数以上的节点成功地完成加锁操作,那么我们就认为,客户端成功地获得分布式锁,否则加锁失败。
- 这样一来,即使有某个 Redis 节点发生故障,因为锁的数据在其他节点上也有保存,所以客户端仍然可以正常地进行锁操作,锁的数据也不会丢失。
三、Hash(哈希)
1,基本介绍
(1)哈希类型是指 v(值)本身又是一个键值对(k-v)结构。
(2)hash 类型的值至多存储 2 的 32 次方 -1 个字段,一般情况下我们也达不到这个极限。
(3)应用场景:缓存用户属性信息、缓存对象、实时统计。
2,常用命令
# 存储一个哈希表 key 的键值 hset key field value # 获取哈希表 key 对应的 field 键值 hget key field # 在一个哈希表 key 中存储多个键值对 hmset key field value [field value...] # 批量获取哈希表 key 中多个 field 键值 hmget key field [field ...] # 删除哈希表 key 中的 field 键值 hdel key field [field ...] # 返回哈希表 key 中 field 的数量 hlen key # 返回哈希表 key 中所有的键值 hgetall key # 为哈希表 key 中 field 键的值加上增量 n hincrby key field n
注意:如果开发使用 hgetall,哈希元素比较多的话,可能导致 Redis 阻塞,可以使用 hscan。而如果只是获取部分 field,建议使用 hmget。
3,应用场景使用样例
(1)用户属性:将用户的各种属性(如姓名、年龄、性别)存储在哈希中,方便读取和更新。hset user:123 name "John" hget user:123 name
- 下面样例我们可以给学生生成一个编号拼接到 key 里面,姓名、年龄、性别、住址信息存储到 hash 类型的 value 中。
注意:这里面针对 key 的命名,stu 是 student 的简写,尽量不要写太多字符,否则会额外占用内存空间的,后面的:1,表示这个学生的编号是 1,后期如果我们想获取所有学员的 key,就可以使用这个规则进行过滤了。
hmset stu:1 name xiaoming age 18 sex 0 address beijing hgetall stu:1

# 网站访问量样例 hincrby stats:website visits 1 hget stats:website visits # 购物车样例 添加商品:HSET cart:{用户 id} {商品 id} 1 添加数量:HINCRBY cart:{用户 id} {商品 id} 1 商品总数:HLEN cart:{用户 id} 删除商品:HDEL cart:{用户 id} {商品 id} 获取购物车所有商品:HGETALL cart:{用户 id}
四、List(列表)
1,基本介绍
(1)列表(list)类型是用来存储多个有序的字符串,一个列表最多可以存储 2^32-1 个元素,也即每个列表支持超过 40 亿个元素。
(2)应用场景:消息队列、文章列表、历史记录、最新动态。

2,常用命令
lpush key value [value ...] # 将一个或多个值插入到列表头部。 rpush key value [value ...] # 将一个或多个值插入到列表尾部。 lpop key # 移除并返回列表的第一个元素。 rpop key # 移除并返回列表的最后一个元素。 blpop key [key ...] timeout # 阻塞式地从列表头部弹出一个或多个元素 brpop key [key ...] timeout # 阻塞式地从列表尾部弹出一个或多个元素 ltrim key start stop #修剪列表,只保留指定范围内的元素 lrange key start stop #返回列表中指定范围内的元素 lindex key index #获取列表指定角标的元素 lset key index value #修改列表中指定角标的元素
通常来说这些命令会配合使用:
- lpush + lpop = Stack(栈)
- lpush + rpop = Queue(队列)
- lplpushsh + ltrim = Capped Collection(有限集合)
- lpush + brpop = Message Queue(消息队列)
3,应用场景使用样例
(1)消息队列:将待处理的消息添加到列表的一端,从另一端读取并处理。lpush queue:messages "message1" lpop queue:messages
lpush user:1234:history "action1" lrange user:1234:history 0 -1
lpush newsfeed:user1234 "New post by user1234" lrange newsfeed:user1234 0 9
五、Set(集合)
1,基本介绍
(1)集合(set)类型也是用来保存多个的字符串元素,但是不允许重复元素。一个集合最多可以存储 2^32-1 个元素,也即每个集合支持超过 40 亿个元素。
Set 类型和 List 类型的区别如下:
- List 可以存储重复元素,Set 只能存储非重复元素;
- List 是按照元素的先后顺序存储元素的,而 Set 则是无序方式存储元素的。
(2)应用场景:点赞、标签系统、共同关注、好友关系、唯一访客统计、抽奖活动
2,常用命令
sadd key member [member ...] # 向集合中添加一个或多个成员 srem key member [member ...] # 从集合 key 中删除元素 smembers key # 返回集合中的所有成员 sismember key member # 判断一个成员是否属于集合 scard key #返回集合的基数(元素数量) spop key [count] # 随机移除并返回集合中的一个或多个元素 srandmember key [count] # 返回集合中一个或多个随机元素,不会从集合中移除 sinter key [key ...] # 返回多个集合的交集 sunion key [key ...] # 返回多个集合的并集 sdiff key [key ...] # 返回多个集合的差集
注意:
- Set 的差集、并集和交集的计算复杂度较高,在数据量较大的情况下,如果直接执行这些计算,会导致 Redis 实例阻塞。
- 在主从集群中,为了避免主库因为 Set 做聚合计算(交集、差集、并集)时导致主库被阻塞,我们可以选择一个从库完成聚合统计,或者把数据返回给客户端,由客户端来完成聚合统计。
3,应用场景使用样例
(1)点赞:Set 类型可以保证一个用户只能点一个赞,例如 key 是文章 id,value 是用户 id。
# uid:1 用户对文章 article:1 点赞 SADD article:1 uid:1 # uid:2 用户对文章 article:1 点赞 SADD article:1 uid:2 # uid:3 用户对文章 article:1 点赞 SADD article:1 uid:3 # uid:1 取消了对 article:1 文章点赞。 SREM article:1 uid:1 # 获取 article:1 文章所有点赞用户 SMEMBERS article:1 # 获取 article:1 文章的点赞用户数量 SCARD article:1 # 判断用户 uid:1 是否对文章 article:1 点赞了(返回 0 说明没点赞,返回 1 则说明点赞了) SISMEMBER article:1 uid:1
(2)共同关注:Set 类型支持交集运算,所以可以用来计算共同关注的好友、公众号等。例如 key 可以是用户 id,value 则是已关注的公众号的 id
# uid:1 用户关注公众号 id 为 5、6、7、8、9 SADD uid:1 5 6 7 8 9 # uid:2 用户关注公众号 id 为 7、8、9、10、11 SADD uid:2 7 8 9 10 11 # 获取共同关注 SINTER uid:1 uid:2 @ 给 uid:2 推荐 uid:1 关注的公众号: SDIFF uid:1 uid:2 @ 验证某个公众号是否同时被 uid:1 或 uid:2 关注(返回 1 说明关注了;返回 0 说明没关注) SISMEMBER uid:1 5
(3)抽奖活动:存储某活动中中奖的用户名 ,Set 类型因为有去重功能,可以保证同一个用户不会中奖两次。
# key 为抽奖活动名,value 为员工名称,把所有员工名称放入抽奖箱 SADD lucky Tom Jerry John Sean Marry Lindy Sary Mark #如果允许重复中奖,可以使用 SRANDMEMBER 命令。 # 抽取 1 个一等奖: SRANDMEMBER lucky 1 # 抽取 2 个二等奖: SRANDMEMBER lucky 2 # 抽取 3 个三等奖: SRANDMEMBER lucky 3 #如果不允许重复中奖,可以使用 SPOP 命令。 # 抽取一等奖 1 个 SPOP lucky 1 # 抽取二等奖 2 个 SPOP lucky 2 # 抽取三等奖 3 个 SPOP lucky 3
(4)标签系统:将标签与相关内容的 ID 存储在集合中,用于快速检索和过滤。
sadd post:1001:tags "redis" "database" sinter post:1001:tags post:1002:tags
sadd site:visitors "192.168.0.1" scard site:visitors
六、zset(有序集合)
1,基本介绍
(1)zset(有序集合)是已排序的字符串集合,同时元素不能重复。
(2)应用场景:排序场景,比如排行榜、时间线、范围查询、以及一些需要延迟队列的场景
2,常用命令
zadd key score1 member1 [score2 member2 ...] # 向有序集合添加一个或多个成员,以及其分数(权重) zrange key start stop [withscores] # 返回有序集合中指定范围内的成员和分数(从低分到高分) zrevrange key start stop [withscores] # 返回有序集合中指定范围内的成员和分数(从高分到高分低分) zrangebyscore key min max [withscores] # 返回有序集合中指定分数范围内的成员和分数 zrank key member # 返回有序集合中指定成员的排名(从低分到高分) zrevrank key member # 返回有序集合中指定成员的排名(从高分到低分) zscore key member # 返回有序集合中指定成员的分数 zrem key member [member ...] # 移除有序集合中一个或多个成员 zcard key # 返回有序集合 key 中元素个数 zincrby key increment member # 为有序集合 key 中元素 member 的分值加上 increment # 返回指定成员区间内的成员,按字典正序排列, 分数必须相同。 zrangelex key min max [LIMIT offset count] # 返回指定成员区间内的成员,按字典倒序排列, 分数必须相同 zrevrangelex key max min [LIMIT offset count]
3,应用场景使用样例
(1)排行榜:将用户的得分与其 ID 存储在有序集合中,用于排行榜的展示和更新。# 用户 Jay 上传一个视频,获得 6 个赞,可以这样: zadd user:ranking:2021-03-03 Jay 6 # 过了一段时间,再获得一个赞,可以这样: zincrby user:ranking:2021-03-03 Jay 1 # 如果某个用户 John 作弊,需要删除该用户: zrem user:ranking:2021-03-03 John # 展示获取赞数最多的 3 个用户 zrevrange user:ranking:2021-03-03 0 2 withscores
zadd timeline 1622640000 event1 1622650000 event2 1622660000 event3 zrangebyscore timeline 1622640000 1622660000
zadd prices 1.99 product1 2.99 product2 0.99 product3 zrangebyscore prices 0.99 2.99
# 电话排序样例:我们可以将电话号码存储到 SortSet 中,然后根据需要来获取号段: ZADD phone 0 13100111100 0 13110114300 0 13132110901 ZADD phone 0 13200111100 0 13210414300 0 13252110901 ZADD phone 0 13300111100 0 13310414300 0 13352110901 # 获取所有号码 ZRANGEBYLEX phone - + # 获取 132 号段的号码: ZRANGEBYLEX phone [132 (133 #获取 132、133 号段的号码: ZRANGEBYLEX phone [132 (134 # 姓名排序 zadd names 0 Toumas 0 Jake 0 Bluetuo 0 Gaodeng 0 Aimini 0 Aidehua # 获取所有人的名字: ZRANGEBYLEX names - + # 获取名字中大写字母 A 开头的所有人: ZRANGEBYLEX names [A (B # 获取名字中大写字母 C 到 Z 的所有人: ZRANGEBYLEX names [C [Z
(5)延迟队列:可以通过使用有序集合来实现。有序集合的成员是任务,分数是任务的执行时间戳。利用有序集合的特性,可以按照任务的执行时间进行排序。以下是一个基本的实现思路:
- 将任务信息作为有序集合的成员,任务的执行时间戳作为分数。
- 使用一个定时器,定期检查有序集合中是否有任务的执行时间到了。
- 如果发现有任务的执行时间到了,就从有序集合中取出该任务,并将任务放入待执行队列。
延迟队列是指把当前要做的事情,往后推迟一段时间再做。延迟队列的常见使用场景有以下几种:
- 在淘宝、京东等购物平台上下单,超过一定时间未付款,订单会自动取消;
- 打车的时候,在规定时间没有车主接单,平台会取消你的单并提醒你暂时没有车主接单;
- 点外卖的时候,如果商家在 10 分钟还没接单,就会自动取消订单;
七、BitMap(位图)
1,基本介绍
(1)Bitmap,即位图,是一串连续的二进制数组(0 和 1),可以通过偏移量(offset)定位元素。BitMap 通过最小的单位 bit 来进行 0|1 的设置,表示某个元素的值或者状态,时间复杂度为 O(1)。(2)应用场景:二值状态统计的场景,比如签到统计、判断用户登陆态、连续签到用户总数
2,常用命令
# 设置值,其中 value 只能是 0 和 1 SETBIT key offset value # 获取值 GETBIT key offset # 获取指定范围内值为 1 的个数(start 和 end 以字节为单位) BITCOUNT key start end # BitMap 间的运算 # operations 位移操作符,枚举值 # AND 与运算 & # OR 或运算 | # XOR 异或 ^ # NOT 取反 ~ # result 计算的结果,会存储在该 key 中 # key1 … keyn 参与运算的 key,可以有多个,空格分割,not 运算只能一个 key # 当 BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0。 # 返回值是保存到 destkey 的字符串的长度(以字节 byte 为单位),和输入 key 中最长的字符串长度相等。 BITOP [operations] [result] [key1] [keyn…] # 返回指定 key 中第一次出现指定 value(0/1)的位置 BITPOS [key] [value]
3,应用场景使用样例
(1)签到统计:在签到打卡的场景中,我们只用记录签到(1)或未签到(0),所以它就是非常典型的二值状态。每个用户一天的签到用 1 个 bit 位就能表示,一个月(假设是 31 天)的签到情况用 31 个 bit 位就可以,而一年的签到也只需要用 365 个 bit 位,根本不用太复杂的集合类。# 假设我们要统计 ID 100 的用户在 2022 年 6 月份的签到情况 # 记录该用户 6 月 3 号已签到。 SETBIT uid:sign:100:202206 2 1 # 检查该用户 6 月 3 日是否签到。 GETBIT uid:sign:100:202206 2 # 统计该用户在 6 月份的签到次数。 BITCOUNT uid:sign:100:202206 # 获取该用户在 2022 年 6 月份首次打卡日期 BITPOS uid:sign:100:202206 1
(2)判断用户登陆态:Bitmap 提供了 GETBIT、SETBIT 操作,通过一个偏移值 offset 对 bit 数组的 offset 位置的 bit 位进行读写操作,需要注意的是 offset 从 0 开始。
- 只需要一个 key = login_status 表示存储用户登陆状态集合数据, 将用户 ID 作为 offset,在线就设置为 1,下线设置 0。通过 GETBIT 判断对应的用户是否在线。 5000 万用户只需要 6 MB 的空间。
# 假如我们要判断 ID = 10086 的用户的登陆情况: # 执行以下指令,表示用户已登录。 SETBIT login_status 10086 1 # 检查该用户是否登陆,返回值 1 表示已登录。 GETBIT login_status 10086 #登出,将 offset 对应的 value 设置成 0。 SETBIT login_status 10086 0
(3)连续签到用户总数:假设我们需要统计出这连续 7 天连续打卡用户总数
- 我们把每天的日期作为 Bitmap 的 key,userId 作为 offset,若是打卡则将 offset 位置的 bit 设置成 1。
- key 对应的集合的每个 bit 位的数据则是一个用户在该日期的打卡记录。
- 一共有 7 个这样的 Bitmap,如果我们能对这 7 个 Bitmap 的对应的 bit 位做 『与』运算。同样的 UserID offset 都是一样的,当一个 userID 在 7 个 Bitmap 对应对应的 offset 位置的 bit = 1 就说明该用户 7 天连续打卡。
- 结果保存到一个新 Bitmap 中,我们再通过 BITCOUNT 统计 bit = 1 的个数便得到了连续打卡 7 天的用户总数了。
- Redis 提供了 BITOP operation destkey key [key ...] 这个指令用于对一个或者多个 key 的 Bitmap 进行位元操作。
- operation 可以是 and、OR、NOT、XOR。当 BITOP 处理不同长度的字符串时,较短的那个字符串所缺少的部分会被看作 0 。空的 key 也被看作是包含 0 的字符串序列。
# 假设要统计 3 天连续打卡的用户数,则是将三个 bitmap 进行 AND 操作,并将结果保存到 destmap 中 # 接着对 destmap 执行 BITCOUNT 统计,如下命令: # 与操作 BITOP AND destmap bitmap:01 bitmap:02 bitmap:03 # 统计 bit 位 = 1 的个数 BITCOUNT destmap
提示:即使一天产生一个亿的数据,Bitmap 占用的内存也不大,大约占 12 MB 的内存(10^8/8/1024/1024),7 天的 Bitmap 的内存开销约为 84 MB。同时我们最好给 Bitmap 设置过期时间,让 Redis 删除过期的打卡数据,节省内存。
八、HyperLogLog(基数统计)
1,基本介绍
(1)HyperLogLog 是 Redis 2.8.9 版本新增的数据类型,是一种用于「统计基数」的数据集合类型,基数统计就是指统计一个集合中不重复的元素个数。但要注意,HyperLogLog 是统计规则是基于概率完成的,不是非常准确,标准误算率是 0.81%。
- 简单来说 HyperLogLog 提供不精确的去重计数。
- HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的内存空间总是固定的、并且是很小的。
(2)应用场景:海量数据基数统计的场景,比如百万级网页 UV 计数等
2,常用命令
# 添加指定元素到 HyperLogLog 中 PFADD key element [element ...] # 返回给定 HyperLogLog 的基数估算值。 PFCOUNT key [key ...] # 将多个 HyperLogLog 合并为一个 HyperLogLog PFMERGE destkey sourcekey [sourcekey ...]
3,应用场景使用样例
(1)百万级网页 UV 计数:HyperLogLog 优势在于只需要花费 12 KB 内存,就可以计算接近 2^64 个元素的基数,和元素越多就越耗费内存的 Set 和 Hash 类型相比,HyperLogLog 就非常节省空间。
# 在统计 UV 时,可以用 PFADD 命令把访问页面的每个用户都添加到 HyperLogLog 中 PFADD page1:uv user1 user2 user3 user4 user5 #接下来,用 PFCOUNT 命令直接获得 page1 的 UV 值了,这个命令的作用就是返回 HyperLogLog 的统计结果 PFCOUNT page1:uv
注意:HyperLogLog 的统计规则是基于概率完成的,所以它给出的统计结果是有一定误差的,标准误算率是 0.81%。这也就意味着,你使用 HyperLogLog 统计的 UV 是 100 万,但实际的 UV 可能是 101 万。虽然误差率不算大,但是,如果你需要精确统计结果的话,最好还是继续用 Set 或 Hash 类型。
九、GEO(地理位置)
1,基本介绍
(1)Redis GEO 是 Redis 3.2 版本新增的数据类型,主要用于存储地理位置信息,并对存储的信息进行操作。
- 在日常生活中,我们越来越依赖搜索“附近的餐馆”、在打车软件上叫车,这些都离不开基于位置信息服务(Location-Based Service,LBS)的应用。LBS 应用访问的数据是和人或物关联的一组经纬度信息,而且要能查询相邻的经纬度范围,GEO 就非常适合应用在 LBS 服务的场景中。
(2)应用场景:存储地理位置信息的场景,比如附近的餐馆、滴滴叫车
2,常用命令
# 存储指定的地理空间位置,可将一个或多个经度(longitude)、纬度(latitude)、位置名称(member)添加到指定的 key 中 GEOADD key longitude latitude member [longitude latitude member ...] # 从给定的 key 里返回所有指定名称(member)的位置(经度和纬度),不存在的返回 nil。 GEOPOS key member [member ...] # 返回两个给定位置之间的距离。 GEODIST key member1 member2 [m|km|ft|mi] # 根据用户给定的经纬度坐标来获取指定范围内的地理位置集合。 GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
3,应用场景使用样例
(1)滴滴叫车:可以使用 GEO 的 GEOADD 和 GEORADIUS 这两个命令实现该功能。# 用一个 GEO 集合保存所有车辆的经纬度,集合 key 是 cars:locations。 # 例如,把 ID 号为 33 的车辆的当前经纬度位置存入 GEO 集合中 GEOADD cars:locations 116.034579 39.030452 33 # 当用户想要寻找自己附近的网约车时,LBS 应用就可以使用 GEORADIUS 命令。 # 例如,下面命令时根据用户的经纬度信息,查找以这个经纬度为中心的 5 公里内的车辆信息并返回 GEORADIUS cars:locations 116.054579 39.030452 5 km ASC COUNT 10
十、Stream(消息队列)
1,基本介绍
(1)Stream 是 Redis 5.0 版本新增加的数据类型,Redis 专门为消息队列设计的数据类型。用于完美地实现消息队列,它支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。
在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:
- 发布订阅模式,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
- List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID。
(2)应用场景:消息队列,相比于基于 List 类型实现的消息队列,有这两个特有的特性:自动生成全局唯一消息 ID,支持以消费组形式消费数据
2,常用命令
XADD # 插入消息,保证有序,可以自动生成全局唯一 ID; XLEN # 查询消息长度; XREAD # 用于读取消息,可以按 ID 读取数据; XDEL # 根据消息 ID 删除消息; DEL # 删除整个 Stream; XRANGE # 读取区间消息 XREADGROUP # 按消费组形式读取消息; XPENDING # 用来查询每个消费组内所有消费者「已读取、但尚未确认」的消息; XACK # 用于向消息队列确认消息处理已完成;
3,简单的消息队列使用样例
(1)使用 xadd 存入消息和 xread 循环阻塞读取消息的方式可以实现简易版的消息队列,

(3)消费者通过 XREAD 命令从消息队列中读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取(注意是输入消息 ID 的下一条信息开始读取,不是查询输入 ID 的消息)。
(4)如果想要实现阻塞读(当没有数据时,阻塞住),可以调用 XRAED 时设定 BLOCK 配置项,实现类似于 BRPOP 的阻塞读取操作。

(2)生产者通过 XADD 命令插入一条消息,插入成功后会返回全局唯一的 ID,比如“1654254953808-0”。消息的全局唯一 ID 由两部分组成:
- 第一部分“1654254953808”是数据插入时,以毫秒为单位计算的当前服务器时间;
- 第二部分表示插入消息在当前毫秒内的消息序号,这是从 0 开始编号的。例如,“1654254953808-0”就表示在“1654254953808”毫秒内的第 1 条消息。
# * 表示让 Redis 为插入的数据自动生成一个全局唯一的 ID # 往名称为 mymq 的消息队列中插入一条消息,消息的键是 name,值是 hangge XADD mymq * name hangge
(3)消费者通过 XREAD 命令从消息队列中读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取(注意是输入消息 ID 的下一条信息开始读取,不是查询输入 ID 的消息)。
# 从 ID 号为 1654254953807-0 的消息开始,读取后续的所有消息(示例中一共 1 条)。 > XREAD STREAMS mymq 1654254953807-0 1) 1) "mymq" 2) 1) 1) "1654254953808-0" 2) 1) "name" 2) "hangge"
(4)如果想要实现阻塞读(当没有数据时,阻塞住),可以调用 XRAED 时设定 BLOCK 配置项,实现类似于 BRPOP 的阻塞读取操作。
# 下面命令表明 XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。 # 命令最后的“$”符号表示读取最新的消息 XREAD BLOCK 10000 STREAMS mymq $
4,消费组的使用样例
(1)Stream 可以使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。
- 消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息。
- 不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)。
(2)为保障消息可靠性,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。
- 消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示。
- 如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。

(3)下面命令创建两个消费组,这两个消费组消费的消息队列是 mymq,都指定从第一条消息开始读取:
# 创建一个名为 group1 的消费组,0-0 表示从第一条消息开始读取。 XGROUP CREATE mymq group1 0-0 # 创建一个名为 group2 的消费组,0-0 表示从第一条消息开始读取。 XGROUP CREATE mymq group2 0-0
(4)消费组 group1 内的消费者 consumer1 从 mymq 消息队列中读取所有消息的命令如下:
# 命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。 > XREADGROUP GROUP group1 consumer1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1654254953808-0" 2) 1) "name" 2) "hangge"
(5)同一个消费组里的消费者不能消费同一条消息,比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:
> XREADGROUP GROUP group1 consumer1 STREAMS mymq > (nil)
(6)但是,不同消费组的消费者可以消费同一条消息,刚才 group1 消费组里的 consumer1 消费者消费了一条 id 为 1654254953808-0 的消息,现在用 group2 消费组里的 consumer1 消费者消费消息:
> XREADGROUP GROUP group2 consumer1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1654254953808-0" 2) 1) "name" 2) "hangge"
(7)使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息 > XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1654254953808-0" 2) 1) "name" 2) "hangge" # 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息 > XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1654256265584-0" 2) 1) "name" 2) "baidu" # 让 group2 中的 consumer3 从 mymq 消息队列中消费一条消息 > XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq > 1) 1) "mymq" 2) 1) 1) "1654256271337-0" 2) 1) "name" 2) "google"
附:Redis 基于 Stream 消息队列的不足
1,不足
相较于专业的消息队列,Redis 基于 Stream 消息队列存在本身可能会丢失数据的情况。
2,丢失数据的原因
(1)AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
(2)主从复制也是异步的,主从切换时,也存在丢失数据的可能 (opens new window)。
(3)Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。所以 Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。当指定队列最大长度时,队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。
全部评论(0)