feat: 支持多事件并行处理并增强停电用户统计

- 在DnerDailyPowerOutageEventSync实体和同步记录中新增eventId字段,支持多事件并行处理
- 为DataExcelEntity和DnerHourlyPowerOutageEvent添加故障停电和计划停电用户数统计字段
- 修改所有相关查询、同步和数据处理方法,增加eventId参数以支持按事件隔离数据
- 优化日K线数据生成逻辑,分别统计故障停电和计划停电的用户数(总数、最大、最小、起始、结束值)
- 修复文件上传时同步记录清理不完整的问题,确保删除旧的K线图同步记录
This commit is contained in:
yufengshuo 2026-03-26 16:15:43 +08:00
parent ca8670386d
commit b49bab1ccc
10 changed files with 111 additions and 50 deletions

View File

@ -92,7 +92,7 @@ public class DnerController {
*/
@PostMapping("/excel/import")
public Result<String> importExcel(@RequestParam("file") MultipartFile file,
@RequestParam Long eventId) {
@RequestParam("eventId") Long eventId) {
return Result.success(importTaskService.importExcel(file, eventId));
}
@ -136,7 +136,7 @@ public class DnerController {
* @return 结果 <布尔>
*/
@GetMapping("/sync/dailyPowerOutageEvent")
public Result<Boolean> syncDailyPowerOutageEvent(@RequestParam String startDate, @RequestParam String endDate, @RequestParam String setDate) {
return Result.success(dnerDailyPowerOutageEventSyncService.processingData(startDate, endDate, setDate));
public Result<Boolean> syncDailyPowerOutageEvent(@RequestParam String startDate, @RequestParam String endDate, @RequestParam String setDate,@RequestParam Long eventId) {
return Result.success(dnerDailyPowerOutageEventSyncService.processingData(startDate, endDate, setDate,eventId));
}
}

View File

@ -36,7 +36,7 @@ public interface DnerHourlyPowerOutageEventMapper extends BaseMapper<DnerHourlyP
List<DnerHourlyPowerOutageEvent> selectByOrgCodesAndDataTime(@Param("orgCodes") List<String> orgCodes,
@Param("startDate") String startDate,
@Param("endDate") String endDate);
@Param("endDate") String endDate, @Param("eventId") Long eventId);
/**
* 查询 outageState=2 且按 orgCode+日期分组后最小 dataTime 早于指定时间的分组列表

View File

@ -37,7 +37,7 @@ public class DataExcelEntity {
private LocalDateTime endTime;
@ExcelProperty(value = "停电时长(mins)")
private Float lengthOutage ;
private Float lengthOutage;
@ExcelProperty(value = "停电影响用户数")
private Integer userCount;
@ -45,4 +45,16 @@ public class DataExcelEntity {
@DateTimeFormat("yyyy-MM-dd HH:mm:ss")
@ExcelProperty(value = "数据创建时间")
private LocalDateTime createTime;
/**
* 故障停电影响用户总数(导入的excel不存在用于后续数据处理)
*/
@ExcelProperty(value = "故障停电影响用户总数")
private Integer faultUserCount;
/**
* 计划停电影响用户数(导入的excel不存在用于后续数据处理)
*/
@ExcelProperty(value = "计划停电影响用户数")
private Integer scheduledUserCount;
}

View File

@ -22,6 +22,11 @@ public class DnerDailyPowerOutageEventSync {
*/
private String dateTime;
/**
* 事件ID
*/
private Long eventId;
/**
* 同步状态 1 非当日调用 2 当日调用
*/

View File

@ -5,9 +5,10 @@ public interface IDnerDailyPowerOutageEventSyncService {
/**
* 日K线停电事件同步
*
* @param eventId 事件id
* @return 布尔型
*/
Boolean syncDnerDailyPowerOutageEvent();
Boolean syncDnerDailyPowerOutageEvent(Long eventId);
/**
* 处理数据
@ -15,7 +16,8 @@ public interface IDnerDailyPowerOutageEventSyncService {
* @param startDate 开始日期
* @param endDate 结束日期
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
* @param eventId 事件id
* @return 布尔型
*/
Boolean processingData(String startDate, String endDate, String setDate);
Boolean processingData(String startDate, String endDate, String setDate, Long eventId);
}

View File

@ -40,22 +40,25 @@ public class DnerDailyPowerOutageEventBatchService {
* 删除指定日期的存量数据
* 删除dner_daily_power_outage_event(对应data_time字段)和dner_daily_power_outage_event_sync(对应date_time字段)中的对应数据
*
* @param date 日期
* @param date 日期
* @param eventId 事件id
*/
public void deleteDateData(String date) {
public void deleteDateData(String date, Long eventId) {
try {
log.info("开始删除日期 {} 的存量数据", date);
// 1. 删除dner_daily_power_outage_event表中对应日期的数据
int deleteDailyCount = dailyPowerOutageEventMapper.delete(
new LambdaQueryWrapper<DnerDailyPowerOutageEvent>()
.eq(DnerDailyPowerOutageEvent::getDataTime,date)
.eq(DnerDailyPowerOutageEvent::getEventId, eventId)
.eq(DnerDailyPowerOutageEvent::getDataTime, date)
);
log.info("日期 {} 删除日表 {} 条存量数据", date, deleteDailyCount);
// 2. 删除dner_daily_power_outage_event_sync表中对应日期的同步记录
int deleteSyncCount = syncMapper.delete(
new LambdaQueryWrapper<DnerDailyPowerOutageEventSync>()
.eq(DnerDailyPowerOutageEventSync::getEventId, eventId)
.eq(DnerDailyPowerOutageEventSync::getDateTime, date)
);
log.info("日期 {} 删除同步记录表 {} 条存量数据", date, deleteSyncCount);
@ -69,16 +72,17 @@ public class DnerDailyPowerOutageEventBatchService {
/**
* 同步指定日期的一批区域数据带事务
*
* @param date 日期
* @param orgCodeList 区域编码列表批次
* @param batchIndex 批次索引
* @param date 日期
* @param orgCodeList 区域编码列表批次
* @param batchIndex 批次索引
* @param eventId 事件id
*/
public void syncDateDataForBatch(String date, List<String> orgCodeList, int batchIndex) {
public void syncDateDataForBatch(String date, List<String> orgCodeList, int batchIndex, Long eventId) {
log.info("开始同步日期 {} 批次 {},区域数量: {}", date, batchIndex, orgCodeList.size());
List<DnerDailyPowerOutageEvent> allData = new ArrayList<>();
List<DnerHourlyPowerOutageEvent> allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date);
List<DnerHourlyPowerOutageEvent> allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date, eventId);
if (CollectionUtils.isEmpty(allHourlyEvents)) {
log.info("日期 {} 批次 {} 无小时数据需要处理", date, batchIndex);
return;
@ -100,14 +104,7 @@ public class DnerDailyPowerOutageEventBatchService {
log.warn("转换日K线数据失败日期{}, 区域编号:{}", date, orgCode);
continue;
}
// 3. 保存两条停电状态不同的数据停电中=2已复电=3
DnerDailyPowerOutageEvent copyEvent = deepCopyByJackson(dailyPowerOutageEvent, DnerDailyPowerOutageEvent.class);
dailyPowerOutageEvent.setOutageState(2);
copyEvent.setOutageState(3);
allData.add(dailyPowerOutageEvent);
allData.add(copyEvent);
} catch (Exception e) {
log.error("处理区域 {} 日期 {} 时发生异常: {}", orgCode, date, e.getMessage(), e);
@ -122,14 +119,14 @@ public class DnerDailyPowerOutageEventBatchService {
for (int i = 0; i < allData.size(); i += BATCH_SIZE) {
int end = Math.min(i + BATCH_SIZE, allData.size());
List<DnerDailyPowerOutageEvent> subList = allData.subList(i, end);
for (DnerDailyPowerOutageEvent event : subList) {
dailyPowerOutageEventMapper.insert(event);
}
totalInserted += subList.size();
log.debug("日期 {} 批次 {} 已插入 {}/{} 条记录", date, batchIndex, totalInserted, allData.size());
}
log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted);
} catch (Exception e) {
log.error("日期 {} 批次 {} 保存数据时发生异常:{}", date, batchIndex, e.getMessage(), e);
@ -144,15 +141,16 @@ public class DnerDailyPowerOutageEventBatchService {
* 保存每日同步记录
* 一个日期的所有数据同步完成后保存一条同步记录
*
* @param date 同步日期
* @param errors 错误信息列表
* @param date 同步日期
* @param errors 错误信息列表
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
* @param eventId 事件id
*/
public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors,String setDate) {
public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors, String setDate, Long eventId) {
try {
DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync();
record.setDateTime(date);
record.setEventId(eventId);
// 判断同步状态
int syncStatus = setDate != null && setDate.equals(date) ? 2 : 1;
@ -195,14 +193,14 @@ public class DnerDailyPowerOutageEventBatchService {
}
}
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDateAndOrgCodes(List<String> orgCodes, String date) {
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDateAndOrgCodes(List<String> orgCodes, String date, Long eventId) {
if (CollectionUtils.isEmpty(orgCodes)) {
return new ArrayList<>();
}
try {
String start = date + " 00:00:00";
String end = date + " 23:59:59";
return hourlyPowerOutageEventMapper.selectByOrgCodesAndDataTime(orgCodes,start,end);
return hourlyPowerOutageEventMapper.selectByOrgCodesAndDataTime(orgCodes, start, end, eventId);
} catch (Exception e) {
log.error("批量查询小时数据失败 - 日期: {}, 区域数: {}, 错误: {}", date, orgCodes.size(), e.getMessage(), e);
throw new RuntimeException("批量查询小时数据失败: " + e.getMessage(), e);
@ -221,16 +219,39 @@ public class DnerDailyPowerOutageEventBatchService {
DnerDailyPowerOutageEvent daily = new DnerDailyPowerOutageEvent();
daily.setOrgCode(orgCode);
daily.setDataTime(date);
daily.setEventId(list.get(0).getEventId());
// 用户数统计
//故障停电影响用户总数
IntSummaryStatistics faultStats = list.stream()
.mapToInt(e -> e.getFaultUserCount() == null ? 0 : e.getFaultUserCount())
.summaryStatistics();
daily.setFaultUserCount((int) faultStats.getSum());
daily.setFaultMaxUserCount(faultStats.getMax());
daily.setFaultMinUserCount(faultStats.getMin());
daily.setFaultStarUserCount(list.stream().min(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getFaultUserCount).orElse(0));
daily.setFaultEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getFaultUserCount).orElse(0));
//计划停电影响用户数
IntSummaryStatistics scheduledStats = list.stream()
.mapToInt(e -> e.getScheduledUserCount() == null ? 0 : e.getScheduledUserCount())
.summaryStatistics();
daily.setScheduledUserCount((int) scheduledStats.getSum());
daily.setScheduledMaxUserCount(scheduledStats.getMax());
daily.setScheduledMinUserCount(scheduledStats.getMin());
daily.setScheduledStarUserCount(list.stream().min(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getScheduledUserCount).orElse(0));
daily.setScheduledEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getScheduledUserCount).orElse(0));
// 停电影响用户总数
IntSummaryStatistics userStats = list.stream()
.mapToInt(e -> e.getUserCount() == null ? 0 : e.getUserCount())
.summaryStatistics();
daily.setUserCount((int) userStats.getSum());
daily.setMaxUserCount(userStats.getMax());
daily.setMinUserCount(userStats.getMin());
// 起始和结束用户数
daily.setStarUserCount(list.stream().min(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0));
daily.setEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))

View File

@ -48,10 +48,10 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
private static final int ORG_BATCH_SIZE = 100;
@Override
public Boolean syncDnerDailyPowerOutageEvent() {
public Boolean syncDnerDailyPowerOutageEvent(Long eventId) {
String startDate = getStartDate();
String endDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
return processingData(startDate, endDate, endDate);
return processingData(startDate, endDate, endDate, eventId);
}
/**
@ -61,7 +61,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
* @return 同步结果
*/
@Override
public Boolean processingData(String startDate, String endDate, String setDate) {
public Boolean processingData(String startDate, String endDate, String setDate, Long eventId) {
log.info("================== 开始日K线停电事件同步任务 ==================");
long startTime = System.currentTimeMillis();
@ -94,11 +94,11 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
// 先删除startDate的旧数据
if (date.equals(startDate)) {
log.info("日期 {} 存在旧数据,先删除", date);
batchService.deleteDateData(date);
batchService.deleteDateData(date, eventId);
}
// 检查是否已存在成功同步记录
Long recordByDate = getLatestSyncRecordByDate(date);
Long recordByDate = getLatestSyncRecordByDate(date, eventId);
// 非今日且已同步成功则跳过
if (recordByDate != 0) {
@ -107,7 +107,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
}
// 处理该日期的数据
boolean success = processDateData(executor, date, orgCodeList, setDate);
boolean success = processDateData(executor, date, orgCodeList, setDate, eventId);
if (success) {
log.info("日期 {} 处理完成", date);
@ -142,9 +142,10 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
* @param date 日期
* @param orgCodeList 所有区域编码列表
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
* @param eventId 事件id
* @return 处理结果
*/
private boolean processDateData(ThreadPoolTaskExecutor executor, String date, List<String> orgCodeList, String setDate) {
private boolean processDateData(ThreadPoolTaskExecutor executor, String date, List<String> orgCodeList, String setDate, Long eventId) {
List<List<String>> orgCodeBatches = partitionList(orgCodeList, ORG_BATCH_SIZE);
log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size());
log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor));
@ -160,7 +161,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> future = executor.submit(() -> {
try {
log.info("日期 {} 批次 {} 开始执行,区域数量: {}", date, batchIndex, batchOrgCodes.size());
batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex);
batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex, eventId);
return null;
} catch (Exception e) {
log.error("日期 {} 批次 {} 处理失败: {}", date, batchIndex, e.getMessage(), e);
@ -216,7 +217,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
date, null, "同步超时"));
}
batchService.saveDailySyncRecord(date, allErrors, setDate);
batchService.saveDailySyncRecord(date, allErrors, setDate, eventId);
return allErrors.isEmpty();
}
@ -249,10 +250,11 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
}
private Long getLatestSyncRecordByDate(String date) {
private Long getLatestSyncRecordByDate(String date, Long eventId) {
try {
return this.lambdaQuery()
.eq(DnerDailyPowerOutageEventSync::getDateTime, date)
.eq(DnerDailyPowerOutageEventSync::getEventId, eventId)
.count();
} catch (Exception e) {
log.warn("查询日期 {} 同步记录失败: {}", date, e.getMessage(), e);

View File

@ -3,9 +3,11 @@ package com.southern.power.grid.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper;
import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper;
import com.southern.power.grid.dao.DnerEventAttachmentMapper;
import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper;
import com.southern.power.grid.entity.DnerDailyPowerOutageEvent;
import com.southern.power.grid.entity.DnerDailyPowerOutageEventSync;
import com.southern.power.grid.entity.DnerEventAttachment;
import com.southern.power.grid.entity.DnerHourlyPowerOutageEvent;
import com.southern.power.grid.service.IFileService;
@ -42,6 +44,9 @@ public class FileServiceImpl implements IFileService {
@Autowired
private DnerDailyPowerOutageEventMapper dnerDailyPowerOutageEventMapper;
@Autowired
private DnerDailyPowerOutageEventSyncMapper dnerDailyPowerOutageEventSyncMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public Long uploadExcel(MultipartFile file, Long eventId) throws IOException {
@ -78,8 +83,11 @@ public class FileServiceImpl implements IFileService {
// 删除旧的分时图和K线图数据
dnerHourlyPowerOutageEventMapper.delete(new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
.eq(DnerHourlyPowerOutageEvent::getEventId, eventId));
dnerDailyPowerOutageEventMapper.delete(new LambdaQueryWrapper<DnerDailyPowerOutageEvent>()
.eq(DnerDailyPowerOutageEvent::getEventId, eventId));
dnerDailyPowerOutageEventMapper.delete(new LambdaQueryWrapper<DnerDailyPowerOutageEvent>().eq(DnerDailyPowerOutageEvent::getEventId,eventId));
//删除旧的K线图数据的同步记录
dnerDailyPowerOutageEventSyncMapper.delete(
new LambdaQueryWrapper<DnerDailyPowerOutageEventSync>().eq(DnerDailyPowerOutageEventSync::getEventId, eventId));
return record.getId();
}
}

View File

@ -146,6 +146,8 @@ public class HourlyOutageExcelProcessService {
*/
private void accumulateEvent(DnerHourlyPowerOutageEvent existing, DataExcelEntity row) {
existing.setUserCount(row.getUserCount() + existing.getUserCount());
existing.setScheduledUserCount(row.getScheduledUserCount() + existing.getScheduledUserCount());
existing.setFaultUserCount(row.getFaultUserCount() + existing.getFaultUserCount());
}
/**
@ -203,6 +205,8 @@ public class HourlyOutageExcelProcessService {
RegionalWeatherData regionalWeatherData) {
DnerHourlyPowerOutageEvent event = new DnerHourlyPowerOutageEvent();
event.setOrgCode(districtCode);
event.setFaultUserCount(0);
event.setScheduledUserCount(0);
// 时间转成字符串入库到 data_time
if (row.getStartTime() != null) {
@ -228,8 +232,10 @@ public class HourlyOutageExcelProcessService {
String outageTypeStr = row.getOutageType();
if ("故障类".equals(outageTypeStr)) {
event.setOutageType("1");
event.setFaultUserCount(event.getUserCount());
} else if ("计划类".equals(outageTypeStr)) {
event.setOutageType("2");
event.setScheduledUserCount(event.getUserCount());
} else {
event.setOutageType(outageTypeStr); // 保留原值或根据需要设置默认值
}

View File

@ -19,14 +19,15 @@
<insert id="batchInsert" parameterType="list" useGeneratedKeys="true" keyProperty="id">
INSERT INTO dner_hourly_power_outage_event (
org_code, data_time, hourly_precipitation, daily_precipitation,
org_code,event_id, data_time, hourly_precipitation, daily_precipitation,
temperature, hourly_max_temperature, hourly_min_temperature,
extreme_wind_speed_hourly, user_count, outage_state, outage_type,
extreme_wind_speed_hourly, user_count,fault_user_count,scheduled_user_count, outage_state, outage_type,
create_by, create_time, update_by, update_time
) VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.orgCode},
#{item.eventId},
#{item.dataTime},
#{item.hourlyPrecipitation},
#{item.dailyPrecipitation},
@ -35,6 +36,8 @@
#{item.hourlyMinTemperature},
#{item.extremeWindSpeedHourly},
#{item.userCount},
#{item.faultUserCount},
#{item.scheduledUserCount},
#{item.outageState},
#{item.outageType},
#{item.createBy},
@ -54,13 +57,15 @@
</select>
<select id="selectByOrgCodesAndDataTime" resultType="com.southern.power.grid.entity.DnerHourlyPowerOutageEvent">
select id, org_code, data_time, hourly_precipitation, daily_precipitation, temperature, hourly_max_temperature,
hourly_min_temperature, extreme_wind_speed_hourly, user_count, outage_state, outage_type from
select id, org_code, data_time,event_id, hourly_precipitation, daily_precipitation, temperature,
hourly_max_temperature,
hourly_min_temperature, extreme_wind_speed_hourly, user_count,fault_user_count,scheduled_user_count from
dner_hourly_power_outage_event
WHERE org_code IN
<foreach collection="orgCodes" item="code" open="(" separator="," close=")">
#{code}
</foreach>
and event_id = #{eventId}
and data_time <![CDATA[ >= ]]> #{startDate}
and data_time <![CDATA[ <= ]]> #{endDate}
</select>