4.异步处理和定时
时间处理
long chargeStartTime = System.currentTimeMillis();
long chargeBeginTime = LocalDateTime.now()
.toInstant(ZoneOffset.of("+8")).toEpochMilli();
@RequestParam
需求某个参数注解
@Async
开启异步处理注解,在同一个类中,对其他方法调用时不可以异步处理(不生效)
所以需要在其他地方创建:
异步服务接口↓
package com.lcywings.sbt.service;
/**
* Created on 2021/7/21.
* <p>
* Author : Lcywings
* <p>
* Description :
*/
public interface AsyncChargeService {
void executeSyncCharge(String phoneNo);
}
实现类↓
package com.lcywings.sbt.service.impl;
import com.lcywings.sbt.service.AsyncChargeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestParam;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
/**
* Created on 2021/7/21.
* <p>
* Author : Lcywings
* <p>
* Description :
*/
@Slf4j
@Service
public class AsyncChargeServiceImpl implements AsyncChargeService {
/**
* @author : Lcywings
* @date : 2021/7/21 8:56
* @acl : true
* @description : 同步充值方法
*/
@Async("asyncTaskExecutor") // 开启异步处理注解,在同一个类中,对其他方法调用时不可以异步处理(不生效)
public void executeSyncCharge(@RequestParam String phoneNo) {
log.info("****** 充值用户手机号:{},开始充值 ******", phoneNo);
//充值开始时间
long chargeBeginTime = LocalDateTime.now()
.toInstant(ZoneOffset.of("+8")).toEpochMilli();
//模拟耗时5s
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
//充值结束时间
long chargeFinishTime = LocalDateTime.now()
.toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("****** 充值用户手机号:{},执行充值耗时{}ms ******", phoneNo, chargeFinishTime - chargeBeginTime);
log.info("****** 充值用户手机号:{},结束充值 ******", phoneNo);
}
}
@Async("asyncTaskExecutor") 在其中指定谁可以调用这个接口对应实现类中的方法
@EnableAsync
必须在主程序中使用此注解才能开启异步
自定义线程池(ThreadPoolTaskExecutor)
package com.lcywings.sbt.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* Created on 2021/7/21.
* <p>
* Author : Lcywings
* <p>
* Description : 自定义线程池配置
*/
@Configuration
@Slf4j
public class TaskExecutorConfig {
//从配置文件中,读取自定义配置,是用@Value
//核心线程数
@Value("${async.thread.core-pool-size}")
private Integer corePoolSize;
//最大线程数
@Value("${async.thread.max-pool-size}")
private Integer maxPoolSize;
//队列容量
@Value("${async.thread.queue-capactiy}")
private Integer queueCapactiy;
/**
* @author : Lcywings
* @date : 2021/7/21 9:50
* @acl : true
* @description : 配置自定义线程池,放入容器
* 核心参数关系
* 当请求需要线程池中的线程处理时,首先判断corePoolSize有没有满,没有满,直接创建一个新的线程进行任务处理
* 如果满了,判断队列容量queue-capactiy是否满了,如果没有满,将请求放入缓存队列,等待线程处理
* 如果缓存队列满了,看是否达到了最大线程数max-pool-size,如果没有,创建新的线程进行任务处理,如果达到了最大线程数,
* 直接将请求交给饱和处理策略(是丢弃还是使用调用者等待任务处理)
*/
@Bean
public Executor asyncTaskExecutor() {
log.info("------ asyncTaskExecutor Begin ------");
// 创建一个线程池对象
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心参数:核心线程数,允许系统保留指定大小的线程数,存放在线程池值
executor.setCorePoolSize(corePoolSize);
// 设置核心参数:最大线程数
executor.setMaxPoolSize(maxPoolSize);
// 设置核心参数:队伍容量
executor.setQueueCapacity(queueCapactiy);
// 自定义线程名设置
executor.setThreadNamePrefix("yuyi-async-thread-");
//线程存活时间,默认就是60s
// executor.setKeepAliveSeconds(60);
// 注意:必须调用初始化方法,自定义配置才能生效
executor.initialize();
log.info("------ asyncTaskExecutor End ------");
return executor;
}
}
new一个ThreadPoolTaskExecutor对象,设置以下属性
需要配置属性:
- 核心线程数:setCorePoolSize
- 最大线程数:setMaxPoolSize
- 队伍容量:setQueueCapacity
- 自定义线程名:setThreadNamePrefix
初始化方法:initialize
注意:
- 当一个任务被提交到线程池时,首先查看线程池的 核心线程(corePoolSize)是否都在执行任务,
否
就选择一条线程执行,是
就执行 第二步。 - 查看 核心线程(corePoolSize)是否已满,
不满
就继续创建一条线程,否则执行 第三步。 - 查看线程队列(BlockQueue )是否已满,
不满
就将线程存储到 线程队列 中,否则执行 第四步。 - 查看 最大线程(maximumPoolSize )是否已满,
不满
就创建一条线程执行任务,否则就按照策略处理无法执行的任务。
具体详解:
定时任务
创建新包schedule,创建定时类,本身可以被spring管理,需要加@Component
@EnableScheduling
必须在主程序中使用此注解才能开启定时任务
自定义定制类↓
package com.lcywings.sbt.schedule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
@Slf4j
@Component
public class AsyncchargeCallBackSchedule {
/**
* @author : Lcywings
* @date : 2021/7/21 10:45
* @acl : true
* @description : 用法1:每隔十秒执行一次定时任务
*/
@Scheduled(fixedDelay = 10 * 1000)
public void asyncChargeCallBack() {
//模拟从数据库或者缓存中获取需要回调的订单
List<String> chargeOrder = Arrays.asList("T2021072100001",
"T2021072100003", "T2021072100005", "T2021072100007");
log.info("------ 开始执行定时回调处理值 ------");
//
chargeOrder.forEach(order -> {
log.info("回调订单:{},充值成功", order);
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
});
log.info("------ 结束执行定时回调处理值 ------");
}
/**
* @author : Lcywings
* @date : 2021/7/21 10:45
* @acl : true
* @description : 用法2:每隔十秒执行一次定时任务
*/
// @Scheduled(fixedRate = 10 * 1000)
public void asyncChargeCallBackRate() {
//模拟从数据库或者缓存中获取需要回调的订单
List<String> chargeOrder = Arrays.asList("T2021072100002",
"T2021072100004", "T2021072100006", "T2021072100008");
log.info("------ 开始执行定时回调处理值 ------");
//
chargeOrder.forEach(order -> {
log.info("回调订单:{},充值成功", order);
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
});
log.info("------ 结束执行定时回调处理值 ------");
}
/**
* @author : Lcywings
* @date : 2021/7/21 11:02
* @acl : true
* @description : 用法3:每隔十秒执行一次定时任务
* corn表达式,实现定时任务,灵活性是最强的,可以实现循环定时,
* 也可以实现指定定时(每天某个时间,每周某个时间...更适用于数据统计处理)
*/
// @Scheduled(cron = "*/10 * * * * ?")//每隔10s一次,必须是整10s
// @Scheduled(cron = "10 * * * * ?")//每分钟的第10s执行一次
// @Scheduled(cron = "0 14 11 * * ?")//每天的11点14分执行一次
public void asyncChargeCallBackCorn() {
//模拟从数据库或者缓存中获取需要回调的订单
List<String> chargeOrder = Arrays.asList("T202107210000A",
"T202107210000B", "T202107210000C", "T202107210000D");
log.info("------ 开始执行定时回调处理值 ------");
//
chargeOrder.forEach(order -> {
log.info("回调订单:{},充值成功", order);
try {
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
});
log.info("------ 结束执行定时回调处理值 ------");
}
/**
* @author : Lcywings
* @date : 2021/7/21 10:45
* @acl : true
* @description : 用法1:每隔十秒执行一次定时任务
*/
// @Scheduled(fixedDelay = 10 * 1000)
public void asyncChargeCallBackOut() {
//模拟从数据库或者缓存中获取需要回调的订单
List<String> chargeOrder = Arrays.asList("T2021072100001",
"T2021072100003", "T2021072100005", "T2021072100007");
log.info("------ 开始执行定时回调处理值 ------");
//
chargeOrder.forEach(order -> {
log.info("回调订单:{},充值成功", order);
try {
//当定时执行的耗时,超出定时执行的循环间隔时间
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
});
log.info("------ 结束执行定时回调处理值 ------");
}
}
注解功能区分,优缺点分析:
一共有三个注解:
- fixedDelay
- fixedRate
- cron
启动立刻执行定时任务
fixedDelay
fixedRate
fixedDelay:
fixedDelay: 指定定时任务的间隔时间,单位是毫秒,从前一次定时任务执行结束开始算时间,到下一次定时任务开始,中间的间隔时间,如果前一次定时任务执行时间已经超出了定时时间间隔,仍然会在前一次任务执行结束后,在指定的间隔时间后执行下一次任务
fixedRate:
指定定时任务的间隔时间,单位是毫秒,从前一次定时任务执行开始算时间,到下一次定时任务开始,中间的间隔时间,如果前一次定时任务执行时间已经超出了定时时间间隔,立刻执行下一次的定时任务
cron:
指定的是表达式的字符串,可以设置间隔时间,也可以设置定点时间,灵活性最强,表达式不需要死记,需要用的时候直接在线查询或者生成即同循环定时执行,每次执行的时间点都是固定的(比如每隔10s执行一次,必然是10s,20s,30s.),如果前一次执行耗时已经过了下一次应该执行的定时时间(10s开始执行,耗时12s,超出下一次的20s,自动顺序到再下一次应该执行的定时时间点(30s)执行,不是中间间隔时间
课后作业:
自行查阅资料实现excel文件的上传和解析(学习内容:使用阿里巴巴easyexcel实现)
- 要求:浏览器上传文件,使用异步解析方式(同步返回客户端提示信息:文件上传成功,正在解析...),异步解析成功,立刻发邮件
- 邮件主题:KH89-姓名-异步解析excel
- 邮件内容:excel中文件内容
- excel文件:
- 表头:学号 姓名 年龄 性别 毕业院校 生日 手机号
- 邮箱内容:个人信息
EasyExcel:
Maven依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>2.1.6</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
除了easyexcel,实例中还用到了json
Beans
package com.lcywings.sbt.beans;
import lombok.Data;
import java.util.Date;
@Data
public class ExcelData {
/**
* 列表属性...
*/
private Integer id;
}
其实就对应了excel中列表的属性
Controller
/**
* @author : Lcywings
* @date : 2021/7/21 20:53
* @acl : true
* @description : 读exe
*/
@GetMapping("/read")
public void read() {
// 有个很重要的点 ExcelDataListener 不能被spring管理,要每次读取excel都要new,然后里面用到spring可以构造方法传进去
// 写法1:只读取一个sheet
// fileName = FileUtil.getPath() + "demo" + File.separator + "aaa.xls";
String fileName = "E:/asd/aaa.xls";
// 这里 需要指定读用哪个class去读,然后读取第一个sheet 文件流会自动关闭
EasyExcel.read(fileName, ExcelData.class, new ExcelDataListener(demoDAO)).sheet().doRead();
// 写法2:读取多个sheet
fileName = FileUtil.getPath() + "demo" + File.separator + "aaa.xls";
ExcelReader excelReader = null;
try {
excelReader = EasyExcel.read(fileName, ExcelData.class, new ExcelDataListener()).build();
ReadSheet readSheet = EasyExcel.readSheet(0).build();
excelReader.read(readSheet);
} finally {
if (excelReader != null) {
// 这里千万别忘记关闭,读的时候会创建临时文件,到时磁盘会崩的
excelReader.finish();
}
}
}
- 写法1只能读取一个sheet
- 写法2能读取多个sheet
主要负责获取文件全路径及文件名,传给EasyExcel.read()方法即可。
EasyExcel.read(fileName, ExcelData.class, new ExcelDataListener(demoDAO)).sheet().doRead();
- fileName:全路径及文件名。
- ExcelData.class:哪个bean来接收数据。
- new ExcelDataListener(demoDAO)):监听器,很重要,demoDAO用来处理数据的类
listener
ExcelDataListener继承自AnalysisEventListener
package com.lcywings.sbt.listener;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.lcywings.sbt.Dao.DemoDAO;
import com.lcywings.sbt.beans.ExcelData;
import com.lcywings.sbt.service.SendService;
import com.lcywings.sbt.service.impl.SendServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
/**
* Created on 2021/7/21.
* <p>
* Author : Lcywings
* <p>
* Description :
*/
// 有个很重要的点 ExcelDataListener 不能被spring管理,要每次读取excel都要new,然后里面用到spring可以构造方法传进去
@Slf4j
public class ExcelDataListener extends AnalysisEventListener<ExcelData> {
/**
* 每隔5条存储数据库,实际使用中可以3000条,然后清理list ,方便内存回收
*/
private static final int BATCH_COUNT = 5;
List<ExcelData> list = new ArrayList<>();
/**
* 假设这个是一个DAO,当然有业务逻辑这个也可以是一个service。当然如果不用存储这个对象没用。
*/
private DemoDAO demoDAO;
// public ExcelDataListener() {
// //这里是demo,所以随便new一个。实际使用如果到了spring, 请使用下面的有参构造函数
// demoDAO = new DemoDAO();
// }
/**
* 如果使用了spring,请使用这个构造方法。每次创建Listener的时候需要把spring管理的类传进来
*
* @param demoDAO
*/
public ExcelDataListener(DemoDAO demoDAO) {
this.demoDAO = demoDAO;
}
/**
* 这个每一条数据解析都会来调用
*
* @param data one row value. Is is same as {@link AnalysisContext#readRowHolder()}
* @param context
*/
@Override
public void invoke(ExcelData data, AnalysisContext context) {
log.info("解析到一条数据:{}", JSON.toJSONString(data));
list.add(data);
// 达到BATCH_COUNT了,需要去存储一次数据库,防止数据几万条数据在内存,容易BOOM
if (list.size() >= BATCH_COUNT) {
saveData();
// 存储完成清理 list
list.clear();
}
}
/**
* 所有数据解析完成了 都会来调用
*
* @param context
*/
@Override
public void doAfterAllAnalysed(AnalysisContext context) {
// 这里也要保存数据,确保最后遗留的数据也存储到数据库
saveData();
log.info("所有数据解析完成!");
}
/**
* 加上存储数据库
*/
private void saveData() {
log.info("{}条数据,开始存储数据库!", list.size());
demoDAO.save(list);
log.info("存储数据库成功!");
}
}
监听类主要方法
- ExcelDataListener():单参构造方法,用来传数据处理类,用来保存数据
- invoke:获取excel的主要类
- doAfterAllAnalysed:最后结束
- saveData:数据保存方法,调用传进来的数据处理类来处理数据
Dao
package com.lcywings.sbt.Dao;
import com.lcywings.sbt.beans.ExcelData;
import com.lcywings.sbt.service.SendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* Created on 2021/7/21.
* <p>
* Author : Lcywings
* <p>
* Description :假设这个是你的DAO存储。当然还要这个类让spring管理,当然你不用需要存储,也不需要这个类。
*/
@Component
public class DemoDAO {
@Autowired
SendService service;
public void save(List<ExcelData> list) {
// 如果是mybatis,尽量别直接调用多次insert,自己写一个mapper里面新增一个方法batchInsert,所有数据一次性插入
System.out.println("-------------------------------------");
list.forEach(System.out::println);
service.sendMail(list.toString());
System.out.println("-------------------------------------");
}
}
数据处理类,有个一save方法。
正常处理流程应该是一个mapper接口,且其中有一个save抽象方法,然后在mybatis中实现这个接口,把数据存储在数据库中,参数类型就是bean中的ExcelData。
这里没有保存数据,而是直接去发邮件。
Q.E.D.