Redis系列:(二)Redis的高级特性和应用

2025-06-25 18:54
246
0

一、Redis的慢查询

慢查询的阈值默认为10毫秒(注意:这个时间是执行指令的时间,不包括网络传输时间和Redis线程排队时间)

相关配置介绍

  • 慢查询阈值设置的字段为:slowlog-log-slower-than 10000 表示慢查询阈值为10毫秒,单位为微妙,如果设置为小于0的数则表示不记录慢查询。
  • Redis的慢查询存储在内存中,在一个showlog的链表中slowlog-max-len 128 表示保存的慢查询队列的长度。

如下图所示为Redis配置文件中对慢查询的默认设置,

相关命令示例:

# 设置慢查询阈值为0(这里是为了方便测试,生产环境不要这样)
config set slowlog-log-slower-than 0

# 上面的命令服务器重启后会失效,想要永久生效,可以用如下命令把改动的参数回写到redis的配置文件中
config rewrite

# 获取慢查询日志的相关配置
CONFIG GET slowlog-*

# 获取慢查询日志记录,语法:SLOWLOG GET [数量],数量表示显示日志条数
SLOWLOG GET 2

# 获取当前慢查询日志的数量
SLOWLOG LEN

# 清空慢查询日志
SLOWLOG RESET

生产配置优化建议

  • slowlog-log-slower-than建议设置为 1~10 毫秒( 1000~10000)
  • 增大日志容量,生产环境slowlog-max-len可设置 1000~10000

注意:

  • 关于慢查询的日志分析,Redis没有提供分析的指令,需要通过SLOWLOG命令把日志拿出来自己分析。
  • 生产也可以使用定时任务把日志结果存到持久化层中,以便后续进行分析。

二、Pipeline

Redis Pipeline(管道)是一种批量执行命令的优化技术,用于减少客户端与服务器之间的网络往返延迟(RTT),显著提升高并发场景下的吞吐量。客户端将多个命令打包一次性发送给服务器,服务器处理完所有命令后,一次性返回所有结果。

使用Pipeline的注意事项:

  • Pipeline 中的命令会顺序执行,若包含耗时操作(如KEYS *、SORT),会阻塞后续命令的处理。
  • 对于大批量数据,还是要采取分批处理的方式,防止Redis内存激增。一般限制500条以下数据。

Java代码示例:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

import java.util.List;

public class RedisPipelineExample {
    public static void main(String[] args) {
        // 连接 Redis 服务器
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // 示例 1:批量写入
            Pipeline pipeline = jedis.pipelined();
            for (int i = 0; i < 1000; i++) {
                pipeline.set("key" + i, "value" + i);
            }
            // 执行批量操作
            List<Object> results = pipeline.syncAndReturnAll();
            System.out.println("批量写入完成,共执行 " + results.size() + " 条命令");

            // 示例 2:批量读取(使用 Response 对象获取结果)
            Pipeline getPipeline = jedis.pipelined();
            Response<String> key1 = getPipeline.get("key1");
            Response<String> key2 = getPipeline.get("key2");
            Response<Long> incrResult = getPipeline.incr("counter");
            
            // 执行并获取结果
            getPipeline.sync();
            System.out.println("key1: " + key1.get());
            System.out.println("key2: " + key2.get());
            System.out.println("counter 自增后的值: " + incrResult.get());

            // 示例 3:混合操作(写入后读取)
            Pipeline mixedPipeline = jedis.pipelined();
            mixedPipeline.set("user:1", "Alice");
            mixedPipeline.hset("user:1:info", "age", "30");
            mixedPipeline.get("user:1");
            mixedPipeline.hgetAll("user:1:info");
            
            List<Object> mixedResults = mixedPipeline.syncAndReturnAll();
            System.out.println("混合操作结果:");
            for (Object result : mixedResults) {
                System.out.println(result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

pipeline内置了很多与命令同名的方法,直接调用即可。

获取结果的方式有两种,示例中都有覆盖:

  1. 使用 syncAndReturnAll() 返回所有命令的执行结果列表
  2. 或使用 Response<T> 对象获取特定命令的结果(需先声明,后调用 sync())

三、事务

Redis 事务是一组命令的集合,它可以确保多个命令按顺序执行,且在执行过程中不会被其他客户端的请求打断。

相关命令介绍:

  • MULTI 命令:开启一个事务块,后续命令将被加入队列中。
  • EXEC 命令:执行事务队列中的所有命令。
  • DISCARD 命令:取消事务,清空队列中的所有命令。
  • WATCH 命令:监控一个或多个键,在事务执行前若键被修改则事务失败(乐观锁)。

与传统关系型数据库的事务相比,Redis的事务非常的鸡肋,只能判断语法层面的错误进行回滚,更适合处理非强一致性要求的场景,不能保证事务的原子性和一致性。

如下图所示,命令格式正确但执行时出错,错误命令执行错误,但不会影响其它命令的正常执行:

注意:PipeLine是客户端封装所有命令组一起批量执行,能提升吞吐量。而事务依然是普通命令,每个命令都会进行网络传输,只是在MULTI和EXEC中间的命令Redis先放到缓冲区中,EXEC提交时才真正执行。

四、Lua

LUA脚本语言是C开发的,类似存储过程,具体的Lua语法可以查看:《Lua教程 | 菜鸟教程》

使用LUA脚本的好处:

  • 减少网络开销,在Luau脚本中可以把多个命令放在同一个脚本中运行。
  • 原子操作,Redis会将整个脚本作为一个整体执行,中间不会被其它命令插入。(因为Redis是单线程执行命令)
  • 复用性,客户端发送的脚本会存储在Redis中,这意味着其它客户端可以复用这一脚本完成同样的逻辑。

核心命令介绍:

1、执行Lua脚本

  • 语法:EVAL script numkeys key [key ...] arg [arg ...]
  • 参数:
    • script:Lua 脚本内容
    • numkeys:键名参数的个数
    • key [key ...]:键名参数,在 Lua 中通过 KEYS[1], KEYS[2] 访问
    • arg [arg ...]:附加参数,在 Lua 中通过 ARGV[1], ARGV[2] 访问

示例:

EVAL "return {KEYS[1], KEYS[2], ARGV[1], ARGV[2]}" 2 key1 key2 first second

2、脚本加载到Redis缓存中

  • 语法:SCRIPT LOAD script
  • 参数:script:Lua脚本内容
  • 返回:SHA1哈希值

示例:

SCRIPT LOAD "return {KEYS[1], KEYS[2], ARGV[1], ARGV[2]}'"
# 返回结果
493d4552555dd9618f79b2b6ef18f8a89fd8f4

3、执行缓存的脚本

  • 语法:EVALSHA sha1 numkeys key [key ...] arg [arg ...]
  • 参数:
    • sha1:SHA1哈希值(SCRIPT LOAD命令返回的值)
    • numkeys:键名参数的个数
    • key [key ...]:键名参数,在 Lua 中通过 KEYS[1], KEYS[2] 访问
    • arg [arg ...]:附加参数,在 Lua 中通过 ARGV[1], ARGV[2] 访问

示例:

EVALSHA 493d4552555dd9618f79b2b6ef18f8a89fd8f4 2 key1 key2 first second

五、发布与订阅

Redis的发布订阅功能比较鸡肋,在此做一下简单的介绍。

概念介绍:

  • 发布者(Publisher):向指定频道发送消息的客户端
  • 订阅者(Subscriber):监听特定频道获取消息的客户端
  • 频道(Channel):消息的逻辑分组,用于区分不同类型的消息(类似于MQ中的Topic)

核心命令介绍:

  • 订阅一个或多个频道,语法:SUBSCRIBE channel [channel …]
  • 向频道发布消息,语法:PUBLISH channel message
  • 取消订阅的频道,如果没指定频道表示取消所有订阅,语法:UNSUBSCRIBE [channel …]

为什么比较鸡肋?

  • 无消息持久化:消息发布后若无人订阅则丢失
  • 不保证可靠性:网络问题可能导致消息丢失
  • 无队列机制:无法处理消息堆积(需依赖其他组件)
  • 单线程处理:高并发场景下可能成为瓶颈

六、Stream

Redis 5.0版本发布的Stream功能,其借鉴了kafka的设计提供了持久化的能力,用于支持消息中间件(可以算是发布订阅功能的替代)。

6.1、概念介绍

  • Stream:消息的持久化存储容器,支持多生产者多消费者模型
  • 消息(Message):结构化数据,每个消息包含唯一 ID(ID格式:时间戳-序列号) 和键值对(Hash类型)
  • 消费者组(Consumer Group):多个消费者组成的组,支持负载均衡消费,多个消费者组可以消费同一个Stream,但它们之间是独立的互不干扰。
  • 消费者(Consumer):组内的单个消费者,组内独立维护消费进度

6.2、核心命令介绍

消息队列相关命令:

  • 1、XADD:添加消息,语法:XADD stream-key [MAXLEN count] * field value [field value ...]
    • stream-key:消息的主题(Topic),用于标识消息流。
    • MAXLEN(可选):设置流的最大长度,超出时自动删除旧消息。
    • *:自动生成唯一的消息ID(格式:时间戳-序列号)。
    • field value:消息的键值对,可重复添加多个字段。
  • 2、XTRIM:对流进行修剪,限制长度,语法:XTRIM stream-key MAXLEN [~] count
    • stream-key:目标流的名称。
    • MAXLEN:必选参数,指定流的最大长度。
    • ~(可选):近似修剪模式,允许Redis以更高效但不精确的方式删除消息。
    • count:保留的最大消息数量,超出部分将被删除。
  • 3、XDEL:删除消息,语法XDEL stream-key message-id [message-id ...]
    • stream-key:目标流的名称。
    • message-id:要删除的消息ID,支持批量删除。
  • 4、XLEN:获取流的消息数量,语法:XLEN stream-key
    • stream-key:目标流的名称。
    • 返回值:流中当前的消息总数(包括未删除的消息)。
  • 5、XRANGE:获取消息列表(按ID升序),语法:XRANGE stream-key start end [COUNT count]
    • stream-key:目标流的名称。
    • start/end:消息ID范围(如0-0表示最小ID,$表示最大ID)。
    • COUNT(可选):限制返回的消息数量,用于分页。
  • 6、XREVRANGE:反向获取消息列表(按ID降序),语法:XREVRANGE stream-key end start [COUNT count]
    • 参数含义:与XRANGE相同,但返回顺序相反。
  • 7、XREAD:阻塞或非阻塞读取消息,语法:XREAD [BLOCK milliseconds] STREAMS stream-key [stream-key ...] ID [ID ...]
    • BLOCK(可选):阻塞模式,指定超时时间(毫秒),无新消息时等待。
    • STREAMS:必选参数,后跟多个流名称。
    • ID:每个流的起始读取位置(如$表示从最新消息开始)。

消费者组相关命令:

  • 1、XGROUP CREATE:创建消费者组,语法:XGROUP CREATE stream-key group-name id [MKSTREAM]
    • stream-key:目标流的名称。
    • group-name:消费者组的名称。
    • id:起始消息ID(0-0表示从最早消息开始,$表示从最新消息开始)。
    • MKSTREAM(可选):若流不存在则自动创建。
  • 2、XREADGROUP GROUP:从消费者组读取消息,语法:XREADGROUP GROUP group-name consumer-name [BLOCK milliseconds] [COUNT count] STREAMS stream-key [stream-key ...] >
    • GROUP:必选参数,指定组名和消费者名。
    • BLOCK/COUNT:同XREAD,支持阻塞和分页。
    • >:表示读取未被任何消费者处理过的新消息。
  • 3、XACK:标记消息为“已处理”,语法:XACK stream-key group-name message-id [message-id ...]
    • 参数含义:将指定消息标记为已处理,从待处理列表(PEL)中移除。
  • 4、XGROUP SETID:重置消费者组的起始位置,语法:XGROUP SETID stream-key group-name id
    • id:新的起始消息ID,影响后续XREADGROUP的读取位置。
  • 5、XGROUP DELCONSUMER:删除消费者,语法:XGROUP DELCONSUMER stream-key group-name consumer-name
    • 参数含义:移除指定消费者,并将其未处理的消息转移到其他消费者。
  • 6、XGROUP DESTROY:删除消费者组,语法XGROUP DESTROY stream-key group-name
    • 参数含义:彻底删除消费者组及其状态信息。
  • 7、XPENDING:查看待处理消息,语法:XPENDING stream-key group-name [start end count [consumer-name]]
    • 返回值:显示待处理消息的统计信息或详细列表。
  • 8、XCLAIM:转移消息所有权,语法:XCLAIM stream-key group-name consumer-name min-idle-time message-id [message-id ...] [IDLE ms] [RETRYCOUNT count] [FORCE]
    • min-idle-time:消息未被处理的最小时间(毫秒)。
    • IDLE/RETRYCOUNT:更新消息的空闲时间和重试次数。
    • FORCE:强制转移未确认的消息。
  • 9、XINFO:查看流或消费者组信息,语法:XINFO [STREAM stream-key | GROUPS stream-key | CONSUMERS stream-key group-name]
    • STREAM:显示流的详细信息(如长度、最后ID)。
    • GROUPS:显示流的所有消费者组信息。
    • CONSUMERS:显示指定组的所有消费者及其状态。
  • 10、XINFO GROUPS:查看流的所有消费者组,语法:XINFO GROUPS stream-key
    • 返回值:包含组名、消费者数量、待处理消息数等信息。
  • 11、XINFO STREAM:查看流的详细信息,语法:XINFO STREAM stream-key
    • 返回值:流的元数据(如长度、第一个/最后一个消息ID)。

6.3、消费者组的详细工作流程

  1. 创建消费者组
    • 可以指定起始消费位置(如 0-0 表示从第一条消息开始)
    • MKSTREAM 选项在 Stream 不存在时创建它
  2. 消费者读取消息
    • 多个消费者可以同时从同一组读取消息,实现负载均衡
    • COUNT 参数指定每次读取的最大消息数
    • BLOCK 参数支持阻塞读取,等待新消息到达
  3. 消息确认机制
    • 消费者处理完消息后必须调用 XACK 确认
    • 未确认的消息会被视为待处理,在消费者故障时可重新分配
  4. 处理消费者故障
    • 长时间未确认的消息会被标记为 “pending”
    • 可以使用 XPENDING 命令查看 pending 消息
    • 可以使用 XCLAIM 命令将 pending 消息重新分配给其他消费者

6.4、注意事项

  • Redis毕竟不是专业的消息中间件,适合一些简单的场景,使用时要考虑内存压力问题。
  • 生产上如果要使用,要考虑以后是否要拓展,尽量还是使用专业的消息中间件。
  • 要合理设置 MAXLEN:根据业务需求设置 Stream 最大长度,避免内存溢出
  • 批量操作:使用 Pipeline 批量发送命令,减少网络往返
  • 注意pending 消息管理:定期处理长时间未确认的消息,避免积压
     

 

全部评论