记录生产环境Rabbitmq消息积压的问题
          目录
          
        
        
      记录生产环境Rabbitmq消息积压的问题
场景:下游所有的子系统需要上上游中心推送消息,1秒至少有10条消息推送过来,由于之前的开发人员在处理这个地方的逻辑的时候,消息量比较少,所以采用的单线程消费。导致消息处理不及时造成了消息积压
看一下处理前日志截图和Rabbitmq的队列消息数量

优化思路
- 
由于 springboot中rabbitmq默认是单线程监听,所以需要我们加以配置,将rabbitmq单线程消费改为多线程消费 
- 
将每个线程接收的 rabbitmq消息放入环形队列中,异步处理,这里采用的是disruptor框架
改在Rabbitmq为多线程方式
配置rabbitmq多线程监听,提供SimpleRabbitListenerContainerFactory类的bean,为数据量大的队列指定这个bean。
设置多线程批量处理。在任意配置类中,提供这个bean。(注意配置类中的Listener会覆盖配置文件中的配置)
@Bean("batchQueueRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    //设置线程数
    factory.setConnectionFactory(connectionFactory);
    //最大线程数
    factory.setConcurrentConsumers(10);
    factory.setMaxConcurrentConsumers(10);
    return factory;
}在消费的地方运用 batchQueueRabbitListenerContainerFactory bean
@RabbitListener(queues = "alarmQueue", containerFactory = "batchQueueRabbitListenerContainerFactory")
public void saveAlgorithmAlarm(String message) {
  // todo
}应用disruptor框架处理业务
引入依赖
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>编写业务BO类
@Data
public class RabbitMqContextBO {
  
  // 定义上下文字段
  private String filed;
  
}创建RabbitMqContextBO类创建工厂类
public class RabbitMqContextBOFactory implements EventFactory<RabbitMqContextBO> {
  @Override
  public RabbitMqContextBO newInstance() {
    return new RabbitMqContextBO();
  }
}创建一个消息消费者
@Slf4j
@component
public class RabbitMqEventProducer {
  
  private final RingBuffer<RabbitMqContextBO> ringBuffer;
  
  public RabbitMqEventProducer(@Qualifier("ringBuffer") RingBuffer<RabbitMqContextBO> ringBuffer) {
    this.ringBuffer = ringBuffer;
  } 
  
  public void commit(String message){
    long sequence = ringBuffer.next();
    try{
      // 转换数据对象
      RabbitMqContextBO rabbitMqContextBO = ringBuffer.get(sequence);
      rabbitMqContextBO.setFiled(message);
    }catch (Exception e){
      log.error("消息处理失败:", e);
    }finally {
      ringBuffer.publish(sequence);
    }
  }
}创建业务处理handler,这里使用业务1,2,3代替
RabbitMqEventHandle1
@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle1 implements EventHandler<RabbitMqContextBO> {
  
  @Override
  public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
    // TODO 具体业务逻辑
    log.info("RabbitMqEventHandle1");
  }
  
}RabbitMqEventHandle2
@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle2 implements EventHandler<RabbitMqContextBO> {
  
  @Override
  public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
    // TODO 具体业务逻辑
    log.info("RabbitMqEventHandle2");
  }
  
}RabbitMqEventHandle3
@Slf4j
@RequiredArgsConstructor
public class RabbitMqEventHandle3 implements EventHandler<RabbitMqContextBO> {
  
  @Override
  public void onEvent(RabbitMqContextBO rabbitMqContextBO, long l, boolean b) throws Exception {
    // TODO 具体业务逻辑
    log.info("RabbitMqEventHandle3");
  }
  
}创建RingBuffer并加入spring 容器,并启动消费者
@Bean("ringBuffer")
public RingBuffer<RabbitMqContextBO> ringBuffer(){
  RabbitMqContextBOFactory factory = new RabbitMqContextBOFactory();
  ThreadFactory threadFactory = Executors.defaultThreadFactory();
  // 等待策略
  SleepingWaitStrategy waitStrategy = new SleepingWaitStrategy();
  Disruptor<RabbitMqContextBO> disruptor = new Disruptor<>(factory, 1024 * 4, threadFactory, ProducerType.SINGLE, waitStrategy);
  // 业务编排
  disruptor
    .handleEventsWith(new RabbitMqEventHandle1())
    .handleEventsWith(new RabbitMqEventHandle2())
    .handleEventsWith(new RabbitMqEventHandle3());
  // 启动消费者
  disruptor.start();
  return disruptor.getRingBuffer();
}在Rabbitmq消费消息的时候将具体的业务逻辑交给handler处理即可
@RabbitListener(queues = "alarmQueue", containerFactory = "batchQueueRabbitListenerContainerFactory")
public void saveAlgorithmAlarm(String message) {
  // todo
  alarmEventProducer.commit(message);
}重新查看改造后的rabbitmq web manager

查看日志线程名称:

上面的代码是改造过后的逻辑代码,和图片中的类名不相同
