欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ异常处理方案设计

程序员文章站 2022-07-15 13:06:56
...

 

导语:根据业务给MQ异常处理设置优先级:如低、中等、紧急,当MQ发生异常时通过告警邮件通知和记录到数据库中,对于低和中等的异常采用定时任务轮询去重新投递,紧急的异常例如订单支付等则需要开发者尽快去手动处理最佳。对于MQ中发生的异常有以下三种: confirm异常、returnCallBack异常、队列监听消费异常,在此次实际项目中有监控模块(死信队列的监控,根据业务类型发送告警邮件;是否将异常写入数据库等待定时任务重新投递)和定时任务(创建重新投递任务,告知监控模块投递触发时机)模块协同处理,以下只是目前所想的方案,其实有着更多的可靠的方案吧,您怎么看?

confirm异常

  • confirm确认消息是否投递到MQ服务器。处理逻辑:当未投递成功时,发送到告警队列,发送报警邮件。

returnCallBack异常

  • 消息投递到队列发生异常时回调(当消息在交换机根据routeKey找不到投递的队列时发生异常),处理逻辑:发送报警邮件,异常写入数据库; sendErrorMessage()方法中,消息内容message优先级设置为低,根据业务需求设置

队列监听消费异常

  • 队列初始化配置死信队列
  • 消费的确认机制为手动确认,队列监听消费处理业务代码块做try/catch处理,发生异常nack,拒绝重新入队
  • 消费监听发生异常投递重新投递死信队列
  • 业务队列发生异常设置处理优先级为中等,订单支付设置为紧急,根据业务需求设置,处理逻辑:发送报警邮件,异常写入数据库。
  • MQ发送消息时需要给消息设置请求头属性,包含当前投递exchange、routeKey和异常处理优先级

定时任务

  • 通过定时任务轮询表status 1-异常 和优先级为(0-低 1-中等)的数据,重新投递到原始队列 (在原始监听队列 消费成功逻辑中补充,将消息标志id的status设置为2-已解决);
  • 重新投递需要知道消息绑定的原始exchange和routeKey, rabbitListenerErrorHandler回调时取不到相关信息,因此在发送消息时在请求头携带;根据业务在MQ发送消息时指定消息的优先级(0-低 1-中等 2-紧急);   
  • status1-异常和优先级为紧急(订单支付等业务)的异常数据手动处理;
  • 重新投递给消息加上消息头属性设置是否重新投递标志,用于MQ监听消费后将数据库该消息ID的修复状态设置为2-已解决

部分实现

  • 通发送消息时header设置当前exchenge等属性:
     CorrelationData correlationData = new CorrelationData(messageId);
            rabbitTemplate.convertAndSend(exchange, routeKey, MessageHelper.objToMsg(message.getContent()), message1 -> {
                message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.HEAD_EXCHANGE, exchange);
                message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.HEAD_ROUTE, routeKey);
                message1.getMessageProperties().setPriority(priority.intValue());
                message1.getMessageProperties().setHeader(MqConstants.MessageHeadProperties.IS_RETRY_DELIVER, true);
                return message1;
            }, correlationData);

     

  • 监听消费确认:
        @RabbitListener(queues = MqConstants.QaInfo.QA_INFO_TIMEOUT_QUEUE_NAME)
        public void consume(Message message, Channel channel,
                            @Header(value = MqConstants.MQ_HEADER_CORRELATION_ID_KEY, required = false) String correlationId) {
            if (this.lockToConsume(correlationId)) {
                Boolean flag = true;
                try {
                    consumer.consume(message);
                } catch (Exception e) {
                    flag = false;
                    e.printStackTrace();
                } finally {
                    if (flag) {
                        this.consumeConfirm(channel, correlationId, message.getMessageProperties().getDeliveryTag());
                        String status = message.getMessageProperties().getHeader(MqConstants.MessageHeadProperties.IS_RETRY_DELIVER);
                        if (StringUtils.isNotBlank(status) && Boolean.parseBoolean(status)) {
                            //将数据库记录设置为已解决,此处需要确保代码处理无异常,省略,
    
                        }
                    } else {
                        this.nack(channel, correlationId, message.getMessageProperties().getDeliveryTag());
                    }
                }
            } else {
                log.warn("[MQ]消息重复消费, ud: {}", correlationId);
                this.nack(channel, correlationId, message.getMessageProperties().getDeliveryTag());
            }
        }

     

 

 

相关标签: rabbitmq