From 9f7e491f9ce5116798080cd8c6381ff39daa808b Mon Sep 17 00:00:00 2001 From: yufengshuo Date: Fri, 27 Mar 2026 16:14:43 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E9=87=8D=E6=9E=84=E6=97=A5?= =?UTF-8?q?=E5=81=9C=E7=94=B5=E4=BA=8B=E4=BB=B6=E5=90=8C=E6=AD=A5=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E4=BC=98=E5=8C=96=E6=89=B9=E9=87=8F=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 移除 SyncErrorInfo 类及相关错误收集逻辑,简化错误处理 - 将单条插入改为批量插入,提升数据插入效率 - 重构 syncDateDataForBatch 方法,直接构建同步记录并批量保存 - 修改 syncStatus 字段注释,明确同步成功/失败状态 - 清理无用代码,包括深拷贝方法和过时查询逻辑 --- .../entity/DnerDailyPowerOutageEventSync.java | 2 +- ...DnerDailyPowerOutageEventBatchService.java | 189 ++++-------------- ...rDailyPowerOutageEventSyncServiceImpl.java | 55 ++--- .../DnerDailyPowerOutageEventMapper.xml | 1 + .../DnerDailyPowerOutageEventSyncMapper.xml | 1 + src/main/resources/sql/20260313-001.sql | 2 +- 6 files changed, 59 insertions(+), 191 deletions(-) diff --git a/src/main/java/com/southern/power/grid/entity/DnerDailyPowerOutageEventSync.java b/src/main/java/com/southern/power/grid/entity/DnerDailyPowerOutageEventSync.java index 1d51006..bac1986 100644 --- a/src/main/java/com/southern/power/grid/entity/DnerDailyPowerOutageEventSync.java +++ b/src/main/java/com/southern/power/grid/entity/DnerDailyPowerOutageEventSync.java @@ -33,7 +33,7 @@ public class DnerDailyPowerOutageEventSync { private Long eventId; /** - * 同步状态 1 非当日调用 2 当日调用 + * 同步状态 1 同步成功 2 同步失败 */ private Integer syncStatus; diff --git a/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventBatchService.java b/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventBatchService.java index 20c200c..0d72562 100644 --- a/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventBatchService.java +++ b/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventBatchService.java @@ -1,7 +1,6 @@ package com.southern.power.grid.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.fasterxml.jackson.databind.ObjectMapper; import com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper; import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper; import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper; @@ -14,14 +13,11 @@ import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; /** * 日K线停电事件批量处理服务 - * 专门处理事务性的批量同步操作,解决@Transactional自调用问题 * * @author: System * @date: 2026/3/17 @@ -34,37 +30,27 @@ public class DnerDailyPowerOutageEventBatchService { private final DnerDailyPowerOutageEventMapper dailyPowerOutageEventMapper; private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper; private final DnerDailyPowerOutageEventSyncMapper syncMapper; - private final ObjectMapper mapper = new ObjectMapper(); private static final int BATCH_SIZE = 600; /** * 删除指定日期的存量数据 - * 删除dner_daily_power_outage_event(对应data_time字段)和dner_daily_power_outage_event_sync(对应date_time字段)中的对应数据 - * - * @param date 日期 - * @param eventId 事件id */ public void deleteDateData(String date, Long eventId) { try { log.info("开始删除日期 {} 事件ID {} 的存量数据", date, eventId); - - // 1. 删除dner_daily_power_outage_event表中对应日期的数据 int deleteDailyCount = dailyPowerOutageEventMapper.delete( new LambdaQueryWrapper() .eq(DnerDailyPowerOutageEvent::getEventId, eventId) .eq(DnerDailyPowerOutageEvent::getDataTime, date) ); log.info("日期 {} 事件ID {} 删除日表 {} 条存量数据", date, eventId, deleteDailyCount); - - // 2. 删除dner_daily_power_outage_event_sync表中对应日期的同步记录 int deleteSyncCount = syncMapper.delete( new LambdaQueryWrapper() .eq(DnerDailyPowerOutageEventSync::getEventId, eventId) .eq(DnerDailyPowerOutageEventSync::getDateTime, date) ); log.info("日期 {} 事件ID {} 删除同步记录表 {} 条存量数据", date, eventId, deleteSyncCount); - } catch (Exception e) { log.error("日期 {} 事件ID {} 删除存量数据失败: {}", date, eventId, e.getMessage(), e); throw new RuntimeException("删除日期 " + date + "事件ID " + eventId + " 存量数据失败: " + e.getMessage(), e); @@ -73,139 +59,82 @@ public class DnerDailyPowerOutageEventBatchService { /** * 同步指定日期的一批区域数据 - * - * @param date 日期 - * @param orgCodeList 区域编码列表(批次) - * @param batchIndex 批次索引 - * @param eventId 事件id + * orgCode 循环只做数据处理和收集,循环结束后分批批量插入,每批最多 BATCH_SIZE 条 */ public void syncDateDataForBatch(String date, List orgCodeList, int batchIndex, Long eventId) { log.info("开始同步日期 {} 批次 {},区域数量: {}", date, batchIndex, orgCodeList.size()); - List allData = new ArrayList<>(); - List allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date, eventId); if (CollectionUtils.isEmpty(allHourlyEvents)) { log.info("日期 {} 批次 {} 无小时数据需要处理", date, batchIndex); return; } - Map> orgCodeToHourlyEvents = allHourlyEvents.stream().collect(Collectors.groupingBy(DnerHourlyPowerOutageEvent::getOrgCode)); + Map> orgCodeToHourlyEvents = allHourlyEvents.stream() + .collect(Collectors.groupingBy(DnerHourlyPowerOutageEvent::getOrgCode)); + + List eventBatch = new ArrayList<>(); + List syncBatch = new ArrayList<>(); for (String orgCode : orgCodeList) { try { List eventList = orgCodeToHourlyEvents.get(orgCode); if (CollectionUtils.isEmpty(eventList)) { log.warn("日期 {} 区域 {} 无小时数据", date, orgCode); + //syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, "无小时数据")); continue; } - DnerDailyPowerOutageEvent dailyPowerOutageEvent = convertToDailyEvent(eventList, date, orgCode); - if (ObjectUtils.isEmpty(dailyPowerOutageEvent)) { + DnerDailyPowerOutageEvent dailyEvent = convertToDailyEvent(eventList, date, orgCode); + if (ObjectUtils.isEmpty(dailyEvent)) { log.warn("转换日K线数据失败:日期:{}, 区域编号:{}", date, orgCode); + syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, "转换日K线数据失败")); continue; } - allData.add(dailyPowerOutageEvent); - + eventBatch.add(dailyEvent); + syncBatch.add(buildSyncRecord(eventId, orgCode, date, 1, null)); } catch (Exception e) { log.error("处理区域 {} 日期 {} 时发生异常: {}", orgCode, date, e.getMessage(), e); + syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, e.getMessage())); } } - // 批量保存数据 - if (!CollectionUtils.isEmpty(allData)) { + // 批量插入 event,每批 BATCH_SIZE 条 + int totalInserted = 0; + for (int i = 0; i < eventBatch.size(); i += BATCH_SIZE) { + List sub = eventBatch.subList(i, Math.min(i + BATCH_SIZE, eventBatch.size())); try { - // 分段插入,每段最多 600 条 - int totalInserted = 0; - for (int i = 0; i < allData.size(); i += BATCH_SIZE) { - int end = Math.min(i + BATCH_SIZE, allData.size()); - List subList = allData.subList(i, end); - - for (DnerDailyPowerOutageEvent event : subList) { - dailyPowerOutageEventMapper.insert(event); - } - totalInserted += subList.size(); - log.debug("日期 {} 批次 {} 已插入 {}/{} 条记录", date, batchIndex, totalInserted, allData.size()); - } - - log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted); + dailyPowerOutageEventMapper.insert(sub); + totalInserted += sub.size(); } catch (Exception e) { - log.error("日期 {} 批次 {} 保存数据时发生异常:{}", date, batchIndex, e.getMessage(), e); - throw new RuntimeException("保存日 K 线数据失败:" + e.getMessage(), e); + log.error("日期 {} 批次 {} 批量插入event数据失败: {}", date, batchIndex, e.getMessage(), e); + final String errMsg = e.getMessage(); + sub.forEach(ev -> syncBatch.replaceAll(s -> + s.getOrgCode() != null && s.getOrgCode().equals(ev.getOrgCode()) && s.getSyncStatus() != 2 + ? buildSyncRecord(ev.getEventId(), ev.getOrgCode(), date, 2, errMsg) + : s)); } - } else { - log.info("日期 {} 批次 {} 无数据需要保存", date, batchIndex); } - } - /** - * 保存每日同步记录 - * 一个日期的所有数据同步完成后,保存一条同步记录 - * - * @param date 同步日期 - * @param errors 错误信息列表 - * @param eventId 事件id - */ - public void saveDailySyncRecord(String date, List errors, Long eventId) { - try { - DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync(); - record.setDateTime(date); - record.setEventId(eventId); - int syncStatus = getSyncStatus(date, eventId); - - // 记录错误信息 - String errorMsg = null; - if (!CollectionUtils.isEmpty(errors)) { - errorMsg = mapper.writeValueAsString(errors); + // 批量插入 sync 记录,每批 BATCH_SIZE 条 + for (int i = 0; i < syncBatch.size(); i += BATCH_SIZE) { + List sub = syncBatch.subList(i, Math.min(i + BATCH_SIZE, syncBatch.size())); + try { + syncMapper.insert(sub); + } catch (Exception e) { + log.error("日期 {} 批次 {} 批量插入sync记录失败: {}", date, batchIndex, e.getMessage(), e); } - - record.setSyncStatus(syncStatus); - record.setErrorMsg(errorMsg); - - syncMapper.insert(record); - - log.info("日期 {} 同步记录已保存,状态: {}, 错误数: {}", - date, syncStatus, CollectionUtils.isEmpty(errors) ? 0 : errors.size()); - - } catch (Exception e) { - log.error("日期 {} 保存同步记录失败: {}", date, e.getMessage(), e); - // 不抛出异常,避免影响主流程 } + + log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted); } - private int getSyncStatus(String date, Long eventId) { - // 根据date和eventId 查看分时表数据是否为全部数据(整时存在23时数据) - int syncStatus = 1; - DnerHourlyPowerOutageEvent event = hourlyPowerOutageEventMapper.selectOne(new LambdaQueryWrapper() - .eq(DnerHourlyPowerOutageEvent::getEventId, eventId) - .likeRight(DnerHourlyPowerOutageEvent::getDataTime, date) - .orderByDesc(DnerHourlyPowerOutageEvent::getDataTime) - .last("LIMIT 1")); - int hour = LocalDateTime.parse( - event.getDataTime(), - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") - ).getHour(); - if (23 != hour) { - syncStatus = 2; - } - return syncStatus; - } - - /** - * 根据日期查询小时数据 - */ - private List queryHourlyDataByDate(String orgCode, String date) { - try { - String start = date + " 00:00:00"; - String end = date + " 23:59:59"; - return hourlyPowerOutageEventMapper.selectList( - new LambdaQueryWrapper() - .eq(DnerHourlyPowerOutageEvent::getOrgCode, orgCode) - .ge(DnerHourlyPowerOutageEvent::getDataTime, start) - .le(DnerHourlyPowerOutageEvent::getDataTime, end) - ); - } catch (Exception e) { - log.error("查询小时数据失败 - 区域: {}, 日期: {}, 错误: {}", orgCode, date, e.getMessage(), e); - throw new RuntimeException("查询小时数据失败: " + e.getMessage(), e); - } + private DnerDailyPowerOutageEventSync buildSyncRecord(Long eventId, String orgCode, String date, int syncStatus, String errorMsg) { + DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync(); + record.setDateTime(date); + record.setEventId(eventId); + record.setOrgCode(orgCode); + record.setSyncStatus(syncStatus); + record.setErrorMsg(errorMsg); + return record; } private List queryHourlyDataByDateAndOrgCodes(List orgCodes, String date, Long eventId) { @@ -222,9 +151,6 @@ public class DnerDailyPowerOutageEventBatchService { } } - /** - * 转换为每日事件 - */ private DnerDailyPowerOutageEvent convertToDailyEvent(List list, String date, String orgCode) { if (list == null || list.isEmpty()) { return null; @@ -235,7 +161,6 @@ public class DnerDailyPowerOutageEventBatchService { daily.setDataTime(date); daily.setEventId(list.get(0).getEventId()); - //故障停电影响用户总数 IntSummaryStatistics faultStats = list.stream() .mapToInt(e -> e.getFaultUserCount() == null ? 0 : e.getFaultUserCount()) .summaryStatistics(); @@ -247,7 +172,6 @@ public class DnerDailyPowerOutageEventBatchService { daily.setFaultEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime)) .map(DnerHourlyPowerOutageEvent::getFaultUserCount).orElse(0)); - //计划停电影响用户数 IntSummaryStatistics scheduledStats = list.stream() .mapToInt(e -> e.getScheduledUserCount() == null ? 0 : e.getScheduledUserCount()) .summaryStatistics(); @@ -259,7 +183,6 @@ public class DnerDailyPowerOutageEventBatchService { daily.setScheduledEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime)) .map(DnerHourlyPowerOutageEvent::getScheduledUserCount).orElse(0)); - // 停电影响用户总数 IntSummaryStatistics userStats = list.stream() .mapToInt(e -> e.getUserCount() == null ? 0 : e.getUserCount()) .summaryStatistics(); @@ -271,7 +194,6 @@ public class DnerDailyPowerOutageEventBatchService { daily.setEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime)) .map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0)); - // 气象数据统计 daily.setDailyPrecipitation(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getDailyPrecipitation())).sum())); daily.setTemperature(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getTemperature())).average().orElse(0.0))); daily.setHourlyMaxTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMaxTemperature())).max().orElse(0.0))); @@ -302,31 +224,4 @@ public class DnerDailyPowerOutageEventBatchService { return String.valueOf(val).replaceAll("\\.0$", ""); } - /** - * 深拷贝 - */ - public T deepCopyByJackson(T source, Class clazz) { - if (source == null) { - return null; - } - try { - String json = mapper.writeValueAsString(source); - return mapper.readValue(json, clazz); - } catch (Exception e) { - log.error("Jackson深拷贝失败: {}", e.getMessage(), e); - throw new RuntimeException("Jackson深拷贝失败", e); - } - } - - /** - * 同步错误信息 - */ - @lombok.Data - @lombok.AllArgsConstructor - @lombok.NoArgsConstructor - public static class SyncErrorInfo { - private String dateTime; - private String orgCode; - private String message; - } } diff --git a/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventSyncServiceImpl.java b/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventSyncServiceImpl.java index 6374fbd..32e7a2b 100644 --- a/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventSyncServiceImpl.java +++ b/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventSyncServiceImpl.java @@ -147,7 +147,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl allErrors = new ArrayList<>(); - List> futures = new ArrayList<>(orgCodeBatches.size()); + List> futures = new ArrayList<>(orgCodeBatches.size()); for (int i = 0; i < orgCodeBatches.size(); i++) { final List batchOrgCodes = orgCodeBatches.get(i); final int batchIndex = i + 1; try { - Future future = executor.submit(() -> { + Future future = executor.submit(() -> { try { log.info("日期 {} 批次 {} 开始执行,区域数量: {}", date, batchIndex, batchOrgCodes.size()); batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex, eventId); - return null; } catch (Exception e) { log.error("日期 {} 批次 {} 处理失败: {}", date, batchIndex, e.getMessage(), e); - return new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( - date, "批次" + batchIndex, e.getMessage()); } + return null; }); futures.add(future); } catch (Exception e) { log.error("日期 {} 批次 {} 提交任务失败: {}", date, batchIndex, e.getMessage(), e); - allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( - date, "批次" + batchIndex, e.getMessage())); } } log.info("日期 {} 已提交任务数: {}", date, futures.size()); long deadlineNanos = System.nanoTime() + TimeUnit.HOURS.toNanos(2); boolean timedOut = false; - for (Future future : futures) { + for (Future future : futures) { long remainingNanos = deadlineNanos - System.nanoTime(); if (remainingNanos <= 0) { timedOut = true; break; } try { - DnerDailyPowerOutageEventBatchService.SyncErrorInfo error = - future.get(remainingNanos, TimeUnit.NANOSECONDS); - if (error != null) { - allErrors.add(error); - } + future.get(remainingNanos, TimeUnit.NANOSECONDS); } catch (TimeoutException e) { timedOut = true; break; } catch (InterruptedException e) { Thread.currentThread().interrupt(); - allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( - date, null, "同步被中断")); + log.error("日期 {} 同步被中断", date); timedOut = true; break; } catch (Exception e) { - allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( - date, null, e.getMessage())); + log.error("日期 {} 等待任务完成时发生异常: {}", date, e.getMessage(), e); } } + if (timedOut) { - log.error("日期 {} 同步超时", date); + log.error("日期 {} 同步超时,取消未完成任务", date); for (Future future : futures) { if (!future.isDone()) { future.cancel(true); } } - allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( - date, null, "同步超时")); + return false; } - batchService.saveDailySyncRecord(date, allErrors, eventId); - - return allErrors.isEmpty(); + log.info("日期 {} 所有批次处理完成", date); + return true; } /** @@ -274,23 +262,6 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl +