一站式了解RocketMQ如何解决消息堆积问题

引言

先前我们讲过了RocketMQ是如何解决顺序消息问题的,今天我们讲一下RocketMQ是如何解决消息堆积问题的。每种MQ解决消息堆积问题的策略可能都不一样,但发生消息堆积问题的原因大致一样,即生产者生产消息的速率与消费者消费消息的速率不匹配,导致消息大量堆积在队列中。

调整消费者线程池大小🤓

通过增加消费者的并发度,可以加快消息的消费速度。这可以通过修改消费者配置中的线程池大小参数实现,通过增加线程池中的线程数,可以让消费者并行处理更多的消息,从而加快整体的消费速度。

RocketMQ 的 Java 客户端中,使用 MessageListenerConcurrently 接口监听消息,其底层是线程池驱动的。我们可以通过设置 consumeThreadMinconsumeThreadMax 来控制线程池的最小和最大线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ConsumerExample {
public static void main(String[] args) throws MQClientException {
// 创建消费者实例,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");

// 设置 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");

// 设置线程池参数:最小线程数和最大线程数
consumer.setConsumeThreadMin(20); // 默认是 10
consumer.setConsumeThreadMax(64); // 默认是 64

// 订阅 Topic 和 Tag(* 表示订阅所有 Tag)
consumer.subscribe("TestTopic", "*");

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到消息:" + new String(msg.getBody()));
// 模拟业务处理耗时
try {
Thread.sleep(100); // 假设每条消息处理需要 100ms
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

System.out.println("消费者已启动");
}
}

需要注意的点:

  • 如果你的消息处理逻辑比较轻量(如只是记录日志),可以保持默认。
  • 如果消息处理比较重(比如涉及数据库操作、远程调用等),则应该适当增大线程池
  • 注意不要超过系统资源限制(CPU、内存、网络带宽等),否则可能引发资源竞争或OOM问题。

增加消费者实例数量😶‍🌫️

既然生产者生产速率过快,那我们就提高消费者的消费速率,来匹配生产者的生产速率。这个通常是解决消息堆积最简单粗暴的方法,也就是水平扩展增加消费者实例数量,大白话来说就是加机器。

增加消费者实例的数量,以便能够并行处理更多的消息。确保消费者组(Consumer Group)下的消费者实例数量不超过主题(Topic)的队列数量,以避免资源浪费。

检查消费端消费逻辑😣

假如线上检测到消费者端消费速率过慢,第一时间不应该想到去加机器,而是应该先去想到是不是消费者消费出错了,导致过多的消息重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class OrderConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");

// 设置 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");

// 订阅 Topic 和 Tag(* 表示订阅所有 Tag)
consumer.subscribe("OrderTopic", "*");

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 模拟业务处理逻辑
processOrder(msg);
} catch (Exception e) {
// 如果处理失败,返回 CONSUME_SUCCESS 会导致消息被丢弃
// 应该返回 RECONSUME_LATER 让消息稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

System.out.println("消费者已启动");
}

private static void processOrder(MessageExt msg) throws Exception {
// 模拟复杂的业务处理逻辑
Thread.sleep(500); // 假设每条消息处理需要 500ms
if (Math.random() < 0.1) {
throw new RuntimeException("模拟处理失败");
}
}
}

异常处理不当

在上面的代码中,如果 processOrder 方法抛出异常,当前实现会直接返回 RECONSUME_LATER,导致消息不断重试。这可能会导致性能下降和资源浪费。

改进方法:

  • 添加重试次数限制,避免无限重试。
  • 将频繁失败的消息转移到死信队列进行后续处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
int reconsumeTimes = msg.getReconsumeTimes();
if (reconsumeTimes >= 3) { // 重试超过3次,放入死信队列
System.out.println("消息重试超过3次,放入死信队列:" + new String(msg.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
processOrder(msg);
} catch (Exception e) {
System.out.println("消息处理失败,将稍后重试:" + new String(msg.getBody()));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

业务处理耗时过长

如果 processOrder 方法中的业务处理逻辑非常耗时,会导致消费者无法及时处理新的消息,从而造成消息堆积。

改进方法:

  • 优化业务处理逻辑,减少不必要的计算和IO操作。
  • 使用异步处理方式,将耗时操作放到单独的线程池中执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static ExecutorService executor = Executors.newFixedThreadPool(10);

private static void processOrderAsync(MessageExt msg) {
executor.submit(() -> {
try {
processOrder(msg);
} catch (Exception e) {
System.out.println("异步处理失败:" + new String(msg.getBody()));
}
});
}

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
processOrderAsync(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

批量消费🥱

批量消费消息 是一种非常有效的优化手段,特别适用于处理大量消息堆积的场景。通过批量消费,消费者可以在一次拉取中获取多条消息,并以批次为单位进行处理,从而减少网络开销和提高整体吞吐量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class BatchConsumerExample {
public static void main(String[] args) throws MQClientException {
// 创建消费者实例,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");

// 设置 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");

// 设置每次最多消费的消息数量(默认是 1)
consumer.setConsumeMessageBatchMaxSize(10);

// 订阅 Topic 和 Tag(* 表示订阅所有 Tag)
consumer.subscribe("OrderTopic", "*");

// 注册监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("开始消费一批消息,共 " + msgs.size() + " 条");

for (MessageExt msg : msgs) {
System.out.println("收到订单消息:" + new String(msg.getBody()));
// 模拟业务逻辑处理
try {
Thread.sleep(5); // 假设每条消息处理耗时 5ms
} catch (InterruptedException e) {
e.printStackTrace();
}
}

System.out.println("本批次消息处理完成");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

System.out.println("消费者已启动,准备批量消费消息...");
}
}
  • consumeMessageBatchMaxSize 控制的是实际传给业务逻辑的批次大小
  • pullBatchSize 控制的是从 Broker 拉取的消息数,应大于等于 consumeMessageBatchMaxSize

总结❤️

解决消息堆积问题常用方法就是上面说的这些啦,当然从RocketMQ仪表盘监测消息消费情况也是很重要的

如果你看了这篇文章有收获可以点赞+关注+收藏🤩,这是对笔者更新的最大鼓励!如果你有更多方案或者文章中有错漏之处,请在评论区提出帮助笔者勘误,祝你拿到更好的offer!


一站式了解RocketMQ如何解决消息堆积问题
https://maple525866.github.io/2025/06/24/一站式了解RocketMQ如何解决消息堆积问题/
作者
maple
发布于
2025年6月24日
许可协议