概述

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

image-1655731345514

模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下:

  • 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
  • 在consumer服务中定义两个消息监听者,都监听simple.queue队列
  • 消费者1每秒处理50条消息,消费者2每秒处理10条消息

image-1655731398242

循环生产消息


    @Test
    public void testSend50Message() {
        String queueName = "simplequeue";
        String message = "hello ,Spring amqp---";
        for (int i = 0; i < 50; i++) {
            template.convertAndSend(queueName, message + i);
        }
    }

两个消费能力不同的消费者共同消费

此时需要注意,需要将我们之前创建的监听方法去掉,以免被其他的消费者消费

//交给spring管理
@Component
public class SpringRabbitListener {

    //监听simplequeue队列,队列中有消息就接受到,输出
//    @RabbitListener(queues = "simplequeue")
//    public void listenSimpleQueue(String msg) {
//        System.out.println("【接受到消息】:" + msg);
//    }

    @RabbitListener(queues = "simplequeue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("【消费者1接受到消息】:" + msg + "--当前时间:" + LocalTime.now());
        Thread.sleep(20);//模拟消费者能力,每秒钟消费50条--->20 x 50 = 1000 ms
    }

    @RabbitListener(queues = "simplequeue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("【消费者2接受到消息】:" + msg + "--当前时间:" + LocalTime.now());
        Thread.sleep(200);//模拟消费者能力,每秒钟消费5条--->200 x 5 = 1000 ms
    }
}

由理论可知,消费者1的消费速度为每秒50条,消费者2的消费速度为每秒5条,生产者以一秒内生产了50条消息,按道理讲,消息的消费应该在一秒内完成,但是现在却远远超过了一秒,而且还发现,消费者1处理的消息的偶数标记的,消费者2则是奇数的,可推断,两个消费者平均消费了这50条消息,消费者1早早的消费完了自己的偶数25条,剩下的奇数25条,消费者2要通过5秒才能消费完,所以才出现了以下途中的情况,看来,就默认的工作队列情况下,rabbitMQ并没有按照消费者的能力来分发消息,这种情况是由于消费者的消息预取造成的,当队列中有消息时,消费者们先去队列中你一条消息,我们一条消息的取过来,然后自己在慢慢消费。

image-1655732284035

问题解决

就以上问题,在实际的情况在肯定是不可取的,没有处理消费的能力,却还是能取到和处理能力很强的消费者同样数量的消息,不合适,没那金刚钻,就别揽瓷器活。

由rabbitMQ的控制台可知,SpringAMQP的链接通道的预取数量为:250

image-1655733050380

怎么处理呢?其实在rabbitMQ的配置中配置一下预取的数量就好了,配置为1,每次只取一条消息,处理完了,再来取。

application.yml
spring:
  rabbitmq:
    host: 47.104.178.202
    port: 5672 #rabbitMq的端口,并非15672
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        prefetch: 1
再次测试

可以发现,现在处理消息的情况就符合预期了,消费者2处理能力低,就少消费点,消费者1处理能力强,就对消费点:

image-1655732913815

同时,在rabbitMQ控制台的对于通道链接的预取数量也发生了变化:

image-1655733134345

总结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,避免消费者能力不行,消息发生堆积