# Conflicts:
#	src/main/resources/mapper/DnerDailyPowerOutageEventSyncMapper.xml
This commit is contained in:
junzhangfm 2026-03-27 16:48:36 +08:00
commit ba3335c532
5 changed files with 58 additions and 191 deletions

View File

@ -33,7 +33,7 @@ public class DnerDailyPowerOutageEventSync {
private Long eventId; private Long eventId;
/** /**
* 同步状态 1 非当日调用 2 当日调用 * 同步状态 1 同步成功 2 同步失败
*/ */
private Integer syncStatus; private Integer syncStatus;

View File

@ -1,7 +1,6 @@
package com.southern.power.grid.service.impl; package com.southern.power.grid.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper; import com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper;
import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper; import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper;
import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper; import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper;
@ -14,14 +13,11 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* 日K线停电事件批量处理服务 * 日K线停电事件批量处理服务
* 专门处理事务性的批量同步操作解决@Transactional自调用问题
* *
* @author: System * @author: System
* @date: 2026/3/17 * @date: 2026/3/17
@ -34,37 +30,27 @@ public class DnerDailyPowerOutageEventBatchService {
private final DnerDailyPowerOutageEventMapper dailyPowerOutageEventMapper; private final DnerDailyPowerOutageEventMapper dailyPowerOutageEventMapper;
private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper; private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper;
private final DnerDailyPowerOutageEventSyncMapper syncMapper; private final DnerDailyPowerOutageEventSyncMapper syncMapper;
private final ObjectMapper mapper = new ObjectMapper();
private static final int BATCH_SIZE = 600; private static final int BATCH_SIZE = 600;
/** /**
* 删除指定日期的存量数据 * 删除指定日期的存量数据
* 删除dner_daily_power_outage_event(对应data_time字段)和dner_daily_power_outage_event_sync(对应date_time字段)中的对应数据
*
* @param date 日期
* @param eventId 事件id
*/ */
public void deleteDateData(String date, Long eventId) { public void deleteDateData(String date, Long eventId) {
try { try {
log.info("开始删除日期 {} 事件ID {} 的存量数据", date, eventId); log.info("开始删除日期 {} 事件ID {} 的存量数据", date, eventId);
// 1. 删除dner_daily_power_outage_event表中对应日期的数据
int deleteDailyCount = dailyPowerOutageEventMapper.delete( int deleteDailyCount = dailyPowerOutageEventMapper.delete(
new LambdaQueryWrapper<DnerDailyPowerOutageEvent>() new LambdaQueryWrapper<DnerDailyPowerOutageEvent>()
.eq(DnerDailyPowerOutageEvent::getEventId, eventId) .eq(DnerDailyPowerOutageEvent::getEventId, eventId)
.eq(DnerDailyPowerOutageEvent::getDataTime, date) .eq(DnerDailyPowerOutageEvent::getDataTime, date)
); );
log.info("日期 {} 事件ID {} 删除日表 {} 条存量数据", date, eventId, deleteDailyCount); log.info("日期 {} 事件ID {} 删除日表 {} 条存量数据", date, eventId, deleteDailyCount);
// 2. 删除dner_daily_power_outage_event_sync表中对应日期的同步记录
int deleteSyncCount = syncMapper.delete( int deleteSyncCount = syncMapper.delete(
new LambdaQueryWrapper<DnerDailyPowerOutageEventSync>() new LambdaQueryWrapper<DnerDailyPowerOutageEventSync>()
.eq(DnerDailyPowerOutageEventSync::getEventId, eventId) .eq(DnerDailyPowerOutageEventSync::getEventId, eventId)
.eq(DnerDailyPowerOutageEventSync::getDateTime, date) .eq(DnerDailyPowerOutageEventSync::getDateTime, date)
); );
log.info("日期 {} 事件ID {} 删除同步记录表 {} 条存量数据", date, eventId, deleteSyncCount); log.info("日期 {} 事件ID {} 删除同步记录表 {} 条存量数据", date, eventId, deleteSyncCount);
} catch (Exception e) { } catch (Exception e) {
log.error("日期 {} 事件ID {} 删除存量数据失败: {}", date, eventId, e.getMessage(), e); log.error("日期 {} 事件ID {} 删除存量数据失败: {}", date, eventId, e.getMessage(), e);
throw new RuntimeException("删除日期 " + date + "事件ID " + eventId + " 存量数据失败: " + e.getMessage(), e); throw new RuntimeException("删除日期 " + date + "事件ID " + eventId + " 存量数据失败: " + e.getMessage(), e);
@ -73,139 +59,82 @@ public class DnerDailyPowerOutageEventBatchService {
/** /**
* 同步指定日期的一批区域数据 * 同步指定日期的一批区域数据
* * orgCode 循环只做数据处理和收集循环结束后分批批量插入每批最多 BATCH_SIZE
* @param date 日期
* @param orgCodeList 区域编码列表批次
* @param batchIndex 批次索引
* @param eventId 事件id
*/ */
public void syncDateDataForBatch(String date, List<String> orgCodeList, int batchIndex, Long eventId) { public void syncDateDataForBatch(String date, List<String> orgCodeList, int batchIndex, Long eventId) {
log.info("开始同步日期 {} 批次 {},区域数量: {}", date, batchIndex, orgCodeList.size()); log.info("开始同步日期 {} 批次 {},区域数量: {}", date, batchIndex, orgCodeList.size());
List<DnerDailyPowerOutageEvent> allData = new ArrayList<>();
List<DnerHourlyPowerOutageEvent> allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date, eventId); List<DnerHourlyPowerOutageEvent> allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date, eventId);
if (CollectionUtils.isEmpty(allHourlyEvents)) { if (CollectionUtils.isEmpty(allHourlyEvents)) {
log.info("日期 {} 批次 {} 无小时数据需要处理", date, batchIndex); log.info("日期 {} 批次 {} 无小时数据需要处理", date, batchIndex);
return; return;
} }
Map<String, List<DnerHourlyPowerOutageEvent>> orgCodeToHourlyEvents = allHourlyEvents.stream().collect(Collectors.groupingBy(DnerHourlyPowerOutageEvent::getOrgCode)); Map<String, List<DnerHourlyPowerOutageEvent>> orgCodeToHourlyEvents = allHourlyEvents.stream()
.collect(Collectors.groupingBy(DnerHourlyPowerOutageEvent::getOrgCode));
List<DnerDailyPowerOutageEvent> eventBatch = new ArrayList<>();
List<DnerDailyPowerOutageEventSync> syncBatch = new ArrayList<>();
for (String orgCode : orgCodeList) { for (String orgCode : orgCodeList) {
try { try {
List<DnerHourlyPowerOutageEvent> eventList = orgCodeToHourlyEvents.get(orgCode); List<DnerHourlyPowerOutageEvent> eventList = orgCodeToHourlyEvents.get(orgCode);
if (CollectionUtils.isEmpty(eventList)) { if (CollectionUtils.isEmpty(eventList)) {
log.warn("日期 {} 区域 {} 无小时数据", date, orgCode); log.warn("日期 {} 区域 {} 无小时数据", date, orgCode);
//syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, "无小时数据"));
continue; continue;
} }
DnerDailyPowerOutageEvent dailyPowerOutageEvent = convertToDailyEvent(eventList, date, orgCode); DnerDailyPowerOutageEvent dailyEvent = convertToDailyEvent(eventList, date, orgCode);
if (ObjectUtils.isEmpty(dailyPowerOutageEvent)) { if (ObjectUtils.isEmpty(dailyEvent)) {
log.warn("转换日K线数据失败日期{}, 区域编号:{}", date, orgCode); log.warn("转换日K线数据失败日期{}, 区域编号:{}", date, orgCode);
syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, "转换日K线数据失败"));
continue; continue;
} }
allData.add(dailyPowerOutageEvent); eventBatch.add(dailyEvent);
syncBatch.add(buildSyncRecord(eventId, orgCode, date, 1, null));
} catch (Exception e) { } catch (Exception e) {
log.error("处理区域 {} 日期 {} 时发生异常: {}", orgCode, date, e.getMessage(), e); log.error("处理区域 {} 日期 {} 时发生异常: {}", orgCode, date, e.getMessage(), e);
syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, e.getMessage()));
} }
} }
// 批量保存数据 // 批量插入 event每批 BATCH_SIZE
if (!CollectionUtils.isEmpty(allData)) { int totalInserted = 0;
for (int i = 0; i < eventBatch.size(); i += BATCH_SIZE) {
List<DnerDailyPowerOutageEvent> sub = eventBatch.subList(i, Math.min(i + BATCH_SIZE, eventBatch.size()));
try { try {
// 分段插入每段最多 600 dailyPowerOutageEventMapper.insert(sub);
int totalInserted = 0; totalInserted += sub.size();
for (int i = 0; i < allData.size(); i += BATCH_SIZE) {
int end = Math.min(i + BATCH_SIZE, allData.size());
List<DnerDailyPowerOutageEvent> subList = allData.subList(i, end);
for (DnerDailyPowerOutageEvent event : subList) {
dailyPowerOutageEventMapper.insert(event);
}
totalInserted += subList.size();
log.debug("日期 {} 批次 {} 已插入 {}/{} 条记录", date, batchIndex, totalInserted, allData.size());
}
log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted);
} catch (Exception e) { } catch (Exception e) {
log.error("日期 {} 批次 {} 保存数据时发生异常:{}", date, batchIndex, e.getMessage(), e); log.error("日期 {} 批次 {} 批量插入event数据失败: {}", date, batchIndex, e.getMessage(), e);
throw new RuntimeException("保存日 K 线数据失败:" + e.getMessage(), e); final String errMsg = e.getMessage();
sub.forEach(ev -> syncBatch.replaceAll(s ->
s.getOrgCode() != null && s.getOrgCode().equals(ev.getOrgCode()) && s.getSyncStatus() != 2
? buildSyncRecord(ev.getEventId(), ev.getOrgCode(), date, 2, errMsg)
: s));
} }
} else {
log.info("日期 {} 批次 {} 无数据需要保存", date, batchIndex);
} }
}
/** // 批量插入 sync 记录每批 BATCH_SIZE
* 保存每日同步记录 for (int i = 0; i < syncBatch.size(); i += BATCH_SIZE) {
* 一个日期的所有数据同步完成后保存一条同步记录 List<DnerDailyPowerOutageEventSync> sub = syncBatch.subList(i, Math.min(i + BATCH_SIZE, syncBatch.size()));
* try {
* @param date 同步日期 syncMapper.insert(sub);
* @param errors 错误信息列表 } catch (Exception e) {
* @param eventId 事件id log.error("日期 {} 批次 {} 批量插入sync记录失败: {}", date, batchIndex, e.getMessage(), e);
*/
public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors, Long eventId) {
try {
DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync();
record.setDateTime(date);
record.setEventId(eventId);
int syncStatus = getSyncStatus(date, eventId);
// 记录错误信息
String errorMsg = null;
if (!CollectionUtils.isEmpty(errors)) {
errorMsg = mapper.writeValueAsString(errors);
} }
record.setSyncStatus(syncStatus);
record.setErrorMsg(errorMsg);
syncMapper.insert(record);
log.info("日期 {} 同步记录已保存,状态: {}, 错误数: {}",
date, syncStatus, CollectionUtils.isEmpty(errors) ? 0 : errors.size());
} catch (Exception e) {
log.error("日期 {} 保存同步记录失败: {}", date, e.getMessage(), e);
// 不抛出异常避免影响主流程
} }
log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted);
} }
private int getSyncStatus(String date, Long eventId) { private DnerDailyPowerOutageEventSync buildSyncRecord(Long eventId, String orgCode, String date, int syncStatus, String errorMsg) {
// 根据date和eventId 查看分时表数据是否为全部数据(整时存在23时数据) DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync();
int syncStatus = 1; record.setDateTime(date);
DnerHourlyPowerOutageEvent event = hourlyPowerOutageEventMapper.selectOne(new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>() record.setEventId(eventId);
.eq(DnerHourlyPowerOutageEvent::getEventId, eventId) record.setOrgCode(orgCode);
.likeRight(DnerHourlyPowerOutageEvent::getDataTime, date) record.setSyncStatus(syncStatus);
.orderByDesc(DnerHourlyPowerOutageEvent::getDataTime) record.setErrorMsg(errorMsg);
.last("LIMIT 1")); return record;
int hour = LocalDateTime.parse(
event.getDataTime(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
).getHour();
if (23 != hour) {
syncStatus = 2;
}
return syncStatus;
}
/**
* 根据日期查询小时数据
*/
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDate(String orgCode, String date) {
try {
String start = date + " 00:00:00";
String end = date + " 23:59:59";
return hourlyPowerOutageEventMapper.selectList(
new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
.eq(DnerHourlyPowerOutageEvent::getOrgCode, orgCode)
.ge(DnerHourlyPowerOutageEvent::getDataTime, start)
.le(DnerHourlyPowerOutageEvent::getDataTime, end)
);
} catch (Exception e) {
log.error("查询小时数据失败 - 区域: {}, 日期: {}, 错误: {}", orgCode, date, e.getMessage(), e);
throw new RuntimeException("查询小时数据失败: " + e.getMessage(), e);
}
} }
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDateAndOrgCodes(List<String> orgCodes, String date, Long eventId) { private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDateAndOrgCodes(List<String> orgCodes, String date, Long eventId) {
@ -222,9 +151,6 @@ public class DnerDailyPowerOutageEventBatchService {
} }
} }
/**
* 转换为每日事件
*/
private DnerDailyPowerOutageEvent convertToDailyEvent(List<DnerHourlyPowerOutageEvent> list, String date, String orgCode) { private DnerDailyPowerOutageEvent convertToDailyEvent(List<DnerHourlyPowerOutageEvent> list, String date, String orgCode) {
if (list == null || list.isEmpty()) { if (list == null || list.isEmpty()) {
return null; return null;
@ -235,7 +161,6 @@ public class DnerDailyPowerOutageEventBatchService {
daily.setDataTime(date); daily.setDataTime(date);
daily.setEventId(list.get(0).getEventId()); daily.setEventId(list.get(0).getEventId());
//故障停电影响用户总数
IntSummaryStatistics faultStats = list.stream() IntSummaryStatistics faultStats = list.stream()
.mapToInt(e -> e.getFaultUserCount() == null ? 0 : e.getFaultUserCount()) .mapToInt(e -> e.getFaultUserCount() == null ? 0 : e.getFaultUserCount())
.summaryStatistics(); .summaryStatistics();
@ -247,7 +172,6 @@ public class DnerDailyPowerOutageEventBatchService {
daily.setFaultEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime)) daily.setFaultEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getFaultUserCount).orElse(0)); .map(DnerHourlyPowerOutageEvent::getFaultUserCount).orElse(0));
//计划停电影响用户数
IntSummaryStatistics scheduledStats = list.stream() IntSummaryStatistics scheduledStats = list.stream()
.mapToInt(e -> e.getScheduledUserCount() == null ? 0 : e.getScheduledUserCount()) .mapToInt(e -> e.getScheduledUserCount() == null ? 0 : e.getScheduledUserCount())
.summaryStatistics(); .summaryStatistics();
@ -259,7 +183,6 @@ public class DnerDailyPowerOutageEventBatchService {
daily.setScheduledEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime)) daily.setScheduledEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getScheduledUserCount).orElse(0)); .map(DnerHourlyPowerOutageEvent::getScheduledUserCount).orElse(0));
// 停电影响用户总数
IntSummaryStatistics userStats = list.stream() IntSummaryStatistics userStats = list.stream()
.mapToInt(e -> e.getUserCount() == null ? 0 : e.getUserCount()) .mapToInt(e -> e.getUserCount() == null ? 0 : e.getUserCount())
.summaryStatistics(); .summaryStatistics();
@ -271,7 +194,6 @@ public class DnerDailyPowerOutageEventBatchService {
daily.setEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime)) daily.setEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0)); .map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0));
// 气象数据统计
daily.setDailyPrecipitation(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getDailyPrecipitation())).sum())); daily.setDailyPrecipitation(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getDailyPrecipitation())).sum()));
daily.setTemperature(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getTemperature())).average().orElse(0.0))); daily.setTemperature(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getTemperature())).average().orElse(0.0)));
daily.setHourlyMaxTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMaxTemperature())).max().orElse(0.0))); daily.setHourlyMaxTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMaxTemperature())).max().orElse(0.0)));
@ -302,31 +224,4 @@ public class DnerDailyPowerOutageEventBatchService {
return String.valueOf(val).replaceAll("\\.0$", ""); return String.valueOf(val).replaceAll("\\.0$", "");
} }
/**
* 深拷贝
*/
public <T> T deepCopyByJackson(T source, Class<T> clazz) {
if (source == null) {
return null;
}
try {
String json = mapper.writeValueAsString(source);
return mapper.readValue(json, clazz);
} catch (Exception e) {
log.error("Jackson深拷贝失败: {}", e.getMessage(), e);
throw new RuntimeException("Jackson深拷贝失败", e);
}
}
/**
* 同步错误信息
*/
@lombok.Data
@lombok.AllArgsConstructor
@lombok.NoArgsConstructor
public static class SyncErrorInfo {
private String dateTime;
private String orgCode;
private String message;
}
} }

View File

@ -147,7 +147,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
/** /**
* 处理单个日期的数据 * 处理单个日期的数据
* 如果是开始日期先删除存量数据然后多线程同步所有区域数据最后保存一条同步记录 * 多线程同步所有区域数据
* *
* @param executor 线程池 * @param executor 线程池
* @param date 日期 * @param date 日期
@ -160,76 +160,64 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size()); log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size());
log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor)); log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor));
List<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> allErrors = new ArrayList<>(); List<Future<Void>> futures = new ArrayList<>(orgCodeBatches.size());
List<Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo>> futures = new ArrayList<>(orgCodeBatches.size());
for (int i = 0; i < orgCodeBatches.size(); i++) { for (int i = 0; i < orgCodeBatches.size(); i++) {
final List<String> batchOrgCodes = orgCodeBatches.get(i); final List<String> batchOrgCodes = orgCodeBatches.get(i);
final int batchIndex = i + 1; final int batchIndex = i + 1;
try { try {
Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> future = executor.submit(() -> { Future<Void> future = executor.submit(() -> {
try { try {
log.info("日期 {} 批次 {} 开始执行,区域数量: {}", date, batchIndex, batchOrgCodes.size()); log.info("日期 {} 批次 {} 开始执行,区域数量: {}", date, batchIndex, batchOrgCodes.size());
batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex, eventId); batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex, eventId);
return null;
} catch (Exception e) { } catch (Exception e) {
log.error("日期 {} 批次 {} 处理失败: {}", date, batchIndex, e.getMessage(), e); log.error("日期 {} 批次 {} 处理失败: {}", date, batchIndex, e.getMessage(), e);
return new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
date, "批次" + batchIndex, e.getMessage());
} }
return null;
}); });
futures.add(future); futures.add(future);
} catch (Exception e) { } catch (Exception e) {
log.error("日期 {} 批次 {} 提交任务失败: {}", date, batchIndex, e.getMessage(), e); log.error("日期 {} 批次 {} 提交任务失败: {}", date, batchIndex, e.getMessage(), e);
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
date, "批次" + batchIndex, e.getMessage()));
} }
} }
log.info("日期 {} 已提交任务数: {}", date, futures.size()); log.info("日期 {} 已提交任务数: {}", date, futures.size());
long deadlineNanos = System.nanoTime() + TimeUnit.HOURS.toNanos(2); long deadlineNanos = System.nanoTime() + TimeUnit.HOURS.toNanos(2);
boolean timedOut = false; boolean timedOut = false;
for (Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> future : futures) { for (Future<Void> future : futures) {
long remainingNanos = deadlineNanos - System.nanoTime(); long remainingNanos = deadlineNanos - System.nanoTime();
if (remainingNanos <= 0) { if (remainingNanos <= 0) {
timedOut = true; timedOut = true;
break; break;
} }
try { try {
DnerDailyPowerOutageEventBatchService.SyncErrorInfo error = future.get(remainingNanos, TimeUnit.NANOSECONDS);
future.get(remainingNanos, TimeUnit.NANOSECONDS);
if (error != null) {
allErrors.add(error);
}
} catch (TimeoutException e) { } catch (TimeoutException e) {
timedOut = true; timedOut = true;
break; break;
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( log.error("日期 {} 同步被中断", date);
date, null, "同步被中断"));
timedOut = true; timedOut = true;
break; break;
} catch (Exception e) { } catch (Exception e) {
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( log.error("日期 {} 等待任务完成时发生异常: {}", date, e.getMessage(), e);
date, null, e.getMessage()));
} }
} }
if (timedOut) { if (timedOut) {
log.error("日期 {} 同步超时", date); log.error("日期 {} 同步超时,取消未完成任务", date);
for (Future<?> future : futures) { for (Future<?> future : futures) {
if (!future.isDone()) { if (!future.isDone()) {
future.cancel(true); future.cancel(true);
} }
} }
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( return false;
date, null, "同步超时"));
} }
batchService.saveDailySyncRecord(date, allErrors, eventId); log.info("日期 {} 所有批次处理完成", date);
return true;
return allErrors.isEmpty();
} }
/** /**
@ -274,23 +262,6 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
} }
private Long getLatestSyncRecordByDate(String date, Long eventId) {
try {
return this.lambdaQuery()
.eq(DnerDailyPowerOutageEventSync::getDateTime, date)
.eq(DnerDailyPowerOutageEventSync::getEventId, eventId)
.count();
} catch (Exception e) {
log.warn("查询日期 {} 同步记录失败: {}", date, e.getMessage(), e);
return 0L;
}
}
private boolean isSyncSuccess(DnerDailyPowerOutageEventSync record) {
String msg = record.getErrorMsg();
return msg == null || msg.trim().isEmpty() || "[]".equals(msg.trim());
}
/** /**
* 获取所有区域编码 * 获取所有区域编码
*/ */

View File

@ -4,6 +4,7 @@
<!-- namespace 必须 = Mapper 接口全类名 --> <!-- namespace 必须 = Mapper 接口全类名 -->
<mapper namespace="com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper"> <mapper namespace="com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper">
<select id="selectListByConditions" resultType="com.southern.power.grid.entity.DnerDailyPowerOutageEvent"> <select id="selectListByConditions" resultType="com.southern.power.grid.entity.DnerDailyPowerOutageEvent">
select `id`, `org_code`, `data_time`, `hourly_precipitation`, `daily_precipitation`, `temperature`, select `id`, `org_code`, `data_time`, `hourly_precipitation`, `daily_precipitation`, `temperature`,
`hourly_max_temperature`, `hourly_min_temperature`, `extreme_wind_speed_hourly`, `user_count`, `hourly_max_temperature`, `hourly_min_temperature`, `extreme_wind_speed_hourly`, `user_count`,

View File

@ -236,7 +236,7 @@ create table dner_daily_power_outage_event_sync
event_id BIGINT(20) NOT NULL COMMENT '关联事件ID', event_id BIGINT(20) NOT NULL COMMENT '关联事件ID',
org_code varchar(64) not null comment '地区编码', org_code varchar(64) not null comment '地区编码',
date_time varchar(32) not null comment '日期(yyyy-mm-dd)', date_time varchar(32) not null comment '日期(yyyy-mm-dd)',
sync_status int default 1 not null comment '同步状态 1 全部同步 2部分同步', sync_status int default 1 not null comment '同步状态 1 同步成功 2同步失败',
error_msg text null comment '同步失败原因', error_msg text null comment '同步失败原因',
create_by varchar(64) null comment '创建人', create_by varchar(64) null comment '创建人',
create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间', create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',