您的位置:首页 > 文旅 > 美景 > RabbitMq 延迟队列

RabbitMq 延迟队列

2024/9/16 20:53:02 来源:https://blog.csdn.net/WindwirdBird/article/details/139727872  浏览:    关键词:RabbitMq 延迟队列

前言

延迟消息队列在我们工作中使用的场景特别多,比如超时未支付取消订单,异步业务时间有时间间隔,等等,今天我们就来聊一聊使用消息延迟队列

需求

使用RabbitMq  实现延迟队列,5分钟之后进行消息的消费

方式

  1. 基于死信队列
  2. 基于插件 (如果支持可以用,如果没有建议第一种,怕引入插件产生版本冲突)

实现

 今天就基于死信队列咱们玩一下
原理图

delay impl

代码实现
 import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wind* @example ${bizName} 自己的业务名称,记得替换   eg: pay_cancel*  使用时同一替换 ${bizName}* @des :*    1. 准备两个普通队列*    2. 其中的一个队列构造 是 自己的queuename + ttl + deadExchange + deadRoutingKey  (core )*/
@Configuration
public class ${bizName}DelayConfig {public  static final String ${bizName}_DELAY_EXCHANGE_NAME =${bizName}+"_delay_exchange";public  static final String ${bizName}_DELAY_QUEUE_NAME =${bizName}+"r_delay_queue";public  static final String ${bizName}_DELAY_ROUTING_KEY =${bizName}+"_delay_routing_key";public  static final String ${bizName}_DEAD_EXCHANGE_NAME =${bizName}+"_dead_exchange";public  static final String ${bizName}_DEAD_QUEUE_NAME =${bizName}+"dead_queue";public  static final String ${bizName}_DEAD_ROUTING_KEY =${bizName}+"_dead_routing_key";/*** 核心  和死信队列建立关键的核心*/@BeanQueue delayedQueue() {return QueueBuilder.durable(${bizName}_DELAY_QUEUE_NAME).ttl(300000).deadLetterExchange(${bizName}_DEAD_EXCHANGE_NAME).deadLetterRoutingKey(${bizName}_DEAD_ROUTING_KEY).build();}@Beanpublic DirectExchange delayedExchange(){return  new DirectExchange(${bizName}_DELAY_EXCHANGE_NAME);}@BeanBinding delayedBinding(Queue delayedQueue,DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(${bizName}_DELAY_ROUTING_KEY);}@Beanpublic Queue deadQueueOrder() {return QueueBuilder.durable(${bizName}_DEAD_QUEUE_NAME).deadLetterExchange(${bizName}_DEAD_EXCHANGE_NAME).deadLetterRoutingKey(${bizName}_DEAD_ROUTING_KEY).build();}@Beanpublic DirectExchange deadExchangeOrder() {return new DirectExchange(${bizName}_DEAD_EXCHANGE_NAME);}@Beanpublic Binding bindingDeadExchange(Queue deadQueueOrder,DirectExchange deadExchangeOrder) {return BindingBuilder.bind(deadQueueOrder).to(deadExchangeOrder).with(${bizName}_DEAD_ROUTING_KEY);}}监听:
/*** @author wind* @des  监听的是死信队列  ttl 过后进行消费**/
@Component
@Slf4j
@RequiredArgsConstructor
public class ${bizName}DelayRefundMqListener {@RabbitListener(queues=${bizName}DelayConfigDelayConfig.${bizName}_DEAD_QUEUE_NAME)public void rechargeResult(String msg, Channel channel, Message message) {try {log.info("业务消息进来 rechargeResultMQMessage:{}", msg);处理业务逻辑} finally {try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {log.error("充值退款延迟消息 确认消息失败");}}}}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com