引言
先前我们讲过了RocketMQ是如何解决顺序消息问题的,今天我们讲一下RocketMQ是如何解决消息堆积问题的。每种MQ解决消息堆积问题的策略可能都不一样,但发生消息堆积问题的原因大致一样,即生产者生产消息的速率与消费者消费消息的速率不匹配,导致消息大量堆积在队列中。
调整消费者线程池大小🤓
通过增加消费者的并发度,可以加快消息的消费速度。这可以通过修改消费者配置中的线程池大小参数实现,通过增加线程池中的线程数,可以让消费者并行处理更多的消息,从而加快整体的消费速度。
RocketMQ 的 Java 客户端中,使用 MessageListenerConcurrently
接口监听消息,其底层是线程池驱动的。我们可以通过设置 consumeThreadMin
和 consumeThreadMax
来控制线程池的最小和最大线程数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class ConsumerExample { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(64);
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println("收到消息:" + new String(msg.getBody())); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
consumer.start();
System.out.println("消费者已启动"); } }
|
需要注意的点:
- 如果你的消息处理逻辑比较轻量(如只是记录日志),可以保持默认。
- 如果消息处理比较重(比如涉及数据库操作、远程调用等),则应该适当增大线程池。
- 注意不要超过系统资源限制(CPU、内存、网络带宽等),否则可能引发资源竞争或OOM问题。
增加消费者实例数量😶🌫️
既然生产者生产速率过快,那我们就提高消费者的消费速率,来匹配生产者的生产速率。这个通常是解决消息堆积最简单粗暴的方法,也就是水平扩展增加消费者实例数量,大白话来说就是加机器。
增加消费者实例的数量,以便能够并行处理更多的消息。确保消费者组(Consumer Group)下的消费者实例数量不超过主题(Topic)的队列数量,以避免资源浪费。
检查消费端消费逻辑😣
假如线上检测到消费者端消费速率过慢,第一时间不应该想到去加机器,而是应该先去想到是不是消费者消费出错了,导致过多的消息重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class OrderConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { processOrder(msg); } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
consumer.start();
System.out.println("消费者已启动"); }
private static void processOrder(MessageExt msg) throws Exception { Thread.sleep(500); if (Math.random() < 0.1) { throw new RuntimeException("模拟处理失败"); } } }
|
异常处理不当
在上面的代码中,如果 processOrder
方法抛出异常,当前实现会直接返回 RECONSUME_LATER
,导致消息不断重试。这可能会导致性能下降和资源浪费。
改进方法:
- 添加重试次数限制,避免无限重试。
- 将频繁失败的消息转移到死信队列进行后续处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { int reconsumeTimes = msg.getReconsumeTimes(); if (reconsumeTimes >= 3) { System.out.println("消息重试超过3次,放入死信队列:" + new String(msg.getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } try { processOrder(msg); } catch (Exception e) { System.out.println("消息处理失败,将稍后重试:" + new String(msg.getBody())); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
|
业务处理耗时过长
如果 processOrder
方法中的业务处理逻辑非常耗时,会导致消费者无法及时处理新的消息,从而造成消息堆积。
改进方法:
- 优化业务处理逻辑,减少不必要的计算和IO操作。
- 使用异步处理方式,将耗时操作放到单独的线程池中执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| private static ExecutorService executor = Executors.newFixedThreadPool(10);
private static void processOrderAsync(MessageExt msg) { executor.submit(() -> { try { processOrder(msg); } catch (Exception e) { System.out.println("异步处理失败:" + new String(msg.getBody())); } }); }
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { processOrderAsync(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
|
批量消费🥱
批量消费消息 是一种非常有效的优化手段,特别适用于处理大量消息堆积的场景。通过批量消费,消费者可以在一次拉取中获取多条消息,并以批次为单位进行处理,从而减少网络开销和提高整体吞吐量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class BatchConsumerExample { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeMessageBatchMaxSize(10);
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("开始消费一批消息,共 " + msgs.size() + " 条");
for (MessageExt msg : msgs) { System.out.println("收到订单消息:" + new String(msg.getBody())); try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } }
System.out.println("本批次消息处理完成"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
consumer.start();
System.out.println("消费者已启动,准备批量消费消息..."); } }
|
consumeMessageBatchMaxSize
控制的是实际传给业务逻辑的批次大小。
pullBatchSize
控制的是从 Broker 拉取的消息数,应大于等于 consumeMessageBatchMaxSize
。
总结❤️
解决消息堆积问题常用方法就是上面说的这些啦,当然从RocketMQ仪表盘监测消息消费情况也是很重要的
如果你看了这篇文章有收获可以点赞+关注+收藏🤩,这是对笔者更新的最大鼓励!如果你有更多方案或者文章中有错漏之处,请在评论区提出帮助笔者勘误,祝你拿到更好的offer!