5 RabbitMQ


RabbitMQ的三种队列:

  1. 直连队列模式
  2. 扇形队列模式
  3. 主题队列模式

RabbitMQ模块创建:

创建要点:

依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

核心配置:

# 服务名(应用名),会在eureka注册中心显示
spring:
  application:
    name: rabbitmq
    # rabbitmq配置
  rabbitmq:
    host: 139.196.137.209
    port: 5672
    username: admin
    password: admin

config配置:

三种主题都需要单独配置

直连队列模式

@Configuration
public class RabbitMQDirectConfig {
    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:01
     * @acl : true
     * @description : 配置智联队列,设置消息持久化
     */
    @Bean
    public Queue queue() {
        // 四个参数:Queue(String name,boolean durable,boolean exclusive,boolean autoDelete,Map<S
        // String name:队列的名称,此参数是必须的, 创建队列对象必须指定队列的名称
        // boolean durable:是否持久化,如果是true,消息会被持久在磁盘上,如果重启服务,消息仍然会在队列中
        // boolean exclusive:是否是排他队列,默认是false,如果是ttrue,只有创建当前队列的链接有权访问,当连接断开之后,
        // boolean autoDelete:是否自动删除队列,默认是false,如果为true,一旦没有生产者或者消费者使用此队列,该队列自动被删除
        return new Queue(RabbitMQConstant.DIRECT_QUEUE_NAME, true);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:11
     * @acl : true
     * @description : 直连交换机
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(RabbitMQConstant.DIRECT_EXCHANGE_NAME);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:14
     * @acl : true
     * @description : 直连交换机和直连队列的绑定
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitMQConstant.DIRECT_ROUTING_NAME);
    }

}

扇形队列模式

@Configuration
public class RabbitMQFanoutConfig {

    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:13
     * @acl : true
     * @description : 扇形队列,设置持久化
     */
    @Bean
    public Queue fanoutQueueOne() {
        return new Queue(RabbitMQConstant.FANOUT_QUEUE_ONE_NAME, true);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:13
     * @acl : true
     * @description : 扇形队列,设置持久化
     */
    @Bean
    public Queue fanoutQueueTwo() {
        return new Queue(RabbitMQConstant.FANOUT_QUEUE_TWO_NAME, true);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:13
     * @acl : true
     * @description : 扇形队列,设置持久化
     */
    @Bean
    public Queue fanoutQueueThree() {
        return new Queue(RabbitMQConstant.FANOUT_QUEUE_THREE_NAME, true);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:19
     * @acl : true
     * @description : 扇形交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitMQConstant.FANOUT_EXCHANGE_NAME);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:20
     * @acl : true
     * @description : 扇形FANOUT_EXCHANGE_NAME
     */
    @Bean
    public Binding bindingOne() {
        return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
    }
    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:20
     * @acl : true
     * @description : 扇形FANOUT_EXCHANGE_NAME
     */
    @Bean
    public Binding bindingTwo() {
        return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
    }
    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:20
     * @acl : true
     * @description : 扇形FANOUT_EXCHANGE_NAME
     */
    @Bean
    public Binding bindingThree() {
        return BindingBuilder.bind(fanoutQueueThree()).to(fanoutExchange());
    }

主题队列模式

@Configuration
public class RabbitMQTopicConfig {

    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:13
     * @acl : true
     * @description :
     */
    @Bean
    public Queue topicQueueOne() {
        return new Queue(RabbitMQConstant.TOPIC_QUEUE_ONE_NAME, true);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 15:13
     * @acl : true
     * @description :
     */
    @Bean
    public Queue topicQueueTwo() {
        return new Queue(RabbitMQConstant.TOPIC_QUEUE_TWO_NAME, true);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:11
     * @acl : true
     * @description : 主题交换机
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(RabbitMQConstant.TOPIC_EXCHANGE_NAME);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:14
     * @acl : true
     * @description : 直连交换机和直连队列的绑定
     */
    @Bean
    public Binding bindingOne() {
        return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(RabbitMQConstant.TOPIC_ROUTING_ONLY_NAME);
    }

    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:14
     * @acl : true
     * @description : 直连交换机和直连队列的绑定
     */
    @Bean
    public Binding bindingTwo() {
        return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(RabbitMQConstant.TOPIC_ROUTING_BATCH_NAME);
    }

}

消费者:

直连队列模式

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConstant.DIRECT_QUEUE_NAME)
public class RabbitMQDirectConsumer {
    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:51
     * @acl : true
     * @description : 从直连队列中消费信息
     */
    @RabbitHandler
    public void consumerMsg4DirectQueueOne(Map<String, Object> msg) {
        log.info("+++++ 直连队列:{},消费消息体:{} ++++++",RabbitMQConstant.DIRECT_QUEUE_NAME,msg);

        // TODO 从队列中获取到消息后,需要进行后续的业务逻辑处理
    }
}

扇形队列模式*3

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConstant.FANOUT_QUEUE_ONE_NAME)
public class RabbitMQFanoutConsumerOne {
    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:51
     * @acl : true
     * @description : 从扇形队列中消费信息
     */
    @RabbitHandler
    public void consumerMsg4FanoutQueueOne(Map<String, Object> msg) {
        log.info("+++++ 扇形队列:{},消费消息体:{} ++++++", RabbitMQConstant.FANOUT_QUEUE_ONE_NAME);

        // TODO 从队列中获取到消息后,需要进行后续的业务逻辑处理
    }
}

主题队列模式

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConstant.TOPIC_QUEUE_TWO_NAME)
public class RabbitMQTopicConsumerTwo {
    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:51
     * @acl : true
     * @description : 从扇形队列中消费信息
     */
    @RabbitHandler
    public void consumerMsg4TopicQueueTwo(Map<String, Object> msg) {
        log.info("+++++ 主题队列:{},消费消息体:{} ++++++", RabbitMQConstant.TOPIC_QUEUE_TWO_NAME);

        // TODO 从队列中获取到消息后,需要进行后续的业务逻辑处理
    }
}

提供者:

直连队列模式

@Component
@Slf4j
public class RabbitMQDirectProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:27
     * @acl : true
     * @description :  向指定的直连队列中发送消息
     */
    public void sendMsg2DirectQueue(String exchangeName, String routingKey, Map<String, Object> msg) {
        log.info("****** 发送消息体:{},直连交换机:{}  ******", msg, exchangeName);


        // 生成消息发送
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

扇形队列模式

@Component
@Slf4j
public class RabbitMQFanoutProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:27
     * @acl : true
     * @description :  向指定的扇形交换机中发送消息(广播,辐射式进行多个队列发送)
     */
    public void sendMsg2FanoutQueue(String exchangeName, String routingKey, Map<String, Object> msg) {
        log.info("****** 发送消息体:{},扇形交换机:{}  ******", msg, exchangeName);


        // 生成消息发送
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

主题队列模式

 */
@Component
@Slf4j
public class RabbitMQTopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @author : Lcywings
     * @date : 2021/9/1 14:27
     * @acl : true
     * @description : RabbitMQ主题消息生产者c
     */
    public void sendMsg2TopicQueue(String exchangeName, String routingKey, Map<String, Object> msg) {
        log.info("****** 发送消息体:{},主题交换机:{} ******", msg, exchangeName);


        // 生成消息发送
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

常量类:

/**
 * 直连Direct:队列名称
 */
public static final String DIRECT_QUEUE_NAME = "rabbit-direct-queue-test";

/**
 * 直连Direct:交换机名称
 */
public static final String DIRECT_EXCHANGE_NAME = "rabbit-direct-exchange-test";

/**
 * 直连Direct:路由键-队列和交换机的绑定标识
 */
public static final String DIRECT_ROUTING_NAME = "rabbit-direct-routing-test";

/**
 * 扇形Fanout:队列名称one
 */
public static final String FANOUT_QUEUE_ONE_NAME = "rabbit-fanout-queue-one-test";

/**
 * 扇形Fanout:队列名称two
 */
public static final String FANOUT_QUEUE_TWO_NAME = "rabbit-fanout-queue-two-test";

/**
 * 扇形Fanout:队列名称three
 */
public static final String FANOUT_QUEUE_THREE_NAME = "rabbit-fanout-queue-three-test";

/**
 * 扇形Fanout:交换机名称
 */
public static final String FANOUT_EXCHANGE_NAME = "rabbit-fanout-exchange-test";

/**
 * 主题topic:队列名称one
 */
public static final String TOPIC_QUEUE_ONE_NAME = "rabbit-topic-queue-one-test";

/**
 * 主题topic:队列名称one
 */
public static final String TOPIC_QUEUE_TWO_NAME = "rabbit-topic-queue-two-test";

/**
 * 主题Topic:交换机名称
 */
public static final String TOPIC_EXCHANGE_NAME = "rabbit-topic-fanout-test";

/**
 * 主题Topic:唯一匹配路由键-队列和交换机的绑定标识
 */
public static final String TOPIC_ROUTING_ONLY_NAME = "rabbit-topic-routing-test.only";


/**
 * 主题Topic:批量匹配路由键-队列和交换机的绑定标识
 */
public static final String TOPIC_ROUTING_BATCH_NAME = "rabbit-topic-routing-test.#";

Q.E.D.