refactor(日K图同步): 移除setDate参数并自动计算同步状态

- 删除processingData接口和控制器中的setDate参数,改为根据小时数据是否存在23时记录自动判断同步状态
- 在Excel导入完成后自动触发日K图数据同步
- 优化开始日期和结束日期的获取逻辑,改为基于事件ID从小时表查询
- 简化存量数据删除逻辑,不再检查同步记录直接删除
This commit is contained in:
yufengshuo 2026-03-27 11:07:14 +08:00
parent 8fda70baf9
commit a4c40ff3ce
5 changed files with 108 additions and 62 deletions

View File

@ -132,11 +132,11 @@ public class DnerController {
* *
* @param startDate 开始日期(数据示例2025-09-01) * @param startDate 开始日期(数据示例2025-09-01)
* @param endDate 结束日期(数据示例2025-09-01) * @param endDate 结束日期(数据示例2025-09-01)
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置(数据示例2025-09-01) * @param eventId 事件Id
* @return 结果 <布尔> * @return 结果 <布尔>
*/ */
@GetMapping("/sync/dailyPowerOutageEvent") @GetMapping("/sync/dailyPowerOutageEvent")
public Result<Boolean> syncDailyPowerOutageEvent(@RequestParam String startDate, @RequestParam String endDate, @RequestParam String setDate,@RequestParam Long eventId) { public Result<Boolean> syncDailyPowerOutageEvent(@RequestParam String startDate, @RequestParam String endDate, @RequestParam Long eventId) {
return Result.success(dnerDailyPowerOutageEventSyncService.processingData(startDate, endDate, setDate,eventId)); return Result.success(dnerDailyPowerOutageEventSyncService.processingData(startDate, endDate,eventId));
} }
} }

View File

@ -8,6 +8,7 @@ import com.southern.power.grid.dao.ImportTaskMapper;
import com.southern.power.grid.entity.DataExcelEntity; import com.southern.power.grid.entity.DataExcelEntity;
import com.southern.power.grid.entity.ImportTask; import com.southern.power.grid.entity.ImportTask;
import com.southern.power.grid.enums.ImportTaskStatusEnum; import com.southern.power.grid.enums.ImportTaskStatusEnum;
import com.southern.power.grid.service.IDnerDailyPowerOutageEventSyncService;
import com.southern.power.grid.service.impl.HourlyOutageExcelProcessService; import com.southern.power.grid.service.impl.HourlyOutageExcelProcessService;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -44,6 +45,9 @@ public class DataExcelListener extends AnalysisEventListener<DataExcelEntity> {
@Autowired @Autowired
private HourlyOutageExcelProcessService hourlyOutageExcelProcessService; private HourlyOutageExcelProcessService hourlyOutageExcelProcessService;
@Autowired
private IDnerDailyPowerOutageEventSyncService dailyPowerOutageEventSycnService;
// 注入任务 // 注入任务
@Setter @Setter
private ImportTask task; private ImportTask task;
@ -70,6 +74,10 @@ public class DataExcelListener extends AnalysisEventListener<DataExcelEntity> {
if (!cacheList.isEmpty()) { if (!cacheList.isEmpty()) {
batchInsert(); batchInsert();
} }
// 同步日K图数据
dailyPowerOutageEventSycnService.syncDnerDailyPowerOutageEvent(task.getEventId());
// 更新任务状态 // 更新任务状态
if (fail == 0) { if (fail == 0) {
task.setStatus(ImportTaskStatusEnum.SUCCESS.getCode()); // 全部成功没有失败的 task.setStatus(ImportTaskStatusEnum.SUCCESS.getCode()); // 全部成功没有失败的
@ -127,7 +135,8 @@ public class DataExcelListener extends AnalysisEventListener<DataExcelEntity> {
} }
@Override @Override
public void doAfterAllAnalysed(AnalysisContext context) {} public void doAfterAllAnalysed(AnalysisContext context) {
}
}).sheet().doRead(); }).sheet().doRead();
return totalCount.get(); return totalCount.get();

View File

@ -15,9 +15,8 @@ public interface IDnerDailyPowerOutageEventSyncService {
* *
* @param startDate 开始日期 * @param startDate 开始日期
* @param endDate 结束日期 * @param endDate 结束日期
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
* @param eventId 事件id * @param eventId 事件id
* @return 布尔型 * @return 布尔型
*/ */
Boolean processingData(String startDate, String endDate, String setDate, Long eventId); Boolean processingData(String startDate, String endDate, Long eventId);
} }

View File

@ -14,6 +14,8 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -45,7 +47,7 @@ public class DnerDailyPowerOutageEventBatchService {
*/ */
public void deleteDateData(String date, Long eventId) { public void deleteDateData(String date, Long eventId) {
try { try {
log.info("开始删除日期 {} 的存量数据", date); log.info("开始删除日期 {} 事件ID {} 的存量数据", date, eventId);
// 1. 删除dner_daily_power_outage_event表中对应日期的数据 // 1. 删除dner_daily_power_outage_event表中对应日期的数据
int deleteDailyCount = dailyPowerOutageEventMapper.delete( int deleteDailyCount = dailyPowerOutageEventMapper.delete(
@ -53,7 +55,7 @@ public class DnerDailyPowerOutageEventBatchService {
.eq(DnerDailyPowerOutageEvent::getEventId, eventId) .eq(DnerDailyPowerOutageEvent::getEventId, eventId)
.eq(DnerDailyPowerOutageEvent::getDataTime, date) .eq(DnerDailyPowerOutageEvent::getDataTime, date)
); );
log.info("日期 {} 删除日表 {} 条存量数据", date, deleteDailyCount); log.info("日期 {} 事件ID {} 删除日表 {} 条存量数据", date, eventId, deleteDailyCount);
// 2. 删除dner_daily_power_outage_event_sync表中对应日期的同步记录 // 2. 删除dner_daily_power_outage_event_sync表中对应日期的同步记录
int deleteSyncCount = syncMapper.delete( int deleteSyncCount = syncMapper.delete(
@ -61,16 +63,16 @@ public class DnerDailyPowerOutageEventBatchService {
.eq(DnerDailyPowerOutageEventSync::getEventId, eventId) .eq(DnerDailyPowerOutageEventSync::getEventId, eventId)
.eq(DnerDailyPowerOutageEventSync::getDateTime, date) .eq(DnerDailyPowerOutageEventSync::getDateTime, date)
); );
log.info("日期 {} 删除同步记录表 {} 条存量数据", date, deleteSyncCount); log.info("日期 {} 事件ID {} 删除同步记录表 {} 条存量数据", date, eventId, deleteSyncCount);
} catch (Exception e) { } catch (Exception e) {
log.error("日期 {} 删除存量数据失败: {}", date, e.getMessage(), e); log.error("日期 {} 事件ID {} 删除存量数据失败: {}", date, eventId, e.getMessage(), e);
throw new RuntimeException("删除日期 " + date + " 存量数据失败: " + e.getMessage(), e); throw new RuntimeException("删除日期 " + date + "事件ID " + eventId + " 存量数据失败: " + e.getMessage(), e);
} }
} }
/** /**
* 同步指定日期的一批区域数据带事务 * 同步指定日期的一批区域数据
* *
* @param date 日期 * @param date 日期
* @param orgCodeList 区域编码列表批次 * @param orgCodeList 区域编码列表批次
@ -92,13 +94,10 @@ public class DnerDailyPowerOutageEventBatchService {
for (String orgCode : orgCodeList) { for (String orgCode : orgCodeList) {
try { try {
List<DnerHourlyPowerOutageEvent> eventList = orgCodeToHourlyEvents.get(orgCode); List<DnerHourlyPowerOutageEvent> eventList = orgCodeToHourlyEvents.get(orgCode);
if (CollectionUtils.isEmpty(eventList)) { if (CollectionUtils.isEmpty(eventList)) {
log.warn("日期 {} 区域 {} 无小时数据", date, orgCode); log.warn("日期 {} 区域 {} 无小时数据", date, orgCode);
continue; continue;
} }
// 2. 转换为日数据
DnerDailyPowerOutageEvent dailyPowerOutageEvent = convertToDailyEvent(eventList, date, orgCode); DnerDailyPowerOutageEvent dailyPowerOutageEvent = convertToDailyEvent(eventList, date, orgCode);
if (ObjectUtils.isEmpty(dailyPowerOutageEvent)) { if (ObjectUtils.isEmpty(dailyPowerOutageEvent)) {
log.warn("转换日K线数据失败日期{}, 区域编号:{}", date, orgCode); log.warn("转换日K线数据失败日期{}, 区域编号:{}", date, orgCode);
@ -143,16 +142,14 @@ public class DnerDailyPowerOutageEventBatchService {
* *
* @param date 同步日期 * @param date 同步日期
* @param errors 错误信息列表 * @param errors 错误信息列表
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
* @param eventId 事件id * @param eventId 事件id
*/ */
public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors, String setDate, Long eventId) { public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors, Long eventId) {
try { try {
DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync(); DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync();
record.setDateTime(date); record.setDateTime(date);
record.setEventId(eventId); record.setEventId(eventId);
// 判断同步状态 int syncStatus = getSyncStatus(date, eventId);
int syncStatus = setDate != null && setDate.equals(date) ? 2 : 1;
// 记录错误信息 // 记录错误信息
String errorMsg = null; String errorMsg = null;
@ -174,6 +171,24 @@ public class DnerDailyPowerOutageEventBatchService {
} }
} }
private int getSyncStatus(String date, Long eventId) {
// 根据date和eventId 查看分时表数据是否为全部数据(整时存在23时数据)
int syncStatus = 1;
DnerHourlyPowerOutageEvent event = hourlyPowerOutageEventMapper.selectOne(new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
.eq(DnerHourlyPowerOutageEvent::getEventId, eventId)
.likeRight(DnerHourlyPowerOutageEvent::getDataTime, date)
.orderByDesc(DnerHourlyPowerOutageEvent::getDataTime)
.last("LIMIT 1"));
int hour = LocalDateTime.parse(
event.getDataTime(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
).getHour();
if (23 != hour) {
syncStatus = 2;
}
return syncStatus;
}
/** /**
* 根据日期查询小时数据 * 根据日期查询小时数据
*/ */
@ -214,7 +229,6 @@ public class DnerDailyPowerOutageEventBatchService {
if (list == null || list.isEmpty()) { if (list == null || list.isEmpty()) {
return null; return null;
} }
try { try {
DnerDailyPowerOutageEvent daily = new DnerDailyPowerOutageEvent(); DnerDailyPowerOutageEvent daily = new DnerDailyPowerOutageEvent();
daily.setOrgCode(orgCode); daily.setOrgCode(orgCode);

View File

@ -1,6 +1,7 @@
package com.southern.power.grid.service.impl; package com.southern.power.grid.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.southern.power.grid.config.BusinessThreadPoolFactory; import com.southern.power.grid.config.BusinessThreadPoolFactory;
import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper; import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper;
@ -13,12 +14,16 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.*; import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -49,9 +54,20 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
@Override @Override
public Boolean syncDnerDailyPowerOutageEvent(Long eventId) { public Boolean syncDnerDailyPowerOutageEvent(Long eventId) {
String startDate = getStartDate(); String startDate = getStartDate(eventId);
String endDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); String endDate = getEndDate(eventId);
return processingData(startDate, endDate, endDate, eventId); return processingData(startDate, endDate, eventId);
}
private String getEndDate(Long eventId) {
QueryWrapper<DnerHourlyPowerOutageEvent> wrapper = new QueryWrapper<>();
wrapper.eq("event_id", eventId)
.select("MAX(data_time) AS maxDataTime");
return hourlyPowerOutageEventMapper.selectMaps(wrapper).stream()
.findFirst()
.map(map -> formatToDate(map.get("maxDataTime")))
.orElse(null);
} }
/** /**
@ -61,8 +77,15 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
* @return 同步结果 * @return 同步结果
*/ */
@Override @Override
public Boolean processingData(String startDate, String endDate, String setDate, Long eventId) { public Boolean processingData(String startDate, String endDate, Long eventId) {
log.info("================== 开始日K线停电事件同步任务 =================="); log.info("================== 开始日K线停电事件同步任务 ==================");
Assert.notNull(eventId, "事件ID字段不能为空");
if (!StringUtils.hasLength(startDate)) {
startDate = getStartDate(eventId);
}
if (!StringUtils.hasLength(endDate)) {
endDate = getEndDate(eventId);
}
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
@ -89,25 +112,13 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
boolean allSuccess = true; boolean allSuccess = true;
for (String date : allDates) { for (String date : allDates) {
try { try {
log.info("开始处理日期:{}", date); log.info("开始处理日期:{} 事件ID:{}", date, eventId);
// 先删除startDate的旧数据 // 删除旧数据
if (date.equals(startDate)) { batchService.deleteDateData(date, eventId);
log.info("日期 {} 存在旧数据,先删除", date);
batchService.deleteDateData(date, eventId);
}
// 检查是否已存在成功同步记录
Long recordByDate = getLatestSyncRecordByDate(date, eventId);
// 非今日且已同步成功则跳过
if (recordByDate != 0) {
log.info("日期 {} 已存在成功同步记录,跳过处理", date);
continue;
}
// 处理该日期的数据 // 处理该日期的数据
boolean success = processDateData(executor, date, orgCodeList, setDate, eventId); boolean success = processDateData(executor, date, orgCodeList, eventId);
if (success) { if (success) {
log.info("日期 {} 处理完成", date); log.info("日期 {} 处理完成", date);
@ -141,11 +152,10 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
* @param executor 线程池 * @param executor 线程池
* @param date 日期 * @param date 日期
* @param orgCodeList 所有区域编码列表 * @param orgCodeList 所有区域编码列表
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
* @param eventId 事件id * @param eventId 事件id
* @return 处理结果 * @return 处理结果
*/ */
private boolean processDateData(ThreadPoolTaskExecutor executor, String date, List<String> orgCodeList, String setDate, Long eventId) { private boolean processDateData(ThreadPoolTaskExecutor executor, String date, List<String> orgCodeList, Long eventId) {
List<List<String>> orgCodeBatches = partitionList(orgCodeList, ORG_BATCH_SIZE); List<List<String>> orgCodeBatches = partitionList(orgCodeList, ORG_BATCH_SIZE);
log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size()); log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size());
log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor)); log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor));
@ -217,7 +227,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
date, null, "同步超时")); date, null, "同步超时"));
} }
batchService.saveDailySyncRecord(date, allErrors, setDate, eventId); batchService.saveDailySyncRecord(date, allErrors, eventId);
return allErrors.isEmpty(); return allErrors.isEmpty();
} }
@ -227,26 +237,40 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
* 默认开始日期为当前日期往前8天 * 默认开始日期为当前日期往前8天
* 如果存在部分同步的日期则从该日期开始 * 如果存在部分同步的日期则从该日期开始
*/ */
private String getStartDate() { private String getStartDate(Long eventId) {
try { QueryWrapper<DnerHourlyPowerOutageEvent> wrapper = new QueryWrapper<>();
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd"); wrapper.eq("event_id", eventId)
LocalDate fallback = LocalDate.now().minusDays(8); .select("MIN(data_time) AS minDataTime");
DnerDailyPowerOutageEventSync eventSync = this.lambdaQuery() return hourlyPowerOutageEventMapper.selectMaps(wrapper).stream()
.eq(DnerDailyPowerOutageEventSync::getSyncStatus, 2) .findFirst()
.orderByAsc(DnerDailyPowerOutageEventSync::getDateTime) .map(map -> formatToDate(map.get("minDataTime")))
.last("limit 1") .orElse(null);
.one(); }
if (!ObjectUtils.isEmpty(eventSync)) {
LocalDate candidate = LocalDate.parse(eventSync.getDateTime(), f);
return candidate.format(f);
}
return fallback.format(f); private String formatToDate(Object value) {
} catch (Exception e) { if (value == null) {
log.error("获取开始日期失败: {}", e.getMessage(), e); return null;
return LocalDate.now().minusDays(8).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
} }
if (value instanceof LocalDate) {
return ((LocalDate) value).format(DateTimeFormatter.ISO_LOCAL_DATE);
}
if (value instanceof LocalDateTime) {
return ((LocalDateTime) value).toLocalDate().format(DateTimeFormatter.ISO_LOCAL_DATE);
}
String text = value.toString();
if (!StringUtils.hasLength(text)) {
return null;
}
if (text.length() >= 10) {
String datePart = text.substring(0, 10);
try {
return LocalDate.parse(datePart, DateTimeFormatter.ISO_LOCAL_DATE).format(DateTimeFormatter.ISO_LOCAL_DATE);
} catch (DateTimeParseException ignored) {
return datePart;
}
}
return text;
} }