6.消息队列ActiveMq和RabbitMq


概念

ActiveMQ实现消息队列_sunjian1122的博客

  1. 异步处理
  2. 流量削峰
  3. 应用解耦
  4. 消息通讯

消息队列

  • 点对点

    1. 每个消息只有一个消费者
    2. 发送者和消费者没有时间上的约束
    3. 接收方在接受完消息后,需要向消息队列应答成功
  • 发布订阅

    1. 一个消息可以有多个订阅者
    2. 发布者与订阅者具有时间约束,针对某个Topic的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,且必须保持运行的状态
    3. 为了缓和时间约束,JMS允许订阅者可创建一个持久化的订阅。这样,即使订阅者没有激活,它也能接收到发布者的消息。

MQ消息队列的安装

zip解压就OK了

后台服务,端口8161


ActiveMQ的使用

需要提供者和消费者两个模块,来模拟使用,和redis一样,后端要启动队列服务!

依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

配置:

# 应用名称
spring.application.name=springboot-12-activemq

# 应用服务 WEB 访问端口
server.port=8089

# activemq配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
# 信任所有包,保证消息消息对象在传输过程中,能争取序列化成功
spring.activemq.packages.trust-all=true
# 默认情况下,ActiveMQ使用的模式是队列Queue,如果要是用主题队列模式Topic模式,必须改默认配置
# 如果单独开主题模式,会导致系统只能使用主题,不可以使用点对点模式
#spring.jms.pub-sub-domain=true

队列模式

提供者模块:

  1. 一般是由控制层调用方法,把数据发送进消息队列

  2. 类需要加注@Component注解

  3. 自动装配JmsMessagingTemplate对象,由他进行发送

  4. 需要Destination容器对象,new一个ActiveMQQueue(和订阅模式的区别)对象,并把唯一识别的队列名作为参数传入

  5. 发送方法convertAndSend(Destination,msg)

    回调方法:

    ​ 把接收那套拿过来用

package com.lcywings.sbt.provider;

import com.lcywings.sbt.constant.ActiveMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Destination;

/**
 * Created on 2021/8/9.
 * <p>
 * Author : Lcywings
 * <p>
 * Description : 消息队列的生产者
 */
@Component
@Slf4j
public class ActiveMQQueueProducer {

    @Autowired(required = false)
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * @author : Lcywings
     * @date : 2021/8/9 9:43
     * @acl : true
     * @description : 想指定消息队列中发送信息
     */
    public void sendMsg2Queue(String queueName, String msg) {
        log.info("****** 2 向队列:{},发送消息:{} ******", queueName, msg);

        // 创建一个目标消息队列(消息容器,只有有容器,才可以存放消息)
        Destination destination = new ActiveMQQueue(queueName);

        // 向目标队列发送消息
        jmsMessagingTemplate.convertAndSend(destination, msg);

    }

    /**
     * @author : Lcywings
     * @date : 2021/8/9 10:56
     * @acl : true
     * @description : 接受消费者消费消息成功与否的响应结果(防止消息处理失败,重发,简单的消息确认)
     */
    @JmsListener(destination = ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_RESP_TEST)
    public void receiveMsgFromConsumerRespQueue(String rspMsg) {
        log.info("====== 6 从消费者处理响应结果队列中,获取响应信息:{} ======", rspMsg);

        // TODO 针对不同的响应结果,做不同的业务处理,是重发还是确认等
    }
}

消费者模块:

  1. 使用@JmsListener(destination = , containerFactory =)注解实时监听消息队列是否有消息

    • destinatio填唯一指定的消息队列名
    • containerFactory指定消息队列工厂,辨别使用哪种模式
  2. 类需要加注@Component注解

  3. 方法形参中需要带有对应参数

    回调方法:

    1. @SendTo(),参数填写唯一指定的消息队列名
package com.lcywings.sbt.consumer;

import com.lcywings.sbt.constant.ActiveMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

/**
 * Created on 2021/8/9.
 * <p>
 * Author : Lcywings
 * <p>
 * Description :
 */
@Component
@Slf4j
public class ActiveMQQueueConsumer {

    /**
     * @author : Lcywings
     * @date : 2021/8/9 10:08
     * @acl : true
     * @description : 从指定的消息队列中,消费信息,进行业务处理
     */
    @JmsListener(destination = ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, containerFactory = ActiveMQConstant.POINT_TO_POINT_LISTENER_FACTORY_NAME)
    @SendTo(ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_RESP_TEST)
    public void consumerFromQueue(String msg) {
        log.info("###### 3 从消费队列:{},消费消息:{} ######", ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, msg);

        // TODO 根据消费的信息,进行业务处理

    }
}

控制层调用

@Autowired
private ActiveMQQueueProducer activeMQQueueProducer;

/**
 * @author : Lcywings
 * @date : 2021/8/9 9:37
 * @acl : true
 * @description : 使用Queue模式(点对点),发送消息和消费消息
 */
@GetMapping("/send2Queue")
public String testActiveMQQueue(@RequestParam String msg) {
    log.info("------ 1 向消息队列{},发送5条消息:{} 开始 ------", ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, msg);

    Stream.of(1, 3, 5, 7, 9).forEach(index -> activeMQQueueProducer.sendMsg2Queue(ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, msg + "-" + index));

    log.info("------ 3 向消息队列{},发送5条消息:{} 结束 ------", ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, msg);

    return "Send Message To ACtiveMq Queue Success";
}

订阅模式(需要开启配置文件中的配置)

提供者模块(和队列模式基本一致):

  1. 一般是由控制层调用方法,把数据发送进消息队列
  2. 类需要加注@Component注解
  3. 自动装配JmsMessagingTemplate对象,由他进行发送
  4. 需要Destination容器对象,new一个ActiveMQTopic(和队列模式的区别)对象,并把唯一识别的队列名作为参数传入
  5. 发送方法convertAndSend(Destination,msg)
package com.lcywings.sbt.provider;

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Destination;

/**
 * Created on 2021/8/9.
 * <p>
 * Author : Lcywings
 * <p>
 * Description : Topic主题消息的发布者
 */
@Component
@Slf4j
public class ActiveMQTopicPublisher {

    @Autowired(required = false)
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * @author : Lcywings
     * @date : 2021/8/9 11:12
     * @acl : true
     * @description :  向主题队列中,发布主题消息
     */
    public void sendMsg2Topic(String topicName, String msg) {
        log.info("****** 2 向主题队列:{},发布一条主题信息:{} ******", topicName, msg);

        // 创建一个目标主题UI列
        Destination destination = new ActiveMQTopic(topicName);

        // 向目标主题队列发
        jmsMessagingTemplate.convertAndSend(destination, msg);
    }


}

消费者模块(和队列模式一致,区别是可以创建多个):

  1. 使用@JmsListener(destination = , containerFactory =)注解实时监听消息队列是否有消息
    • destinatio填唯一指定的消息队列名
    • containerFactory指定消息队列工厂,辨别使用哪种模式
  2. 类需要加注@Component注解
  3. 方法形参中需要带有对应参数
package com.lcywings.sbt.consumer;

import com.lcywings.sbt.constant.ActiveMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Created on 2021/8/9.
 * <p>
 * Author : Lcywings
 * <p>
 * Description : 从指定的Topic队列中,订阅消息
 */
@Component
@Slf4j
public class ActiveMQTopicSubscriberOne {

    /**
     * @author : Lcywings
     * @date : 2021/8/9 11:26
     * @acl : true
     * @description :从指定的主题消息队列中,订阅消息,进行业务处理
     */
    @JmsListener(destination = ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST)
    public void subscribeMsgFromTopic(String msg) {
        log.info("###### 3-1 从主题消息队列:{},订阅消息:{} ######", ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST, msg);

        // TODO 根据消费的消息,进行业务处理

    }
}

控制层:

@Autowired
private ActiveMQTopicPublisher activeMQTopicPublisher;

/**
 * @author : Lcywings
 * @date : 2021/8/9 9:37
 * @acl : true
 * @description : 使用Topic模式(发布订阅),发送消息和订阅消息
 */
@GetMapping("/send2Topic")
public String testActiveMQTopic(@RequestParam String msg) {
    log.info("------ 1 向消息队列{},发送5条消息:{} 开始 ------", ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST, msg);

    Stream.of(0, 2, 4, 6, 8).forEach(index -> activeMQTopicPublisher.sendMsg2Topic(ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST, msg + "-" + index));

    log.info("------ 3 向消息队列{},发送5条消息:{} 结束 ------", ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST, msg);

    return "Send Message To ActiveMq Topic Success";
}

兼容模式

需要配置文件:ActiveMQConfig.java,分别单独配置队列模式和订阅模式

  1. 配置文件需要有@Configuration注解
  2. 方法要有@Bean注解
  3. 配置基本相同,唯一改变的是setPubSubDomain方法中设置T/F的区别
  4. 关闭配置文件中指定为订阅模式
  5. 在@JmsListener注解中加入containerFactory属性指定配置好的bean方法名
package com.lcywings.sbt.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

/**
 * Created on 2021/8/9.
 * <p>
 * Author : Lcywings
 * <p>
 * Description : ActiveMQ配置类,增加对点对点和主题模式的支持
 */
@Configuration
public class ActiveMQConfig {

    /**
     * @author : Lcywings
     * @date : 2021/8/9 11:52
     * @acl : true
     * @description : 模式1-点对点队列
     */
    @Bean
    public JmsListenerContainerFactory point2pointJmsListenerContainerFactory(ConnectionFactory connectionFactory) {

        SimpleJmsListenerContainerFactory simpleJmsListenerContainerFactory = new SimpleJmsListenerContainerFactory();
        simpleJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
        simpleJmsListenerContainerFactory.setPubSubDomain(false);
        return simpleJmsListenerContainerFactory;
    }

    /**
     * @author : Lcywings
     * @date : 2021/8/9 11:52
     * @acl : true
     * @description : 模式2-点对点队列
     */
    @Bean
    public JmsListenerContainerFactory pub2subJmsJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleJmsListenerContainerFactory simpleJmsListenerContainerFactory = new SimpleJmsListenerContainerFactory();
        simpleJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
        simpleJmsListenerContainerFactory.setPubSubDomain(true);
        return simpleJmsListenerContainerFactory;
    }

}

课后作业:

课后作业:
已有文件数据
每列值的含义

要求:
1、解析文件内容,使用消息队列,异步将将数据进行入库,页面提示导入操作成功,入库成功后,通过消息队列,通知短信中心发送短信
2、必须是两个模块,通过消息队列,从解析和入库模块,发送到短信中心模块,不需要真实发送,只需要日志体现

Q.E.D.