6.消息队列ActiveMq和RabbitMq
概念
- 异步处理
- 流量削峰
- 应用解耦
- 消息通讯
消息队列
-
点对点
- 每个消息只有一个消费者
- 发送者和消费者没有时间上的约束
- 接收方在接受完消息后,需要向消息队列应答成功
-
发布订阅
- 一个消息可以有多个订阅者
- 发布者与订阅者具有时间约束,针对某个Topic的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,且必须保持运行的状态
- 为了缓和时间约束,JMS允许订阅者可创建一个持久化的订阅。这样,即使订阅者没有激活,它也能接收到发布者的消息。
MQ消息队列的安装
zip解压就OK了
后台服务,端口8161
ActiveMQ的使用
需要提供者和消费者两个模块,来模拟使用,和redis一样,后端要启动队列服务!
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置:
# 应用名称
spring.application.name=springboot-12-activemq
# 应用服务 WEB 访问端口
server.port=8089
# activemq配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
# 信任所有包,保证消息消息对象在传输过程中,能争取序列化成功
spring.activemq.packages.trust-all=true
# 默认情况下,ActiveMQ使用的模式是队列Queue,如果要是用主题队列模式Topic模式,必须改默认配置
# 如果单独开主题模式,会导致系统只能使用主题,不可以使用点对点模式
#spring.jms.pub-sub-domain=true
队列模式
提供者模块:
-
一般是由控制层调用方法,把数据发送进消息队列
-
类需要加注@Component注解
-
自动装配JmsMessagingTemplate对象,由他进行发送
-
需要Destination容器对象,new一个ActiveMQQueue(和订阅模式的区别)对象,并把唯一识别的队列名作为参数传入
-
发送方法convertAndSend(Destination,msg)
回调方法:
把接收那套拿过来用
package com.lcywings.sbt.provider;
import com.lcywings.sbt.constant.ActiveMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Destination;
/**
* Created on 2021/8/9.
* <p>
* Author : Lcywings
* <p>
* Description : 消息队列的生产者
*/
@Component
@Slf4j
public class ActiveMQQueueProducer {
@Autowired(required = false)
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* @author : Lcywings
* @date : 2021/8/9 9:43
* @acl : true
* @description : 想指定消息队列中发送信息
*/
public void sendMsg2Queue(String queueName, String msg) {
log.info("****** 2 向队列:{},发送消息:{} ******", queueName, msg);
// 创建一个目标消息队列(消息容器,只有有容器,才可以存放消息)
Destination destination = new ActiveMQQueue(queueName);
// 向目标队列发送消息
jmsMessagingTemplate.convertAndSend(destination, msg);
}
/**
* @author : Lcywings
* @date : 2021/8/9 10:56
* @acl : true
* @description : 接受消费者消费消息成功与否的响应结果(防止消息处理失败,重发,简单的消息确认)
*/
@JmsListener(destination = ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_RESP_TEST)
public void receiveMsgFromConsumerRespQueue(String rspMsg) {
log.info("====== 6 从消费者处理响应结果队列中,获取响应信息:{} ======", rspMsg);
// TODO 针对不同的响应结果,做不同的业务处理,是重发还是确认等
}
}
消费者模块:
-
使用@JmsListener(destination = , containerFactory =)注解实时监听消息队列是否有消息
- destinatio填唯一指定的消息队列名
- containerFactory指定消息队列工厂,辨别使用哪种模式
-
类需要加注@Component注解
-
方法形参中需要带有对应参数
回调方法:
- @SendTo(),参数填写唯一指定的消息队列名
package com.lcywings.sbt.consumer;
import com.lcywings.sbt.constant.ActiveMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
/**
* Created on 2021/8/9.
* <p>
* Author : Lcywings
* <p>
* Description :
*/
@Component
@Slf4j
public class ActiveMQQueueConsumer {
/**
* @author : Lcywings
* @date : 2021/8/9 10:08
* @acl : true
* @description : 从指定的消息队列中,消费信息,进行业务处理
*/
@JmsListener(destination = ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, containerFactory = ActiveMQConstant.POINT_TO_POINT_LISTENER_FACTORY_NAME)
@SendTo(ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_RESP_TEST)
public void consumerFromQueue(String msg) {
log.info("###### 3 从消费队列:{},消费消息:{} ######", ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, msg);
// TODO 根据消费的信息,进行业务处理
}
}
控制层调用
@Autowired
private ActiveMQQueueProducer activeMQQueueProducer;
/**
* @author : Lcywings
* @date : 2021/8/9 9:37
* @acl : true
* @description : 使用Queue模式(点对点),发送消息和消费消息
*/
@GetMapping("/send2Queue")
public String testActiveMQQueue(@RequestParam String msg) {
log.info("------ 1 向消息队列{},发送5条消息:{} 开始 ------", ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, msg);
Stream.of(1, 3, 5, 7, 9).forEach(index -> activeMQQueueProducer.sendMsg2Queue(ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, msg + "-" + index));
log.info("------ 3 向消息队列{},发送5条消息:{} 结束 ------", ActiveMQConstant.ACTIVEMQ_QUEUE_NAME_TEST, msg);
return "Send Message To ACtiveMq Queue Success";
}
订阅模式(需要开启配置文件中的配置)
提供者模块(和队列模式基本一致):
- 一般是由控制层调用方法,把数据发送进消息队列
- 类需要加注@Component注解
- 自动装配JmsMessagingTemplate对象,由他进行发送
- 需要Destination容器对象,new一个ActiveMQTopic(和队列模式的区别)对象,并把唯一识别的队列名作为参数传入
- 发送方法convertAndSend(Destination,msg)
package com.lcywings.sbt.provider;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Destination;
/**
* Created on 2021/8/9.
* <p>
* Author : Lcywings
* <p>
* Description : Topic主题消息的发布者
*/
@Component
@Slf4j
public class ActiveMQTopicPublisher {
@Autowired(required = false)
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* @author : Lcywings
* @date : 2021/8/9 11:12
* @acl : true
* @description : 向主题队列中,发布主题消息
*/
public void sendMsg2Topic(String topicName, String msg) {
log.info("****** 2 向主题队列:{},发布一条主题信息:{} ******", topicName, msg);
// 创建一个目标主题UI列
Destination destination = new ActiveMQTopic(topicName);
// 向目标主题队列发
jmsMessagingTemplate.convertAndSend(destination, msg);
}
}
消费者模块(和队列模式一致,区别是可以创建多个):
- 使用@JmsListener(destination = , containerFactory =)注解实时监听消息队列是否有消息
- destinatio填唯一指定的消息队列名
- containerFactory指定消息队列工厂,辨别使用哪种模式
- 类需要加注@Component注解
- 方法形参中需要带有对应参数
package com.lcywings.sbt.consumer;
import com.lcywings.sbt.constant.ActiveMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* Created on 2021/8/9.
* <p>
* Author : Lcywings
* <p>
* Description : 从指定的Topic队列中,订阅消息
*/
@Component
@Slf4j
public class ActiveMQTopicSubscriberOne {
/**
* @author : Lcywings
* @date : 2021/8/9 11:26
* @acl : true
* @description :从指定的主题消息队列中,订阅消息,进行业务处理
*/
@JmsListener(destination = ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST)
public void subscribeMsgFromTopic(String msg) {
log.info("###### 3-1 从主题消息队列:{},订阅消息:{} ######", ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST, msg);
// TODO 根据消费的消息,进行业务处理
}
}
控制层:
@Autowired
private ActiveMQTopicPublisher activeMQTopicPublisher;
/**
* @author : Lcywings
* @date : 2021/8/9 9:37
* @acl : true
* @description : 使用Topic模式(发布订阅),发送消息和订阅消息
*/
@GetMapping("/send2Topic")
public String testActiveMQTopic(@RequestParam String msg) {
log.info("------ 1 向消息队列{},发送5条消息:{} 开始 ------", ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST, msg);
Stream.of(0, 2, 4, 6, 8).forEach(index -> activeMQTopicPublisher.sendMsg2Topic(ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST, msg + "-" + index));
log.info("------ 3 向消息队列{},发送5条消息:{} 结束 ------", ActiveMQConstant.ACTIVEMQ_QUEUE_TOPIC_TEST, msg);
return "Send Message To ActiveMq Topic Success";
}
兼容模式
需要配置文件:ActiveMQConfig.java,分别单独配置队列模式和订阅模式
- 配置文件需要有@Configuration注解
- 方法要有@Bean注解
- 配置基本相同,唯一改变的是setPubSubDomain方法中设置T/F的区别
- 关闭配置文件中指定为订阅模式
- 在@JmsListener注解中加入containerFactory属性指定配置好的bean方法名
package com.lcywings.sbt.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
* Created on 2021/8/9.
* <p>
* Author : Lcywings
* <p>
* Description : ActiveMQ配置类,增加对点对点和主题模式的支持
*/
@Configuration
public class ActiveMQConfig {
/**
* @author : Lcywings
* @date : 2021/8/9 11:52
* @acl : true
* @description : 模式1-点对点队列
*/
@Bean
public JmsListenerContainerFactory point2pointJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory simpleJmsListenerContainerFactory = new SimpleJmsListenerContainerFactory();
simpleJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
simpleJmsListenerContainerFactory.setPubSubDomain(false);
return simpleJmsListenerContainerFactory;
}
/**
* @author : Lcywings
* @date : 2021/8/9 11:52
* @acl : true
* @description : 模式2-点对点队列
*/
@Bean
public JmsListenerContainerFactory pub2subJmsJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory simpleJmsListenerContainerFactory = new SimpleJmsListenerContainerFactory();
simpleJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
simpleJmsListenerContainerFactory.setPubSubDomain(true);
return simpleJmsListenerContainerFactory;
}
}
课后作业:
课后作业:
已有文件数据
每列值的含义
- 编号,姓名,密码(md5加密入库),昵称,邮箱,手机号
- U10001,admin,123,admin,1231@kgc.com,13501020301
- U10003,root,123,root,1232@kgc.com,13501020302
- U10005,tom,123,tom,1233@kgc.com,13501020303
- U10007,jack,123,jack,1234@kgc.com,13501020304
- U10009,mark,123,mark,1235@kgc.com,13501020305
- U10002,marry,123,marry,1236@kgc.com,13501020306
- U10004,lilei,123,lilei,1237@kgc.com,13501020307
- U10006,zhangsan,123,zhangsan,1238@kgc.com,13501020308
- U10008,hanmeimei,123,hanmeimei,1239@kgc.com,13501020309
- U10010,lisi,123,lisi,1230@kgc.com,13501020310
要求:
1、解析文件内容,使用消息队列,异步将将数据进行入库,页面提示导入操作成功,入库成功后,通过消息队列,通知短信中心发送短信
2、必须是两个模块,通过消息队列,从解析和入库模块,发送到短信中心模块,不需要真实发送,只需要日志体现
Q.E.D.