5 RabbitMQ
RabbitMQ的三种队列:
- 直连队列模式
- 扇形队列模式
- 主题队列模式
RabbitMQ模块创建:
创建要点:
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
核心配置:
# 服务名(应用名),会在eureka注册中心显示
spring:
application:
name: rabbitmq
# rabbitmq配置
rabbitmq:
host: 139.196.137.209
port: 5672
username: admin
password: admin
config配置:
三种主题都需要单独配置
直连队列模式
@Configuration public class RabbitMQDirectConfig { /** * @author : Lcywings * @date : 2021/9/1 14:01 * @acl : true * @description : 配置智联队列,设置消息持久化 */ @Bean public Queue queue() { // 四个参数:Queue(String name,boolean durable,boolean exclusive,boolean autoDelete,Map<S // String name:队列的名称,此参数是必须的, 创建队列对象必须指定队列的名称 // boolean durable:是否持久化,如果是true,消息会被持久在磁盘上,如果重启服务,消息仍然会在队列中 // boolean exclusive:是否是排他队列,默认是false,如果是ttrue,只有创建当前队列的链接有权访问,当连接断开之后, // boolean autoDelete:是否自动删除队列,默认是false,如果为true,一旦没有生产者或者消费者使用此队列,该队列自动被删除 return new Queue(RabbitMQConstant.DIRECT_QUEUE_NAME, true); } /** * @author : Lcywings * @date : 2021/9/1 14:11 * @acl : true * @description : 直连交换机 */ @Bean public DirectExchange directExchange(){ return new DirectExchange(RabbitMQConstant.DIRECT_EXCHANGE_NAME); } /** * @author : Lcywings * @date : 2021/9/1 14:14 * @acl : true * @description : 直连交换机和直连队列的绑定 */ @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitMQConstant.DIRECT_ROUTING_NAME); } }
扇形队列模式
@Configuration public class RabbitMQFanoutConfig { /** * @author : Lcywings * @date : 2021/9/1 15:13 * @acl : true * @description : 扇形队列,设置持久化 */ @Bean public Queue fanoutQueueOne() { return new Queue(RabbitMQConstant.FANOUT_QUEUE_ONE_NAME, true); } /** * @author : Lcywings * @date : 2021/9/1 15:13 * @acl : true * @description : 扇形队列,设置持久化 */ @Bean public Queue fanoutQueueTwo() { return new Queue(RabbitMQConstant.FANOUT_QUEUE_TWO_NAME, true); } /** * @author : Lcywings * @date : 2021/9/1 15:13 * @acl : true * @description : 扇形队列,设置持久化 */ @Bean public Queue fanoutQueueThree() { return new Queue(RabbitMQConstant.FANOUT_QUEUE_THREE_NAME, true); } /** * @author : Lcywings * @date : 2021/9/1 15:19 * @acl : true * @description : 扇形交换机 */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitMQConstant.FANOUT_EXCHANGE_NAME); } /** * @author : Lcywings * @date : 2021/9/1 15:20 * @acl : true * @description : 扇形FANOUT_EXCHANGE_NAME */ @Bean public Binding bindingOne() { return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange()); } /** * @author : Lcywings * @date : 2021/9/1 15:20 * @acl : true * @description : 扇形FANOUT_EXCHANGE_NAME */ @Bean public Binding bindingTwo() { return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange()); } /** * @author : Lcywings * @date : 2021/9/1 15:20 * @acl : true * @description : 扇形FANOUT_EXCHANGE_NAME */ @Bean public Binding bindingThree() { return BindingBuilder.bind(fanoutQueueThree()).to(fanoutExchange()); }
主题队列模式
@Configuration public class RabbitMQTopicConfig { /** * @author : Lcywings * @date : 2021/9/1 15:13 * @acl : true * @description : */ @Bean public Queue topicQueueOne() { return new Queue(RabbitMQConstant.TOPIC_QUEUE_ONE_NAME, true); } /** * @author : Lcywings * @date : 2021/9/1 15:13 * @acl : true * @description : */ @Bean public Queue topicQueueTwo() { return new Queue(RabbitMQConstant.TOPIC_QUEUE_TWO_NAME, true); } /** * @author : Lcywings * @date : 2021/9/1 14:11 * @acl : true * @description : 主题交换机 */ @Bean public TopicExchange topicExchange() { return new TopicExchange(RabbitMQConstant.TOPIC_EXCHANGE_NAME); } /** * @author : Lcywings * @date : 2021/9/1 14:14 * @acl : true * @description : 直连交换机和直连队列的绑定 */ @Bean public Binding bindingOne() { return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(RabbitMQConstant.TOPIC_ROUTING_ONLY_NAME); } /** * @author : Lcywings * @date : 2021/9/1 14:14 * @acl : true * @description : 直连交换机和直连队列的绑定 */ @Bean public Binding bindingTwo() { return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(RabbitMQConstant.TOPIC_ROUTING_BATCH_NAME); } }
消费者:
直连队列模式
@Component @Slf4j @RabbitListener(queues = RabbitMQConstant.DIRECT_QUEUE_NAME) public class RabbitMQDirectConsumer { /** * @author : Lcywings * @date : 2021/9/1 14:51 * @acl : true * @description : 从直连队列中消费信息 */ @RabbitHandler public void consumerMsg4DirectQueueOne(Map<String, Object> msg) { log.info("+++++ 直连队列:{},消费消息体:{} ++++++",RabbitMQConstant.DIRECT_QUEUE_NAME,msg); // TODO 从队列中获取到消息后,需要进行后续的业务逻辑处理 } }
扇形队列模式*3
@Component @Slf4j @RabbitListener(queues = RabbitMQConstant.FANOUT_QUEUE_ONE_NAME) public class RabbitMQFanoutConsumerOne { /** * @author : Lcywings * @date : 2021/9/1 14:51 * @acl : true * @description : 从扇形队列中消费信息 */ @RabbitHandler public void consumerMsg4FanoutQueueOne(Map<String, Object> msg) { log.info("+++++ 扇形队列:{},消费消息体:{} ++++++", RabbitMQConstant.FANOUT_QUEUE_ONE_NAME); // TODO 从队列中获取到消息后,需要进行后续的业务逻辑处理 } }
主题队列模式
@Component @Slf4j @RabbitListener(queues = RabbitMQConstant.TOPIC_QUEUE_TWO_NAME) public class RabbitMQTopicConsumerTwo { /** * @author : Lcywings * @date : 2021/9/1 14:51 * @acl : true * @description : 从扇形队列中消费信息 */ @RabbitHandler public void consumerMsg4TopicQueueTwo(Map<String, Object> msg) { log.info("+++++ 主题队列:{},消费消息体:{} ++++++", RabbitMQConstant.TOPIC_QUEUE_TWO_NAME); // TODO 从队列中获取到消息后,需要进行后续的业务逻辑处理 } }
提供者:
直连队列模式
@Component @Slf4j public class RabbitMQDirectProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * @author : Lcywings * @date : 2021/9/1 14:27 * @acl : true * @description : 向指定的直连队列中发送消息 */ public void sendMsg2DirectQueue(String exchangeName, String routingKey, Map<String, Object> msg) { log.info("****** 发送消息体:{},直连交换机:{} ******", msg, exchangeName); // 生成消息发送 rabbitTemplate.convertAndSend(exchangeName, routingKey, msg); } }
扇形队列模式
@Component @Slf4j public class RabbitMQFanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * @author : Lcywings * @date : 2021/9/1 14:27 * @acl : true * @description : 向指定的扇形交换机中发送消息(广播,辐射式进行多个队列发送) */ public void sendMsg2FanoutQueue(String exchangeName, String routingKey, Map<String, Object> msg) { log.info("****** 发送消息体:{},扇形交换机:{} ******", msg, exchangeName); // 生成消息发送 rabbitTemplate.convertAndSend(exchangeName, routingKey, msg); } }
主题队列模式
*/ @Component @Slf4j public class RabbitMQTopicProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * @author : Lcywings * @date : 2021/9/1 14:27 * @acl : true * @description : RabbitMQ主题消息生产者c */ public void sendMsg2TopicQueue(String exchangeName, String routingKey, Map<String, Object> msg) { log.info("****** 发送消息体:{},主题交换机:{} ******", msg, exchangeName); // 生成消息发送 rabbitTemplate.convertAndSend(exchangeName, routingKey, msg); } }
常量类:
/** * 直连Direct:队列名称 */ public static final String DIRECT_QUEUE_NAME = "rabbit-direct-queue-test"; /** * 直连Direct:交换机名称 */ public static final String DIRECT_EXCHANGE_NAME = "rabbit-direct-exchange-test"; /** * 直连Direct:路由键-队列和交换机的绑定标识 */ public static final String DIRECT_ROUTING_NAME = "rabbit-direct-routing-test"; /** * 扇形Fanout:队列名称one */ public static final String FANOUT_QUEUE_ONE_NAME = "rabbit-fanout-queue-one-test"; /** * 扇形Fanout:队列名称two */ public static final String FANOUT_QUEUE_TWO_NAME = "rabbit-fanout-queue-two-test"; /** * 扇形Fanout:队列名称three */ public static final String FANOUT_QUEUE_THREE_NAME = "rabbit-fanout-queue-three-test"; /** * 扇形Fanout:交换机名称 */ public static final String FANOUT_EXCHANGE_NAME = "rabbit-fanout-exchange-test"; /** * 主题topic:队列名称one */ public static final String TOPIC_QUEUE_ONE_NAME = "rabbit-topic-queue-one-test"; /** * 主题topic:队列名称one */ public static final String TOPIC_QUEUE_TWO_NAME = "rabbit-topic-queue-two-test"; /** * 主题Topic:交换机名称 */ public static final String TOPIC_EXCHANGE_NAME = "rabbit-topic-fanout-test"; /** * 主题Topic:唯一匹配路由键-队列和交换机的绑定标识 */ public static final String TOPIC_ROUTING_ONLY_NAME = "rabbit-topic-routing-test.only"; /** * 主题Topic:批量匹配路由键-队列和交换机的绑定标识 */ public static final String TOPIC_ROUTING_BATCH_NAME = "rabbit-topic-routing-test.#";
Q.E.D.