2026-03-16提交:异步导入,新增任务类

This commit is contained in:
junzhangfm 2026-03-16 17:07:55 +08:00
parent 14865b4663
commit e30929f6f7
15 changed files with 366 additions and 78 deletions

View File

@ -72,6 +72,13 @@
<version>3.3.2</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.25</version>
</dependency>
<!-- Spring Boot Test -->
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -0,0 +1,28 @@
package com.southern.power.grid.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* 异步配置
*
* @author: junzhangfm
* @date: 2026/3/16
**/
@Configuration
public class AsyncConfig {
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-import-");
executor.initialize();
return executor;
}
}

View File

@ -1,17 +1,16 @@
package com.southern.power.grid.controller;
import com.alibaba.excel.EasyExcel;
import com.southern.power.grid.common.Result;
import com.southern.power.grid.entity.*;
import com.southern.power.grid.listener.DataExcelListener;
import com.southern.power.grid.service.IDnerDailyPowerOutageEventService;
import com.southern.power.grid.service.IDnerHourlyPowerOutageEventService;
import com.southern.power.grid.service.IDnerSiteAreaConfigurationService;
import com.southern.power.grid.service.IImportTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.InputStream;
import java.util.List;
/**
@ -25,7 +24,7 @@ import java.util.List;
@Slf4j
public class DnerController {
@Autowired
private DataExcelListener dataExcelListener;
private IImportTaskService importTaskService;
@Autowired
private IDnerHourlyPowerOutageEventService dnerHourlyPowerOutageEventService;
@ -74,17 +73,21 @@ public class DnerController {
}
/**
* 通过easyexcel流式读取导入数据
* 通过easyexcel流式读取导入数据 -- 异步导入
*
* @param inputStream 文件流
* @param file 文件
* @return 返回结果
*/
@PostMapping("/excel/import")
public Result<Boolean> importExcel(@RequestParam("file") InputStream inputStream) {
// 采用easyExcel流式读取一行一行读不加载整个excel文件防止OOM
EasyExcel.read(inputStream, DataExcelEntity.class, dataExcelListener)
.sheet()
.doRead();
return Result.success(Boolean.TRUE);
public Result<String> importExcel(@RequestParam("file") MultipartFile file) {
String taskNo = importTaskService.importExcel(file);
return Result.success(taskNo);
}
// 2. 查询导入进度
@GetMapping("/progress/{taskNo}")
public Result<ImportTask> getProgress(@PathVariable String taskNo) {
ImportTask task = importTaskService.getProgress(taskNo);
return Result.success(task);
}
}

View File

@ -1,18 +0,0 @@
package com.southern.power.grid.dao;
import com.southern.power.grid.entity.DataExcelEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* excel数据-mapper类
*
* @author: junzhangfm
* @date: 2026/3/13
**/
@Mapper
public interface DataExcelMapper {
void batchInsert(@Param("list") List<DataExcelEntity> list);
}

View File

@ -0,0 +1,16 @@
package com.southern.power.grid.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.southern.power.grid.entity.ImportTask;
import org.apache.ibatis.annotations.Mapper;
/**
* 导入任务Mapper类
*
* @author: junzhangfm
* @date: 2026/3/16
**/
@Mapper
public interface ImportTaskMapper extends BaseMapper<ImportTask> {
}

View File

@ -17,6 +17,11 @@ public class DailyPowerOutageEventVO {
*/
private List<String> fullDates;
/**
* 日累计停电影响数
*/
private List<Integer> userCounts;
/**
* K线数组每一项包含起始值结束值最低最高
*/
@ -55,5 +60,8 @@ public class DailyPowerOutageEventVO {
*/
private List<Double> minTemp;
/**
* 风速
*/
private List<Double> windSpeed;
}

View File

@ -0,0 +1,34 @@
package com.southern.power.grid.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
/**
* 导入任务 -- 实体类
*
* @author: junzhangfm
* @date: 2026/3/16
**/
@Data
@TableName("import_task")
public class ImportTask {
private Long id;
private String taskNo;
private Integer total;
private Integer successCount;
private Integer failCount;
private String status;
private String failMsg;
private Date createTime;
private Date updateTime;
}

View File

@ -0,0 +1,36 @@
package com.southern.power.grid.enums;
/**
* 导入任务状态枚举
*
* @author: junzhangfm
* @date: 2026/3/16
**/
public enum ImportTaskStatusEnum {
// 定义 5 个状态编码 + 描述
WAITING("WAITING", "等待执行"),
PROCESSING("PROCESSING", "正在导入"),
PART_SUCCESS("PART_SUCCESS", "部分成功"),
SUCCESS("SUCCESS", "全部成功"),
FAILED("FAILED", "导入失败");
// 状态值存入数据库的值
private final String code;
// 状态描述
private final String desc;
ImportTaskStatusEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
// 获取状态值你要的就是这个
public String getCode() {
return code;
}
// 获取描述
public String getDesc() {
return desc;
}
}

View File

@ -2,18 +2,20 @@ package com.southern.power.grid.listener;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.southern.power.grid.dao.DataExcelMapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.southern.power.grid.dao.ImportTaskMapper;
import com.southern.power.grid.entity.DataExcelEntity;
import com.southern.power.grid.entity.ImportTask;
import com.southern.power.grid.enums.ImportTaskStatusEnum;
import com.southern.power.grid.service.impl.HourlyOutageExcelProcessService;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* 数据导入逻辑 -- easyexcel事件监听器
@ -21,6 +23,7 @@ import java.util.List;
* @author: junzhangfm
* @date: 2026/3/13
**/
@Slf4j
@Component
public class DataExcelListener extends AnalysisEventListener<DataExcelEntity> {
@ -28,33 +31,32 @@ public class DataExcelListener extends AnalysisEventListener<DataExcelEntity> {
private static final int BATCH_SIZE = 500;
// 缓存数据
private List<DataExcelEntity> cacheList = new ArrayList<>(BATCH_SIZE);
private final List<DataExcelEntity> cacheList = new ArrayList<>(BATCH_SIZE);
@Autowired
// 注入 Mapper
private final DataExcelMapper dataExcelMapper;
private ImportTaskMapper importTaskMapper;
@Autowired
// 手动事务管理器
private final PlatformTransactionManager transactionManager;
@Resource
private HourlyOutageExcelProcessService hourlyOutageExcelProcessService;
// 构造方法 -- 交给spring容器创建
public DataExcelListener(DataExcelMapper dataExcelMapper, PlatformTransactionManager transactionManager) {
this.dataExcelMapper = dataExcelMapper;
this.transactionManager = transactionManager;
}
// 注入任务
@Setter
private ImportTask task;
private int success = 0;
private int fail = 0;
// 逐行读取不会OOM
@Override
public void invoke(DataExcelEntity data, AnalysisContext context) {
//TODO 待完善 等志明拿到具体数据再导入到数据库中
cacheList.add(data);
// 达到批次数量 插入数据库
if (cacheList.size() >= BATCH_SIZE) {
batchInsert();
if (!Objects.isNull(data)) {
//TODO 待完善 等志明拿到具体数据再导入到数据库中
cacheList.add(data);
// 达到批次数量 插入数据库
if (cacheList.size() >= BATCH_SIZE) {
batchInsert();
}
}
}
@ -64,28 +66,48 @@ public class DataExcelListener extends AnalysisEventListener<DataExcelEntity> {
if (!cacheList.isEmpty()) {
batchInsert();
}
// 更新任务状态
if (fail == 0) {
task.setStatus(ImportTaskStatusEnum.SUCCESS.getCode()); // 全部成功没有失败的
} else {
if (success > 0) {
task.setStatus(ImportTaskStatusEnum.PART_SUCCESS.getCode()); // 部分成功含成功和失败的
} else {
task.setStatus(ImportTaskStatusEnum.FAILED.getCode()); // 部分成功含成功和失败的
}
}
importTaskMapper.update(null, new LambdaUpdateWrapper<ImportTask>()
.set(ImportTask::getStatus, task.getStatus()) // 任务状态
.eq(ImportTask::getTaskNo, task.getTaskNo()));
// bean在单例模式下要初始化值
success = 0;
fail = 0;
task = null;
}
// ==================== 核心分批插入 + 手动事务 ====================
private void batchInsert() {
// 开启事务
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
TransactionStatus status = transactionManager.getTransaction(def);
try {
// 批量插入500条
hourlyOutageExcelProcessService.process(cacheList);
// 提交事务
transactionManager.commit(status);
// 清空缓存极其重要防OOM
cacheList.clear();
success += cacheList.size();
} catch (Exception e) {
// 回滚
transactionManager.rollback(status);
throw new RuntimeException("导入失败,已回滚:" + e.getMessage());
fail += cacheList.size();
log.error("导入失败", e);
} finally {
// 更新进度
updateProgress();
cacheList.clear(); // 清空缓存极其重要防OOM
}
}
// 更新任务进度
private void updateProgress() {
importTaskMapper.update(null, new LambdaUpdateWrapper<ImportTask>()
.set(ImportTask::getSuccessCount, success) // 成功数
.set(ImportTask::getFailCount, fail) // 失败数
.set(ImportTask::getTotal, success + fail) // 总数据量
.eq(ImportTask::getTaskNo, task.getTaskNo()));
}
}

View File

@ -0,0 +1,16 @@
package com.southern.power.grid.service;
import com.southern.power.grid.entity.ImportTask;
import org.springframework.web.multipart.MultipartFile;
/**
* 导入任务Service类
*
* @author: junzhangfm
* @date: 2026/3/16
**/
public interface IImportTaskService {
String importExcel(MultipartFile file);
ImportTask getProgress(String taskNo);
}

View File

@ -0,0 +1,56 @@
package com.southern.power.grid.service.impl;
import com.alibaba.excel.EasyExcel;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.southern.power.grid.dao.ImportTaskMapper;
import com.southern.power.grid.entity.DataExcelEntity;
import com.southern.power.grid.entity.ImportTask;
import com.southern.power.grid.enums.ImportTaskStatusEnum;
import com.southern.power.grid.listener.DataExcelListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.InputStream;
/**
* 异步导入实现类
*
* @author: junzhangfm
* @date: 2026/3/16
**/
@Service
@Slf4j
public class AsyncImportServiceImpl {
@Autowired
private ImportTaskMapper importTaskMapper;
@Autowired
private DataExcelListener dataExcelListener;
@Async("asyncExecutor")
public void doAsyncImport(InputStream inputStream, ImportTask task) {
try {
// 更新为导入中
importTaskMapper.update(null, new LambdaUpdateWrapper<ImportTask>()
.set(ImportTask::getStatus, ImportTaskStatusEnum.PROCESSING.getCode())
.eq(ImportTask::getTaskNo, task.getTaskNo()));
// 注入任务
dataExcelListener.setTask(task);
// 采用easyExcel流式读取一行一行读不加载整个excel文件防止OOM
EasyExcel.read(inputStream, DataExcelEntity.class, dataExcelListener)
.sheet()
.doRead();
} catch (Exception e) {
importTaskMapper.update(null, new LambdaUpdateWrapper<ImportTask>()
.set(ImportTask::getStatus, ImportTaskStatusEnum.FAILED.getCode()) // 失败状态
.set(ImportTask::getFailMsg, e.getMessage()) // 更新原因
.eq(ImportTask::getTaskNo, task.getTaskNo()));
log.error("导入异常", e);
}
}
}

View File

@ -35,11 +35,13 @@ public class DnerDailyPowerOutageEventServiceImpl
Map<String, DnerDailyPowerOutageEvent> dateTimeAndEntityMap = dataList.stream().collect(Collectors.toMap(
DnerDailyPowerOutageEvent::getDataTime, Function.identity(), (k1, k2) -> k1));
List<List<Integer>> kline = new ArrayList<>();
List<Integer> userCounts = new ArrayList<>();
List<Double> cumulativeRain = new ArrayList<>();
List<Double> avgRain = new ArrayList<>();
List<Double> avgTemp = new ArrayList<>();
List<Double> maxTemp = new ArrayList<>();
List<Double> minTemp = new ArrayList<>();
List<Double> windSpeed = new ArrayList<>();
List<Integer> ma5List = new ArrayList<>();
List<Integer> ma10List = new ArrayList<>();
List<Integer> ma20List = new ArrayList<>();
@ -59,6 +61,8 @@ public class DnerDailyPowerOutageEventServiceImpl
ma10List.add(0);
ma20List.add(0);
ma30List.add(0);
windSpeed.add(0.0);
userCounts.add(0);
} else {
List<Integer> klineE = Arrays.asList(event.getStarUserCount(), event.getEndUserCount(),
event.getMinUserCount(), event.getMaxUserCount());
@ -72,6 +76,8 @@ public class DnerDailyPowerOutageEventServiceImpl
ma10List.add(getMa(event, dateTimeAndEntityMap, 10));
ma20List.add(getMa(event, dateTimeAndEntityMap, 20));
ma30List.add(getMa(event, dateTimeAndEntityMap, 30));
windSpeed.add(Double.valueOf(event.getExtremeWindSpeedHourly()));
userCounts.add(event.getUserCount());
}
});
DailyPowerOutageEventVO result = new DailyPowerOutageEventVO();
@ -86,6 +92,8 @@ public class DnerDailyPowerOutageEventServiceImpl
result.setMa10(ma10List);
result.setMa20(ma20List);
result.setMa30(ma30List);
result.setWindSpeed(windSpeed);
result.setUserCounts(userCounts);
return result;
}
@ -98,7 +106,7 @@ public class DnerDailyPowerOutageEventServiceImpl
for (String date : maDateList) {
DnerDailyPowerOutageEvent outageEvent = dateTimeAndEntityMap.get(date);
if (!Objects.isNull(outageEvent)) {
userCount += outageEvent.getUserCount();
userCount += outageEvent.getEndUserCount();
}
}
return userCount/maNum;

View File

@ -0,0 +1,67 @@
package com.southern.power.grid.service.impl;
import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.southern.power.grid.dao.ImportTaskMapper;
import com.southern.power.grid.entity.ImportTask;
import com.southern.power.grid.enums.ImportTaskStatusEnum;
import com.southern.power.grid.service.IImportTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* 导入任务 -- service实现类
*
* @author: junzhangfm
* @date: 2026/3/16
**/
@Service
@Slf4j
public class ImportTaskServiceImpl extends ServiceImpl<ImportTaskMapper, ImportTask>
implements IImportTaskService {
@Autowired
private ImportTaskMapper importTaskMapper;
@Autowired
private AsyncImportServiceImpl asyncImportService;
@Override
public String importExcel(MultipartFile file) {
// 通过hutool工具类生成唯一任务号
String taskNo = IdUtil.fastSimpleUUID();
// 创建任务
ImportTask task = new ImportTask();
task.setTaskNo(taskNo);
task.setStatus(ImportTaskStatusEnum.WAITING.getCode());
task.setTotal(0);
importTaskMapper.insert(task);
// 异步执行导入
// 把上传的文件流转成 **字节数组输入流**保存到内存避免Tomcat删除
InputStream inputStream;
try {
inputStream = new ByteArrayInputStream(file.getBytes());
} catch (IOException e) {
log.error("file exception!");
throw new RuntimeException(e);
}
asyncImportService.doAsyncImport(inputStream, task);
// 立即返回任务ID给前端
return taskNo;
}
@Override
public ImportTask getProgress(String taskNo) {
return importTaskMapper.selectOne(new LambdaQueryWrapper<ImportTask>().eq(ImportTask::getTaskNo, taskNo));
}
}

View File

@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- namespace 必须 = Mapper 接口全类名 -->
<mapper namespace="com.southern.power.grid.dao.DataExcelMapper">
<insert id="batchInsert">
</insert>
</mapper>

View File

@ -1,3 +1,18 @@
-- 导入任务表
CREATE TABLE import_task
(
id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '任务ID',
task_no VARCHAR(64) NOT NULL UNIQUE COMMENT '任务唯一编号',
total INT DEFAULT 0 COMMENT '总数据量',
success_count INT DEFAULT 0 COMMENT '成功数量',
fail_count INT DEFAULT 0 COMMENT '失败数量',
status VARCHAR(20) DEFAULT 'WAITING' COMMENT 'WAITING/PROCESSING/SUCCESS/FAILED/COMPLETED',
fail_msg TEXT COMMENT '失败原因',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT '导入任务表';
-- 行政区划配置表
CREATE TABLE `dner_site_area_configuration`
(