RocketMQ系列:(一)概念介绍、知识点速览

2025-04-03 00:42
428
0

前不久找工作面试被问及RocketMQ事务消息相关问题,答案在脑子里就是想不起来,不好意思,脸红☺。所以打算找时间把RocketMQ官方文档完整再看一遍,进行强化学习。

建议初学RocketMQ的朋友,也不用看其他参考资料,官网文档很详尽,把文档来回看几遍,写个代码实操一下,基本就能掌握了。

本系列基于RocketMQ最新的5.0版本进行介绍,本篇文章介绍RocketMQ的基础概念和简单使用,集群、原理、源码解读后续一步步发布。

一、概念介绍

Apache RocketMQ 是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。

如下先对RocketMQ中的名词概念做一个解释(官方文档

1.1、消息服务端相关概念

  • 消息(Message):消息是 Apache RocketMQ 中的最小数据传输单元。消息内部属性查看官方文档
  • 主题(Topic):用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。官方文档
  • 消息标签(MessageTag):在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。
  • 消息类型(MessageType):Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息
  • 消息队列(MessageQueue):队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,通过流式特性无限队列结构来存储消息,消息在队列内具备顺序性存储特征。队列通过QueueId来做唯一标识和区分。官方文档
  • 消息视图(MessageView):消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。
  • 消息位点(MessageQueueOffset):是消息在消息队列里的唯一标识,Long类型(队列内唯一)。作用:消息定位、消息追踪、保证顺序消费、消息重试与回溯。
  • 消费位点(Consume Offset):是用于记录消费者端消息消费进度的核心概念。它决定了消费者从哪个位置开始继续消费消息,确保消息不重复、不遗漏,同时支持故障恢复和负载均衡。
  • 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到Apache RocketMQ 服务端的消息。
  • 消息索引(MessageKey):快速定位和查询消息的键值对映射,允许通过关键字快速定位消息。支持按消息属性(如消息 ID、标签、业务 Key)或内容进行高效检索。
  • 消息轨迹:在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由Apache RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。
  • 消息堆积:生产者已经将消息发送到Apache RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。
  • 事务消息:是Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
  • 定时/延时消息:是Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
  • 顺序消息:是Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

1.2、生产者相关概念

  • 生产者(Producer):用来构建并传输消息到服务端的运行实体。官方文档
  • 事务检查器(TransactionChecker):生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。
  • 事务状态(TransactionResolution):事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。

1.3、消费者相关概念

  • 消费者(Consumer):用来接收并处理消息的运行实体。官方文档
  • 消费者者分组(ConsumerGroup):管理多个消费者实例的协作和负载均衡。消费者分组是一组具有相同 Group ID 的消费者实例的集合,它们共同订阅同一主题(Topic),并通过负载均衡机制分摊消息的消费任务。(作用:负载均衡、高可用性、消息重试、幂等保障)官方文档
  • 消费结果(ConsumeResult):PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
  • 订阅关系(Subscription):消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。官方文档
  • 消息过滤:消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在Apache RocketMQ 的服务端完成。

二、核心功能

消息中间件的核心功能肯定就是收发消息了。RocketMQ的功能特性包括了:普通消息、定时/延时消息、顺序消息、事务消息、消息发送重试、流控机制、消费者分类、消息过滤、消费者负载均衡、消费进度管理、消费重试、消息存储和清理机制。

下面我们分类进行介绍。

2.1、四种消息类型的收发

普通消息(官方文档

  • 最基本的消息类型,消息的发送和消费是异步的,不保证消息的顺序性和发送的可靠性,具有较高的吞吐量。
  • 应用场景:适用于对消息顺序和可靠性要求不高,只需要快速传递和处理大量消息的场景,如网站的访问日志记录、简单的通知消息等。

定时/延时消息(官方文档

  • RocketMQ 通过将消息先存储在特定的延迟队列中,然后根据设定的时间进行调度来实现定时 / 延时功能。
  • 应用场景:常用于一些需要定时执行的任务,如定时发送短信、优惠券到期提醒、订单超时未支付自动取消等场景。
  • 定时时长范围限制:定时时间得在规定定时时长范围内,超范围定时不生效,服务器会马上投递消息。默认定时时长最大值为 24 小时,且不能自定义修改。比如设置消息在 30 小时后发送,超出 24 小时范围,消息会立即被投递。
  • 时间先后要求:定时时间必须在当前时间之后。若设为当前时间之前,定时无效,服务器直接投递消息。比如当前时间是 2024 - 01 - 01 10:00:00 ,设置定时时间为 2024 - 01 - 01 09:00:00 ,消息会马上被发送。

顺序消息(官方文档

  • 保证消息的发送顺序和消费顺序一致。要保证生产顺序性,有一定的要求,具体可以查看官方文档
  • 应用场景:适用于对消息顺序有严格要求的场景,如金融交易场景中的订单处理、股票交易的实时数据推送,以及一些业务流程中需要按照特定顺序执行的操作等。

事务消息

  • 将本地事务和消息发送结合起来,保证两者要么都成功,要么都失败,从而实现数据的一致性。RocketMQ 的事务消息分为 Half Message(半消息)、Prepare 阶段、Commit 或 Rollback 阶段。
  • 应用场景:在一些分布式系统中,当涉及到多个系统之间的数据交互和一致性保证时,如电商系统中的下单和库存扣减、银行转账等场景,事务消息可以确保在多个操作之间实现原子性,避免数据不一致的情况发生。

事务消息有一点需要注意:RocketMQ的事务消息机制设计上要求事务的提交或回滚必须由原始生产者实例完成,即事务消息的两个阶段无法完全解耦,不能由一个服务进行提交,一个服务发送Commit。

那如何进行分布式事务?

  • 比如A-》B-》C是一个完整的事务,A、B、C是不同的服务,可以A发起事务消息,B或者C服务执行失败或者执行完成发送消息告诉A服务,A服务根据不同情况执行Commit或者Rollback。

具体的代码实现,我基于 Spring Boot 和 RocketMQ 的编写了一个消息队列示例Demo,展示了四种消息类型的实现:代码示例

在实际使用中,参数的定义可以参考:官方的参数约束和建议

2.2、消息的生产和消费

2.2.1、同步异步发送

同步发送:

  • 使用producer.send(msg)方法,该方法会阻塞当前线程直至获取结果,直接返回SendResult对象,可立即处理发送结果。
  • 适用于关键业务消息(如订单、支付)
  • 配合重试机制(setRetryTimesWhenSendFailed)提高可靠性
  • 监控响应时间,避免长时间阻塞影响系统吞吐量

异步发送:

  • 使用producer.send(msg, callback)方法,传入SendCallback接口实现,发送后立即返回,结果通过回调函数异步处理。
  • 适用于高并发、吞吐量敏感的场景(如日志、埋点数据)
  • 必须做好异常处理,在回调函数中实现重试或降级逻辑
  • 使用线程池隔离技术,避免回调处理影响主业务流程

2.2.2、消息的发送重试

为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。

具体的重试触发条件,直接看官网吧,就不列举了。

同步发送和异步发送模式均支持消息发送重试 :

  • 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败,抛出错误码和异常。
  • 异步发送:调用线程不会阻塞,但调用结果会通过异常事件或者成功事件返回。

代码示例:

		// 初始化普通消息生产者
        producer = new DefaultMQProducer(rocketMQConfig.getProducer().getGroup());
        producer.setNamesrvAddr(rocketMQConfig.getNameServer());
        // 设置同步发送失败的重试次数(默认为2次)
        producer.setRetryTimesWhenSendFailed( 3); 
        // 设置异步发送失败时的重试次数(默认为2次)
        producer.setRetryTimesWhenSendAsyncFailed(3);

注意:对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。“透明重试” 指 RocketMQ 内部自动处理的重试逻辑,仅适用于Broker 可明确处理的临时性错误。

为什么普通的消息能进行重试,事务消息只进行透明重试?

  • 透明重试RocketMQ能确保操作的幂等性,而网络异常等问题拥有不确定性。
  • 为防止重复提交的可能,RocketMQ将网络异常的处理交给应用层处理,避免分布式环境下的不确定性破坏事务的一致性。
  • 普通的消息允许一定程度的重复(可以消费端幂等消费来解决重复问题),不涉及分布式事务状态的一致性问题。

应用层如何处理网络异常?

  • 本地记录状态:Producer 在发送提交 / 回滚请求前,先记录本地事务状态(如数据库标记为 “待提交”)。
  • 定时任务回查:通过定时任务扫描未确认的事务,手动重试提交 / 回滚。
  • 结合 Broker 回查机制:RocketMQ 的事务消息回查机制(默认 60 秒后)会主动询问 Producer 事务状态,可作为兜底方案。

2.2.3、消息的流量控制

系统容量或水位过高时 Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。官方文档

首先明确,流控是由RocketMQ自动触发的,无需人工干预。

RocketMQ的流控可以分为如下三类:

  • 生产者流控:当 Broker 无法及时处理消息时,自动对生产者进行流控
  • 消费者流控:当消费者处理消息过慢时,自动限制拉取速度
  • Broker 端流控:基于自身资源(内存、磁盘、CPU)自动触发保护机制

发生流控该如何处理?

  1. 流控持续时应该做好服务降级的保障:
    • 非核心业务消息转为异步发送。
    • 关键业务消息启用本地持久化保存,定时调度触发等待流控解除。
  2. 可以通过查看日志和Dashboard监控对问题进行分析,找到问题根因。再针对性考虑解决方案。

生产者可以调整发送参数,示例:

DefaultMQProducer producer = new DefaultMQProducer("group");
// 增加发送超时时间(默认3s)
producer.setSendMsgTimeout(5000);
// 设置重试次数(流控后会自动重试)
producer.setRetryTimesWhenSendFailed(3);
// 降低发送速率(适用于突发流量)
producer.setSendLatencyFaultEnable(true);

消费者可以跳转消费参数,示例:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
// 增加消费线程数(默认16)
consumer.setConsumeThreadMin(32);
consumer.setConsumeThreadMax(64);
// 增加拉取批次大小(默认32条)
consumer.setPullBatchSize(100);
// 降低拉取间隔(默认100ms)
consumer.setPullInterval(50);

broker.conf配置参数调整,示例:

# 内存水位阈值(默认0.4,0.4表示40%)
highWaterMarkMemoryPercent=0.6
# 磁盘水位阈值(默认0.85)
diskMaxUsedSpaceRatio=0.9
# 刷盘策略(同步刷盘改为异步)
flushDiskType=ASYNC_FLUSH

2.2.4、消费者分类

1、PushConsumer(推模式消费者)

  • 核心机制:基于长轮询机制自动从 Broker 拉取消息,拉取到消息后通过回调函数异步处理,无需开发者手动控制拉取节奏。
  • 关键优势:
    • 易用性高:只需注册消息监听器(Listener)即可实现消息消费,适合快速开发。
    • 实时性强:消息到达后能及时被拉取,适用于实时业务场景。
    • 完整功能封装:内置负载均衡、重试、流控、顺序消费等机制,减少开发工作量。
  • 典型场景:订单通知、实时日志处理、用户行为分析等对实时性要求高的场景。

2、SimpleConsumer(简易拉取消费者)

  • 核心机制:提供更基础的拉取接口(如 pullBlockIfNotFound),开发者需手动控制拉取频率、消息过滤和消费状态。
  • 关键优势:
    • 灵活性高:可自定义拉取策略(如根据业务负载动态调整拉取频率),适合流量削峰场景。
    • 细粒度控制:可手动管理消费进度(Offset),便于实现复杂的消费逻辑。
  • 典型场景:大数据批量处理、需根据系统负载动态调整消费速度的场景。

3、PullConsumer(拉模式消费者)

  • 核心机制:提供最底层的拉取接口(如 pullKernelImpl),需开发者完全手动实现消息拉取、负载均衡、Offset 管理等逻辑。
  • 关键优势:
    • 极致可控性:适用于对消费流程有极高定制需求的场景(如自定义重试策略、多数据源整合)。
    • 性能优化空间大:可根据业务特性优化拉取参数(如批量大小、超时时间)。
  • 典型场景:离线数据处理、分布式任务调度、需与其他系统深度集成的场景。

选择建议

  • 优先选择 PushConsumer:若业务需求简单、追求开发效率,或需要实时处理消息(如实时通知、日志监控),推荐使用 PushConsumer,其封装的自动化机制可大幅减少开发成本。
  • 考虑 SimpleConsumer:若需要灵活控制拉取频率(如避免高峰期流量冲击),或需自定义消费状态管理(如消费失败后的特殊处理),可选择 SimpleConsumer。
  • 谨慎使用 PullConsumer:若有高度定制化需求(如自定义负载均衡算法、多集群消费协调),或需与其他系统(如大数据平台)深度整合,可使用 PullConsumer,但需注意开发复杂度较高。

更多内容请参考官方文档

2.2.5、消息过滤

Apache RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 Apache RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。

RocketMQ 支持Tag标签过滤和SQL属性过滤:

  • Taag标签过滤:直接通过消息的Tag标签,服务器判断生产者和消费者的Tag是否一致(Topic当然也要一致),来精确匹配要把消息发给哪些消费者。
  • SQL属性过滤:RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置SQL语法的过滤表达式过滤多个属性。

TIP:Tag是一种系统属性,所以SQL过滤方式也兼容Tag标签过滤。在SQL语法中,Tag的属性名称为TAGS。

SQL属性过滤示例:

1、先由生产者设置消息属性,如:

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
msg.putUserProperty("age", "25");         // 数值类型(字符串形式)
msg.putUserProperty("gender", "male");   // 字符串类型
msg.putUserProperty("vip", "true");      // 布尔类型(字符串形式)

2、再由消费者通过subscribe()方法指定 SQL 表达式:

consumer.subscribe("TopicTest", 
    MessageSelector.bySql("age > 18 AND gender = 'male' AND vip = 'true'"));

具体的SQL属性过滤规则可以查看官方文档

2.2.6、消费者负载均衡

RocketMQ通过消费者负载均衡实现消费者在分布式集群环境下的消费机制和策略,提高消费者的并发能力和水平扩展能力。

RocketMQ 的消费者负载均衡核心流程由客户端自动完成,无需用户操作服务端,但分配策略、消费模式、参数调优等关键配置需用户根据业务场景定制。

分配策略:

  • 平均分配策略(AllocateMessageQueueAveragely):将所有 MessageQueue 按消费者数量平均分配,是默认策略。
  • 环形分配策略(AllocateMessageQueueAveragelyByCircle):轮询分配 MessageQueue 给消费者。
  • 机房感知分配策略(AllocateMessageQueueByMachineRoom):可配置只消费指定机房的 MessageQueue。
  • 一致性哈希分配策略(AllocateMessageQueueConsistentHash):使用一致性哈希算法分配,降低节点变化时的影响。

示例:

public DefaultMQPushConsumer createAverageAllocateConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("average-allocate-group");
        consumer.setNamesrvAddr("localhost:9876");
        // 设置平均分配策略(默认策略)
        consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
        return consumer;
    }

2.2.7、消费模式

RocketMQ支持两种消费模式:

  • 集群消费(Clustering):同一 Consumer Group 中的每个 Consumer 实例平均分摊消费消息。每条消息只会被 Consumer Group 中的一个 Consumer 实例消费。
  • 广播消费(Broadcasting):每条消息会被 Consumer Group 中的每个 Consumer 实例消费一次。Consumer Group 中的每个 Consumer 实例都获取到 Topic 的所有 MessageQueue 进行消费。

代码示例:

		// 显式设置集群消费模式
        consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式(默认)
        // 显式设置广播消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING); // 广播模式

关键配置参数

  • 消费者并行度相关:
    • consumeThreadMin/consumeThreadMax:消费线程池大小,默认 10-20。
    • pullThresholdForQueue:单个 MessageQueue 最大缓存消息数,默认 1000。
  • 消费重试相关:
    • reconsumeTimes:消息重试次数,默认 16 次。
    • suspendCurrentQueueTimeMillis:失败消息暂停消费时间,默认 1000 毫秒。
  • 负载均衡相关:
    • pullInterval:拉取消息间隔,默认 0 表示立即拉取。
    • adjustThreadPoolNumsThreshold:调整线程池大小的阈值,默认 100000。

可以查看更多官方文档内容。

2.2.8、消费进度管理

RocketMQ的消费进度管理机制解决消费者客户端启动后从哪里开始消费,如何标记已消费的消息等问题。

Apache RocketMQ 通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,Apache RocketMQ 会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。消费点位是由RocketMQ服务端的存储实现,与消费者无关,支持跨消费者的消费进度回复。

RocketMQ 定义队列中最早一条消息的位点为最小消息位点(MinOffset);最新一条消息的位点为最大消息位点(MaxOffset)。虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限, Apache RocketMQ 会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

若消费者分组的初始消费位点或当前消费位点不符合您的业务预期,您可以通过重置消费位点调整您的消费进度。

重置消费位点场景:

  • 初始消费位点不符合需求:因初始消费位点为当前队列的最大消息位点,即客户端会直接从最新消息开始消费。若业务上线时需要消费部分历史消息,您可以通过重置消费位点功能消费到指定时刻前的消息。
  • 消费堆积快速清理:当下游消费系统性能不足或消费速度小于生产速度时,会产生大量堆积消息。若这部分堆积消息可以丢弃,您可以通过重置消费位点快速将消费位点更新到指定位置,绕过这部分堆积的消息,减少下游处理压力。
  • 业务回溯,纠正处理:由于业务消费逻辑出现异常,消息被错误处理。若您希望重新消费这些已被处理的消息,可以通过重置消费位点快速将消费位点更新到历史指定位置,实现消费回溯。

重置消费位点可以通过客户端代码中设置重置消费位点,也可以通过命令行和图形化界面RocketMQ Dashboard进行重置。

更多功能特性内容,如消费重试、消息存储和清理机制请查看官方文档

 

三、部署和运维

 

3.1、RocketMQ的三大组件

NameServer

  • 介绍:承担服务发现和负载均衡的职责。Broker 会向 NameServer 注册自身服务的 Topic 等信息,并通过心跳维持租约。生产者和消费者通过 NameServer 获取 Topic 的路由信息,包括数据分片和服务地址,进而与消息服务器建立连接进行消息收发。
  • 特点:采用 AP 模式,节点无状态,是 ShareNothing 架构,具有更高的可用性。

Proxy

  • 介绍:5.0版本新增的组件,引入了弹性无状态的代理模式(Proxy),将客户端协议适配、权限管理、消费管理等计算逻辑从 Broker 中抽离出来。Proxy 作为无状态的中间层,接收客户端的请求,并根据从 NameServer 获取的路由信息,将请求转发到相应的 Broker 上,同时对消息进行一些上层业务逻辑的处理,如协议转换等。
  • 特点:可根据业务负载分离部署,独立弹性伸缩,与存储流量扩缩容解耦,例如在物联网场景中,能面向海量物联网设备连接数进行弹性伸缩。

Broker

  • 介绍:是消息系统数据链路的核心,负责接收消息、存储消息、维护消息状态和消费者状态。多个 Broker 组成消息服务集群,共同服务一个或多个 Topic。在 RocketMQ 5.0 中,Broker 专注于存储能力的持续优化,将客户端协议适配、权限管理、消费管理等计算逻辑抽离到Proxy组件中。
  • 特点:RocketMQ 5.0 引入 BrokerContainer 概念,一个 BrokerContainer 中可部署多个功能独立的 Broker,共享节点资源,提高利用率,还可通过 Master/Slave 交叉部署实现节点对等,提升高可用性。

3.2、部署方式

在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。

  • 在 Local 模式下,Broker 和 Proxy 是同进程部署,只是在原有 Broker 的配置基础上新增 Proxy 的简易配置就可以运行。
  • 在 Cluster 模式下,Broker 和 Proxy 分别部署,即在原有的集群基础上,额外再部署 Proxy 即可。

具体的部署方式可以参考官方文档,已经写的很详细了。

3.3、运维方式

图形化管理工具RocketMQ Dashboard是 RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。

官方的介绍也很详细,怎么用看一下也就明白了。

 

 

 

全部评论