4.异步处理和定时

时间处理

long chargeStartTime = System.currentTimeMillis();
long chargeBeginTime = LocalDateTime.now()
        .toInstant(ZoneOffset.of("+8")).toEpochMilli();

@RequestParam

需求某个参数注解

@Async

开启异步处理注解,在同一个类中,对其他方法调用时不可以异步处理(不生效)

所以需要在其他地方创建:

异步服务接口↓

package com.lcywings.sbt.service;


/**
 * Created on 2021/7/21.
 * <p>
 * Author : Lcywings
 * <p>
 * Description :
 */
public interface AsyncChargeService {
    void executeSyncCharge(String phoneNo);
}

实现类↓

package com.lcywings.sbt.service.impl;

import com.lcywings.sbt.service.AsyncChargeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestParam;

import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * Created on 2021/7/21.
 * <p>
 * Author : Lcywings
 * <p>
 * Description :
 */
@Slf4j
@Service
public class AsyncChargeServiceImpl implements AsyncChargeService {
    /**
     * @author : Lcywings
     * @date : 2021/7/21 8:56
     * @acl : true
     * @description : 同步充值方法
     */
    @Async("asyncTaskExecutor") // 开启异步处理注解,在同一个类中,对其他方法调用时不可以异步处理(不生效)
    public void executeSyncCharge(@RequestParam String phoneNo) {
        log.info("****** 充值用户手机号:{},开始充值 ******", phoneNo);

        //充值开始时间
        long chargeBeginTime = LocalDateTime.now()
                .toInstant(ZoneOffset.of("+8")).toEpochMilli();

        //模拟耗时5s
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }

        //充值结束时间
        long chargeFinishTime = LocalDateTime.now()
                .toInstant(ZoneOffset.of("+8")).toEpochMilli();

        log.info("****** 充值用户手机号:{},执行充值耗时{}ms ******", phoneNo, chargeFinishTime - chargeBeginTime);

        log.info("****** 充值用户手机号:{},结束充值 ******", phoneNo);
    }
}

@Async("asyncTaskExecutor") 在其中指定谁可以调用这个接口对应实现类中的方法

@EnableAsync

必须在主程序中使用此注解才能开启异步

自定义线程池(ThreadPoolTaskExecutor)

package com.lcywings.sbt.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * Created on 2021/7/21.
 * <p>
 * Author : Lcywings
 * <p>
 * Description : 自定义线程池配置
 */
@Configuration
@Slf4j
public class TaskExecutorConfig {

    //从配置文件中,读取自定义配置,是用@Value
    //核心线程数
    @Value("${async.thread.core-pool-size}")
    private Integer corePoolSize;

    //最大线程数
    @Value("${async.thread.max-pool-size}")
    private Integer maxPoolSize;

    //队列容量
    @Value("${async.thread.queue-capactiy}")
    private Integer queueCapactiy;

    /**
     * @author : Lcywings
     * @date : 2021/7/21 9:50
     * @acl : true
     * @description : 配置自定义线程池,放入容器
     * 核心参数关系
     * 当请求需要线程池中的线程处理时,首先判断corePoolSize有没有满,没有满,直接创建一个新的线程进行任务处理
     * 如果满了,判断队列容量queue-capactiy是否满了,如果没有满,将请求放入缓存队列,等待线程处理
     * 如果缓存队列满了,看是否达到了最大线程数max-pool-size,如果没有,创建新的线程进行任务处理,如果达到了最大线程数,
     * 直接将请求交给饱和处理策略(是丢弃还是使用调用者等待任务处理)
     */
    @Bean
    public Executor asyncTaskExecutor() {
        log.info("------ asyncTaskExecutor Begin ------");

        // 创建一个线程池对象
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 设置核心参数:核心线程数,允许系统保留指定大小的线程数,存放在线程池值
        executor.setCorePoolSize(corePoolSize);

        // 设置核心参数:最大线程数
        executor.setMaxPoolSize(maxPoolSize);

        // 设置核心参数:队伍容量
        executor.setQueueCapacity(queueCapactiy);

        // 自定义线程名设置
        executor.setThreadNamePrefix("yuyi-async-thread-");

        //线程存活时间,默认就是60s
        // executor.setKeepAliveSeconds(60);

        // 注意:必须调用初始化方法,自定义配置才能生效
        executor.initialize();

        log.info("------ asyncTaskExecutor End ------");
        return executor;

    }

}

new一个ThreadPoolTaskExecutor对象,设置以下属性

需要配置属性:

  • ​ 核心线程数:setCorePoolSize
  • ​ 最大线程数:setMaxPoolSize
  • ​ 队伍容量:setQueueCapacity
  • ​ 自定义线程名:setThreadNamePrefix

初始化方法:initialize

注意:

  1. 当一个任务被提交到线程池时,首先查看线程池的 核心线程(corePoolSize)是否都在执行任务, 就选择一条线程执行, 就执行 第二步
  2. 查看 核心线程(corePoolSize)是否已满,不满 就继续创建一条线程,否则执行 第三步
  3. 查看线程队列(BlockQueue )是否已满,不满 就将线程存储到 线程队列 中,否则执行 第四步
  4. 查看 最大线程(maximumPoolSize )是否已满,不满 就创建一条线程执行任务,否则就按照策略处理无法执行的任务。

具体详解:

https://xiaojin21cen.blog.csdn.net/article/details/87348237?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-12.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-12.control

定时任务

创建新包schedule,创建定时类,本身可以被spring管理,需要加@Component

@EnableScheduling

必须在主程序中使用此注解才能开启定时任务

自定义定制类↓

package com.lcywings.sbt.schedule;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

@Slf4j
@Component
public class AsyncchargeCallBackSchedule {

    /**
     * @author : Lcywings
     * @date : 2021/7/21 10:45
     * @acl : true
     * @description : 用法1:每隔十秒执行一次定时任务
     */
    @Scheduled(fixedDelay = 10 * 1000)
    public void asyncChargeCallBack() {
        //模拟从数据库或者缓存中获取需要回调的订单
        List<String> chargeOrder = Arrays.asList("T2021072100001",
                "T2021072100003", "T2021072100005", "T2021072100007");

        log.info("------ 开始执行定时回调处理值 ------");

        //
        chargeOrder.forEach(order -> {
            log.info("回调订单:{},充值成功", order);
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        log.info("------ 结束执行定时回调处理值 ------");
    }


    /**
     * @author : Lcywings
     * @date : 2021/7/21 10:45
     * @acl : true
     * @description : 用法2:每隔十秒执行一次定时任务
     */
//    @Scheduled(fixedRate = 10 * 1000)
    public void asyncChargeCallBackRate() {
        //模拟从数据库或者缓存中获取需要回调的订单
        List<String> chargeOrder = Arrays.asList("T2021072100002",
                "T2021072100004", "T2021072100006", "T2021072100008");

        log.info("------ 开始执行定时回调处理值 ------");

        //
        chargeOrder.forEach(order -> {
            log.info("回调订单:{},充值成功", order);
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        log.info("------ 结束执行定时回调处理值 ------");
    }

    /**
     * @author : Lcywings
     * @date : 2021/7/21 11:02
     * @acl : true
     * @description : 用法3:每隔十秒执行一次定时任务
     * corn表达式,实现定时任务,灵活性是最强的,可以实现循环定时,
     * 也可以实现指定定时(每天某个时间,每周某个时间...更适用于数据统计处理)
     */
//    @Scheduled(cron = "*/10 * * * * ?")//每隔10s一次,必须是整10s
//    @Scheduled(cron = "10 * * * * ?")//每分钟的第10s执行一次
//    @Scheduled(cron = "0 14 11 * * ?")//每天的11点14分执行一次
    public void asyncChargeCallBackCorn() {
        //模拟从数据库或者缓存中获取需要回调的订单
        List<String> chargeOrder = Arrays.asList("T202107210000A",
                "T202107210000B", "T202107210000C", "T202107210000D");

        log.info("------ 开始执行定时回调处理值 ------");

        //
        chargeOrder.forEach(order -> {
            log.info("回调订单:{},充值成功", order);
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        log.info("------ 结束执行定时回调处理值 ------");
    }

    /**
     * @author : Lcywings
     * @date : 2021/7/21 10:45
     * @acl : true
     * @description : 用法1:每隔十秒执行一次定时任务
     */
//    @Scheduled(fixedDelay = 10 * 1000)
    public void asyncChargeCallBackOut() {
        //模拟从数据库或者缓存中获取需要回调的订单
        List<String> chargeOrder = Arrays.asList("T2021072100001",
                "T2021072100003", "T2021072100005", "T2021072100007");

        log.info("------ 开始执行定时回调处理值 ------");

        //
        chargeOrder.forEach(order -> {
            log.info("回调订单:{},充值成功", order);
            try {
                //当定时执行的耗时,超出定时执行的循环间隔时间
                Thread.sleep(3000);

            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        log.info("------ 结束执行定时回调处理值 ------");
    }

}

注解功能区分,优缺点分析:

一共有三个注解:

  • ​ fixedDelay
  • ​ fixedRate
  • ​ cron
启动立刻执行定时任务

fixedDelay

fixedRate

fixedDelay:

fixedDelay: 指定定时任务的间隔时间,单位是毫秒,从前一次定时任务执行结束开始算时间,到下一次定时任务开始,中间的间隔时间,如果前一次定时任务执行时间已经超出了定时时间间隔,仍然会在前一次任务执行结束后,在指定的间隔时间后执行下一次任务

fixedRate:

指定定时任务的间隔时间,单位是毫秒,从前一次定时任务执行开始算时间,到下一次定时任务开始,中间的间隔时间,如果前一次定时任务执行时间已经超出了定时时间间隔,立刻执行下一次的定时任务

cron:

指定的是表达式的字符串,可以设置间隔时间,也可以设置定点时间,灵活性最强,表达式不需要死记,需要用的时候直接在线查询或者生成即同循环定时执行,每次执行的时间点都是固定的(比如每隔10s执行一次,必然是10s,20s,30s.),如果前一次执行耗时已经过了下一次应该执行的定时时间(10s开始执行,耗时12s,超出下一次的20s,自动顺序到再下一次应该执行的定时时间点(30s)执行,不是中间间隔时间


课后作业:

自行查阅资料实现excel文件的上传和解析(学习内容:使用阿里巴巴easyexcel实现)

  • 要求:浏览器上传文件,使用异步解析方式(同步返回客户端提示信息:文件上传成功,正在解析...),异步解析成功,立刻发邮件
  • 邮件主题:KH89-姓名-异步解析excel
  • 邮件内容:excel中文件内容
  • excel文件:
  • 表头:学号 姓名 年龄 性别 毕业院校 生日 手机号
  • 邮箱内容:个人信息

EasyExcel:

Maven依赖

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>easyexcel</artifactId>
    <version>2.1.6</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.58</version>
</dependency>

除了easyexcel,实例中还用到了json

Beans

package com.lcywings.sbt.beans;
import lombok.Data;
import java.util.Date;

@Data
public class ExcelData {
    /**
     * 列表属性...
     */
    private Integer id;
}

其实就对应了excel中列表的属性

Controller

 /**
     * @author : Lcywings
     * @date : 2021/7/21 20:53
     * @acl : true
     * @description : 读exe
     */
    @GetMapping("/read")
    public void read() {
        // 有个很重要的点 ExcelDataListener 不能被spring管理,要每次读取excel都要new,然后里面用到spring可以构造方法传进去
        // 写法1:只读取一个sheet
        // fileName = FileUtil.getPath() + "demo" + File.separator + "aaa.xls";
        String fileName = "E:/asd/aaa.xls";
        // 这里 需要指定读用哪个class去读,然后读取第一个sheet 文件流会自动关闭
        EasyExcel.read(fileName, ExcelData.class, new ExcelDataListener(demoDAO)).sheet().doRead();
        // 写法2:读取多个sheet
        fileName = FileUtil.getPath() + "demo" + File.separator + "aaa.xls";
        ExcelReader excelReader = null;
        try {
            excelReader = EasyExcel.read(fileName, ExcelData.class, new ExcelDataListener()).build();
            ReadSheet readSheet = EasyExcel.readSheet(0).build();
            excelReader.read(readSheet);
        } finally {
            if (excelReader != null) {
                // 这里千万别忘记关闭,读的时候会创建临时文件,到时磁盘会崩的
                excelReader.finish();
            }
        }
    }
  1. 写法1只能读取一个sheet
  2. 写法2能读取多个sheet

主要负责获取文件全路径及文件名,传给EasyExcel.read()方法即可。

EasyExcel.read(fileName, ExcelData.class, new ExcelDataListener(demoDAO)).sheet().doRead();

  • fileName:全路径及文件名。
  • ExcelData.class:哪个bean来接收数据。
  • new ExcelDataListener(demoDAO)):监听器,很重要,demoDAO用来处理数据的类

listener

ExcelDataListener继承自AnalysisEventListener泛型是接收bean的数据类型

package com.lcywings.sbt.listener;

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.lcywings.sbt.Dao.DemoDAO;
import com.lcywings.sbt.beans.ExcelData;
import com.lcywings.sbt.service.SendService;
import com.lcywings.sbt.service.impl.SendServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/**
 * Created on 2021/7/21.
 * <p>
 * Author : Lcywings
 * <p>
 * Description :
 */
// 有个很重要的点 ExcelDataListener 不能被spring管理,要每次读取excel都要new,然后里面用到spring可以构造方法传进去
@Slf4j
public class ExcelDataListener extends AnalysisEventListener<ExcelData> {
    /**
     * 每隔5条存储数据库,实际使用中可以3000条,然后清理list ,方便内存回收
     */
    private static final int BATCH_COUNT = 5;
    List<ExcelData> list = new ArrayList<>();
    /**
     * 假设这个是一个DAO,当然有业务逻辑这个也可以是一个service。当然如果不用存储这个对象没用。
     */
    private DemoDAO demoDAO;


//    public ExcelDataListener() {
//        //这里是demo,所以随便new一个。实际使用如果到了spring, 请使用下面的有参构造函数
//        demoDAO = new DemoDAO();
//    }

    /**
     * 如果使用了spring,请使用这个构造方法。每次创建Listener的时候需要把spring管理的类传进来
     *
     * @param demoDAO
     */
    public ExcelDataListener(DemoDAO demoDAO) {
        this.demoDAO = demoDAO;
    }

    /**
     * 这个每一条数据解析都会来调用
     *
     * @param data    one row value. Is is same as {@link AnalysisContext#readRowHolder()}
     * @param context
     */
    @Override
    public void invoke(ExcelData data, AnalysisContext context) {
        log.info("解析到一条数据:{}", JSON.toJSONString(data));
        list.add(data);
        // 达到BATCH_COUNT了,需要去存储一次数据库,防止数据几万条数据在内存,容易BOOM
        if (list.size() >= BATCH_COUNT) {
            saveData();
            // 存储完成清理 list
            list.clear();
        }
    }

    /**
     * 所有数据解析完成了 都会来调用
     *
     * @param context
     */
    @Override
    public void doAfterAllAnalysed(AnalysisContext context) {
        // 这里也要保存数据,确保最后遗留的数据也存储到数据库
        saveData();
        log.info("所有数据解析完成!");
    }

    /**
     * 加上存储数据库
     */
    private void saveData() {
        log.info("{}条数据,开始存储数据库!", list.size());
        demoDAO.save(list);
        log.info("存储数据库成功!");
    }
}

监听类主要方法

  • ExcelDataListener():单参构造方法,用来传数据处理类,用来保存数据
  • invoke:获取excel的主要类
  • doAfterAllAnalysed:最后结束
  • saveData:数据保存方法,调用传进来的数据处理类来处理数据

Dao

package com.lcywings.sbt.Dao;

import com.lcywings.sbt.beans.ExcelData;
import com.lcywings.sbt.service.SendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Created on 2021/7/21.
 * <p>
 * Author : Lcywings
 * <p>
 * Description :假设这个是你的DAO存储。当然还要这个类让spring管理,当然你不用需要存储,也不需要这个类。
 */
@Component
public class DemoDAO {

    @Autowired
    SendService service;

    public void save(List<ExcelData> list) {
        // 如果是mybatis,尽量别直接调用多次insert,自己写一个mapper里面新增一个方法batchInsert,所有数据一次性插入
        System.out.println("-------------------------------------");
        list.forEach(System.out::println);
        service.sendMail(list.toString());
        System.out.println("-------------------------------------");
    }
}

数据处理类,有个一save方法。

正常处理流程应该是一个mapper接口,且其中有一个save抽象方法,然后在mybatis中实现这个接口,把数据存储在数据库中,参数类型就是bean中的ExcelData。

这里没有保存数据,而是直接去发邮件。

Q.E.D.