refactor: 重构日停电事件同步逻辑,优化批量处理性能
- 移除 SyncErrorInfo 类及相关错误收集逻辑,简化错误处理 - 将单条插入改为批量插入,提升数据插入效率 - 重构 syncDateDataForBatch 方法,直接构建同步记录并批量保存 - 修改 syncStatus 字段注释,明确同步成功/失败状态 - 清理无用代码,包括深拷贝方法和过时查询逻辑
This commit is contained in:
parent
7c0db4830d
commit
9f7e491f9c
@ -33,7 +33,7 @@ public class DnerDailyPowerOutageEventSync {
|
|||||||
private Long eventId;
|
private Long eventId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 同步状态 1 非当日调用 2 当日调用
|
* 同步状态 1 同步成功 2 同步失败
|
||||||
*/
|
*/
|
||||||
private Integer syncStatus;
|
private Integer syncStatus;
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
package com.southern.power.grid.service.impl;
|
package com.southern.power.grid.service.impl;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper;
|
import com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper;
|
||||||
import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper;
|
import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper;
|
||||||
import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper;
|
import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper;
|
||||||
@ -14,14 +13,11 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
import org.springframework.util.ObjectUtils;
|
import org.springframework.util.ObjectUtils;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.time.format.DateTimeFormatter;
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 日K线停电事件批量处理服务
|
* 日K线停电事件批量处理服务
|
||||||
* 专门处理事务性的批量同步操作,解决@Transactional自调用问题
|
|
||||||
*
|
*
|
||||||
* @author: System
|
* @author: System
|
||||||
* @date: 2026/3/17
|
* @date: 2026/3/17
|
||||||
@ -34,37 +30,27 @@ public class DnerDailyPowerOutageEventBatchService {
|
|||||||
private final DnerDailyPowerOutageEventMapper dailyPowerOutageEventMapper;
|
private final DnerDailyPowerOutageEventMapper dailyPowerOutageEventMapper;
|
||||||
private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper;
|
private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper;
|
||||||
private final DnerDailyPowerOutageEventSyncMapper syncMapper;
|
private final DnerDailyPowerOutageEventSyncMapper syncMapper;
|
||||||
private final ObjectMapper mapper = new ObjectMapper();
|
|
||||||
|
|
||||||
private static final int BATCH_SIZE = 600;
|
private static final int BATCH_SIZE = 600;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 删除指定日期的存量数据
|
* 删除指定日期的存量数据
|
||||||
* 删除dner_daily_power_outage_event(对应data_time字段)和dner_daily_power_outage_event_sync(对应date_time字段)中的对应数据
|
|
||||||
*
|
|
||||||
* @param date 日期
|
|
||||||
* @param eventId 事件id
|
|
||||||
*/
|
*/
|
||||||
public void deleteDateData(String date, Long eventId) {
|
public void deleteDateData(String date, Long eventId) {
|
||||||
try {
|
try {
|
||||||
log.info("开始删除日期 {} 事件ID {} 的存量数据", date, eventId);
|
log.info("开始删除日期 {} 事件ID {} 的存量数据", date, eventId);
|
||||||
|
|
||||||
// 1. 删除dner_daily_power_outage_event表中对应日期的数据
|
|
||||||
int deleteDailyCount = dailyPowerOutageEventMapper.delete(
|
int deleteDailyCount = dailyPowerOutageEventMapper.delete(
|
||||||
new LambdaQueryWrapper<DnerDailyPowerOutageEvent>()
|
new LambdaQueryWrapper<DnerDailyPowerOutageEvent>()
|
||||||
.eq(DnerDailyPowerOutageEvent::getEventId, eventId)
|
.eq(DnerDailyPowerOutageEvent::getEventId, eventId)
|
||||||
.eq(DnerDailyPowerOutageEvent::getDataTime, date)
|
.eq(DnerDailyPowerOutageEvent::getDataTime, date)
|
||||||
);
|
);
|
||||||
log.info("日期 {} 事件ID {} 删除日表 {} 条存量数据", date, eventId, deleteDailyCount);
|
log.info("日期 {} 事件ID {} 删除日表 {} 条存量数据", date, eventId, deleteDailyCount);
|
||||||
|
|
||||||
// 2. 删除dner_daily_power_outage_event_sync表中对应日期的同步记录
|
|
||||||
int deleteSyncCount = syncMapper.delete(
|
int deleteSyncCount = syncMapper.delete(
|
||||||
new LambdaQueryWrapper<DnerDailyPowerOutageEventSync>()
|
new LambdaQueryWrapper<DnerDailyPowerOutageEventSync>()
|
||||||
.eq(DnerDailyPowerOutageEventSync::getEventId, eventId)
|
.eq(DnerDailyPowerOutageEventSync::getEventId, eventId)
|
||||||
.eq(DnerDailyPowerOutageEventSync::getDateTime, date)
|
.eq(DnerDailyPowerOutageEventSync::getDateTime, date)
|
||||||
);
|
);
|
||||||
log.info("日期 {} 事件ID {} 删除同步记录表 {} 条存量数据", date, eventId, deleteSyncCount);
|
log.info("日期 {} 事件ID {} 删除同步记录表 {} 条存量数据", date, eventId, deleteSyncCount);
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("日期 {} 事件ID {} 删除存量数据失败: {}", date, eventId, e.getMessage(), e);
|
log.error("日期 {} 事件ID {} 删除存量数据失败: {}", date, eventId, e.getMessage(), e);
|
||||||
throw new RuntimeException("删除日期 " + date + "事件ID " + eventId + " 存量数据失败: " + e.getMessage(), e);
|
throw new RuntimeException("删除日期 " + date + "事件ID " + eventId + " 存量数据失败: " + e.getMessage(), e);
|
||||||
@ -73,139 +59,82 @@ public class DnerDailyPowerOutageEventBatchService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 同步指定日期的一批区域数据
|
* 同步指定日期的一批区域数据
|
||||||
*
|
* orgCode 循环只做数据处理和收集,循环结束后分批批量插入,每批最多 BATCH_SIZE 条
|
||||||
* @param date 日期
|
|
||||||
* @param orgCodeList 区域编码列表(批次)
|
|
||||||
* @param batchIndex 批次索引
|
|
||||||
* @param eventId 事件id
|
|
||||||
*/
|
*/
|
||||||
public void syncDateDataForBatch(String date, List<String> orgCodeList, int batchIndex, Long eventId) {
|
public void syncDateDataForBatch(String date, List<String> orgCodeList, int batchIndex, Long eventId) {
|
||||||
log.info("开始同步日期 {} 批次 {},区域数量: {}", date, batchIndex, orgCodeList.size());
|
log.info("开始同步日期 {} 批次 {},区域数量: {}", date, batchIndex, orgCodeList.size());
|
||||||
|
|
||||||
List<DnerDailyPowerOutageEvent> allData = new ArrayList<>();
|
|
||||||
|
|
||||||
List<DnerHourlyPowerOutageEvent> allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date, eventId);
|
List<DnerHourlyPowerOutageEvent> allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date, eventId);
|
||||||
if (CollectionUtils.isEmpty(allHourlyEvents)) {
|
if (CollectionUtils.isEmpty(allHourlyEvents)) {
|
||||||
log.info("日期 {} 批次 {} 无小时数据需要处理", date, batchIndex);
|
log.info("日期 {} 批次 {} 无小时数据需要处理", date, batchIndex);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Map<String, List<DnerHourlyPowerOutageEvent>> orgCodeToHourlyEvents = allHourlyEvents.stream().collect(Collectors.groupingBy(DnerHourlyPowerOutageEvent::getOrgCode));
|
Map<String, List<DnerHourlyPowerOutageEvent>> orgCodeToHourlyEvents = allHourlyEvents.stream()
|
||||||
|
.collect(Collectors.groupingBy(DnerHourlyPowerOutageEvent::getOrgCode));
|
||||||
|
|
||||||
|
List<DnerDailyPowerOutageEvent> eventBatch = new ArrayList<>();
|
||||||
|
List<DnerDailyPowerOutageEventSync> syncBatch = new ArrayList<>();
|
||||||
|
|
||||||
for (String orgCode : orgCodeList) {
|
for (String orgCode : orgCodeList) {
|
||||||
try {
|
try {
|
||||||
List<DnerHourlyPowerOutageEvent> eventList = orgCodeToHourlyEvents.get(orgCode);
|
List<DnerHourlyPowerOutageEvent> eventList = orgCodeToHourlyEvents.get(orgCode);
|
||||||
if (CollectionUtils.isEmpty(eventList)) {
|
if (CollectionUtils.isEmpty(eventList)) {
|
||||||
log.warn("日期 {} 区域 {} 无小时数据", date, orgCode);
|
log.warn("日期 {} 区域 {} 无小时数据", date, orgCode);
|
||||||
|
//syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, "无小时数据"));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
DnerDailyPowerOutageEvent dailyPowerOutageEvent = convertToDailyEvent(eventList, date, orgCode);
|
DnerDailyPowerOutageEvent dailyEvent = convertToDailyEvent(eventList, date, orgCode);
|
||||||
if (ObjectUtils.isEmpty(dailyPowerOutageEvent)) {
|
if (ObjectUtils.isEmpty(dailyEvent)) {
|
||||||
log.warn("转换日K线数据失败:日期:{}, 区域编号:{}", date, orgCode);
|
log.warn("转换日K线数据失败:日期:{}, 区域编号:{}", date, orgCode);
|
||||||
|
syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, "转换日K线数据失败"));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
allData.add(dailyPowerOutageEvent);
|
eventBatch.add(dailyEvent);
|
||||||
|
syncBatch.add(buildSyncRecord(eventId, orgCode, date, 1, null));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理区域 {} 日期 {} 时发生异常: {}", orgCode, date, e.getMessage(), e);
|
log.error("处理区域 {} 日期 {} 时发生异常: {}", orgCode, date, e.getMessage(), e);
|
||||||
|
syncBatch.add(buildSyncRecord(eventId, orgCode, date, 2, e.getMessage()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 批量保存数据
|
// 批量插入 event,每批 BATCH_SIZE 条
|
||||||
if (!CollectionUtils.isEmpty(allData)) {
|
int totalInserted = 0;
|
||||||
|
for (int i = 0; i < eventBatch.size(); i += BATCH_SIZE) {
|
||||||
|
List<DnerDailyPowerOutageEvent> sub = eventBatch.subList(i, Math.min(i + BATCH_SIZE, eventBatch.size()));
|
||||||
try {
|
try {
|
||||||
// 分段插入,每段最多 600 条
|
dailyPowerOutageEventMapper.insert(sub);
|
||||||
int totalInserted = 0;
|
totalInserted += sub.size();
|
||||||
for (int i = 0; i < allData.size(); i += BATCH_SIZE) {
|
|
||||||
int end = Math.min(i + BATCH_SIZE, allData.size());
|
|
||||||
List<DnerDailyPowerOutageEvent> subList = allData.subList(i, end);
|
|
||||||
|
|
||||||
for (DnerDailyPowerOutageEvent event : subList) {
|
|
||||||
dailyPowerOutageEventMapper.insert(event);
|
|
||||||
}
|
|
||||||
totalInserted += subList.size();
|
|
||||||
log.debug("日期 {} 批次 {} 已插入 {}/{} 条记录", date, batchIndex, totalInserted, allData.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("日期 {} 批次 {} 保存数据时发生异常:{}", date, batchIndex, e.getMessage(), e);
|
log.error("日期 {} 批次 {} 批量插入event数据失败: {}", date, batchIndex, e.getMessage(), e);
|
||||||
throw new RuntimeException("保存日 K 线数据失败:" + e.getMessage(), e);
|
final String errMsg = e.getMessage();
|
||||||
|
sub.forEach(ev -> syncBatch.replaceAll(s ->
|
||||||
|
s.getOrgCode() != null && s.getOrgCode().equals(ev.getOrgCode()) && s.getSyncStatus() != 2
|
||||||
|
? buildSyncRecord(ev.getEventId(), ev.getOrgCode(), date, 2, errMsg)
|
||||||
|
: s));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.info("日期 {} 批次 {} 无数据需要保存", date, batchIndex);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
// 批量插入 sync 记录,每批 BATCH_SIZE 条
|
||||||
* 保存每日同步记录
|
for (int i = 0; i < syncBatch.size(); i += BATCH_SIZE) {
|
||||||
* 一个日期的所有数据同步完成后,保存一条同步记录
|
List<DnerDailyPowerOutageEventSync> sub = syncBatch.subList(i, Math.min(i + BATCH_SIZE, syncBatch.size()));
|
||||||
*
|
try {
|
||||||
* @param date 同步日期
|
syncMapper.insert(sub);
|
||||||
* @param errors 错误信息列表
|
} catch (Exception e) {
|
||||||
* @param eventId 事件id
|
log.error("日期 {} 批次 {} 批量插入sync记录失败: {}", date, batchIndex, e.getMessage(), e);
|
||||||
*/
|
|
||||||
public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors, Long eventId) {
|
|
||||||
try {
|
|
||||||
DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync();
|
|
||||||
record.setDateTime(date);
|
|
||||||
record.setEventId(eventId);
|
|
||||||
int syncStatus = getSyncStatus(date, eventId);
|
|
||||||
|
|
||||||
// 记录错误信息
|
|
||||||
String errorMsg = null;
|
|
||||||
if (!CollectionUtils.isEmpty(errors)) {
|
|
||||||
errorMsg = mapper.writeValueAsString(errors);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
record.setSyncStatus(syncStatus);
|
|
||||||
record.setErrorMsg(errorMsg);
|
|
||||||
|
|
||||||
syncMapper.insert(record);
|
|
||||||
|
|
||||||
log.info("日期 {} 同步记录已保存,状态: {}, 错误数: {}",
|
|
||||||
date, syncStatus, CollectionUtils.isEmpty(errors) ? 0 : errors.size());
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("日期 {} 保存同步记录失败: {}", date, e.getMessage(), e);
|
|
||||||
// 不抛出异常,避免影响主流程
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getSyncStatus(String date, Long eventId) {
|
private DnerDailyPowerOutageEventSync buildSyncRecord(Long eventId, String orgCode, String date, int syncStatus, String errorMsg) {
|
||||||
// 根据date和eventId 查看分时表数据是否为全部数据(整时存在23时数据)
|
DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync();
|
||||||
int syncStatus = 1;
|
record.setDateTime(date);
|
||||||
DnerHourlyPowerOutageEvent event = hourlyPowerOutageEventMapper.selectOne(new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
|
record.setEventId(eventId);
|
||||||
.eq(DnerHourlyPowerOutageEvent::getEventId, eventId)
|
record.setOrgCode(orgCode);
|
||||||
.likeRight(DnerHourlyPowerOutageEvent::getDataTime, date)
|
record.setSyncStatus(syncStatus);
|
||||||
.orderByDesc(DnerHourlyPowerOutageEvent::getDataTime)
|
record.setErrorMsg(errorMsg);
|
||||||
.last("LIMIT 1"));
|
return record;
|
||||||
int hour = LocalDateTime.parse(
|
|
||||||
event.getDataTime(),
|
|
||||||
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
|
|
||||||
).getHour();
|
|
||||||
if (23 != hour) {
|
|
||||||
syncStatus = 2;
|
|
||||||
}
|
|
||||||
return syncStatus;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 根据日期查询小时数据
|
|
||||||
*/
|
|
||||||
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDate(String orgCode, String date) {
|
|
||||||
try {
|
|
||||||
String start = date + " 00:00:00";
|
|
||||||
String end = date + " 23:59:59";
|
|
||||||
return hourlyPowerOutageEventMapper.selectList(
|
|
||||||
new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
|
|
||||||
.eq(DnerHourlyPowerOutageEvent::getOrgCode, orgCode)
|
|
||||||
.ge(DnerHourlyPowerOutageEvent::getDataTime, start)
|
|
||||||
.le(DnerHourlyPowerOutageEvent::getDataTime, end)
|
|
||||||
);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("查询小时数据失败 - 区域: {}, 日期: {}, 错误: {}", orgCode, date, e.getMessage(), e);
|
|
||||||
throw new RuntimeException("查询小时数据失败: " + e.getMessage(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDateAndOrgCodes(List<String> orgCodes, String date, Long eventId) {
|
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDateAndOrgCodes(List<String> orgCodes, String date, Long eventId) {
|
||||||
@ -222,9 +151,6 @@ public class DnerDailyPowerOutageEventBatchService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 转换为每日事件
|
|
||||||
*/
|
|
||||||
private DnerDailyPowerOutageEvent convertToDailyEvent(List<DnerHourlyPowerOutageEvent> list, String date, String orgCode) {
|
private DnerDailyPowerOutageEvent convertToDailyEvent(List<DnerHourlyPowerOutageEvent> list, String date, String orgCode) {
|
||||||
if (list == null || list.isEmpty()) {
|
if (list == null || list.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
@ -235,7 +161,6 @@ public class DnerDailyPowerOutageEventBatchService {
|
|||||||
daily.setDataTime(date);
|
daily.setDataTime(date);
|
||||||
daily.setEventId(list.get(0).getEventId());
|
daily.setEventId(list.get(0).getEventId());
|
||||||
|
|
||||||
//故障停电影响用户总数
|
|
||||||
IntSummaryStatistics faultStats = list.stream()
|
IntSummaryStatistics faultStats = list.stream()
|
||||||
.mapToInt(e -> e.getFaultUserCount() == null ? 0 : e.getFaultUserCount())
|
.mapToInt(e -> e.getFaultUserCount() == null ? 0 : e.getFaultUserCount())
|
||||||
.summaryStatistics();
|
.summaryStatistics();
|
||||||
@ -247,7 +172,6 @@ public class DnerDailyPowerOutageEventBatchService {
|
|||||||
daily.setFaultEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
|
daily.setFaultEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
|
||||||
.map(DnerHourlyPowerOutageEvent::getFaultUserCount).orElse(0));
|
.map(DnerHourlyPowerOutageEvent::getFaultUserCount).orElse(0));
|
||||||
|
|
||||||
//计划停电影响用户数
|
|
||||||
IntSummaryStatistics scheduledStats = list.stream()
|
IntSummaryStatistics scheduledStats = list.stream()
|
||||||
.mapToInt(e -> e.getScheduledUserCount() == null ? 0 : e.getScheduledUserCount())
|
.mapToInt(e -> e.getScheduledUserCount() == null ? 0 : e.getScheduledUserCount())
|
||||||
.summaryStatistics();
|
.summaryStatistics();
|
||||||
@ -259,7 +183,6 @@ public class DnerDailyPowerOutageEventBatchService {
|
|||||||
daily.setScheduledEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
|
daily.setScheduledEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
|
||||||
.map(DnerHourlyPowerOutageEvent::getScheduledUserCount).orElse(0));
|
.map(DnerHourlyPowerOutageEvent::getScheduledUserCount).orElse(0));
|
||||||
|
|
||||||
// 停电影响用户总数
|
|
||||||
IntSummaryStatistics userStats = list.stream()
|
IntSummaryStatistics userStats = list.stream()
|
||||||
.mapToInt(e -> e.getUserCount() == null ? 0 : e.getUserCount())
|
.mapToInt(e -> e.getUserCount() == null ? 0 : e.getUserCount())
|
||||||
.summaryStatistics();
|
.summaryStatistics();
|
||||||
@ -271,7 +194,6 @@ public class DnerDailyPowerOutageEventBatchService {
|
|||||||
daily.setEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
|
daily.setEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
|
||||||
.map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0));
|
.map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0));
|
||||||
|
|
||||||
// 气象数据统计
|
|
||||||
daily.setDailyPrecipitation(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getDailyPrecipitation())).sum()));
|
daily.setDailyPrecipitation(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getDailyPrecipitation())).sum()));
|
||||||
daily.setTemperature(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getTemperature())).average().orElse(0.0)));
|
daily.setTemperature(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getTemperature())).average().orElse(0.0)));
|
||||||
daily.setHourlyMaxTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMaxTemperature())).max().orElse(0.0)));
|
daily.setHourlyMaxTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMaxTemperature())).max().orElse(0.0)));
|
||||||
@ -302,31 +224,4 @@ public class DnerDailyPowerOutageEventBatchService {
|
|||||||
return String.valueOf(val).replaceAll("\\.0$", "");
|
return String.valueOf(val).replaceAll("\\.0$", "");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* 深拷贝
|
|
||||||
*/
|
|
||||||
public <T> T deepCopyByJackson(T source, Class<T> clazz) {
|
|
||||||
if (source == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
String json = mapper.writeValueAsString(source);
|
|
||||||
return mapper.readValue(json, clazz);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Jackson深拷贝失败: {}", e.getMessage(), e);
|
|
||||||
throw new RuntimeException("Jackson深拷贝失败", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 同步错误信息
|
|
||||||
*/
|
|
||||||
@lombok.Data
|
|
||||||
@lombok.AllArgsConstructor
|
|
||||||
@lombok.NoArgsConstructor
|
|
||||||
public static class SyncErrorInfo {
|
|
||||||
private String dateTime;
|
|
||||||
private String orgCode;
|
|
||||||
private String message;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -147,7 +147,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理单个日期的数据
|
* 处理单个日期的数据
|
||||||
* 如果是开始日期,先删除存量数据,然后多线程同步所有区域数据,最后保存一条同步记录
|
* 多线程同步所有区域数据
|
||||||
*
|
*
|
||||||
* @param executor 线程池
|
* @param executor 线程池
|
||||||
* @param date 日期
|
* @param date 日期
|
||||||
@ -160,76 +160,64 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
|
|||||||
log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size());
|
log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size());
|
||||||
log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor));
|
log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor));
|
||||||
|
|
||||||
List<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> allErrors = new ArrayList<>();
|
List<Future<Void>> futures = new ArrayList<>(orgCodeBatches.size());
|
||||||
List<Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo>> futures = new ArrayList<>(orgCodeBatches.size());
|
|
||||||
|
|
||||||
for (int i = 0; i < orgCodeBatches.size(); i++) {
|
for (int i = 0; i < orgCodeBatches.size(); i++) {
|
||||||
final List<String> batchOrgCodes = orgCodeBatches.get(i);
|
final List<String> batchOrgCodes = orgCodeBatches.get(i);
|
||||||
final int batchIndex = i + 1;
|
final int batchIndex = i + 1;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> future = executor.submit(() -> {
|
Future<Void> future = executor.submit(() -> {
|
||||||
try {
|
try {
|
||||||
log.info("日期 {} 批次 {} 开始执行,区域数量: {}", date, batchIndex, batchOrgCodes.size());
|
log.info("日期 {} 批次 {} 开始执行,区域数量: {}", date, batchIndex, batchOrgCodes.size());
|
||||||
batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex, eventId);
|
batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex, eventId);
|
||||||
return null;
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("日期 {} 批次 {} 处理失败: {}", date, batchIndex, e.getMessage(), e);
|
log.error("日期 {} 批次 {} 处理失败: {}", date, batchIndex, e.getMessage(), e);
|
||||||
return new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
|
|
||||||
date, "批次" + batchIndex, e.getMessage());
|
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
});
|
});
|
||||||
futures.add(future);
|
futures.add(future);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("日期 {} 批次 {} 提交任务失败: {}", date, batchIndex, e.getMessage(), e);
|
log.error("日期 {} 批次 {} 提交任务失败: {}", date, batchIndex, e.getMessage(), e);
|
||||||
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
|
|
||||||
date, "批次" + batchIndex, e.getMessage()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("日期 {} 已提交任务数: {}", date, futures.size());
|
log.info("日期 {} 已提交任务数: {}", date, futures.size());
|
||||||
|
|
||||||
long deadlineNanos = System.nanoTime() + TimeUnit.HOURS.toNanos(2);
|
long deadlineNanos = System.nanoTime() + TimeUnit.HOURS.toNanos(2);
|
||||||
boolean timedOut = false;
|
boolean timedOut = false;
|
||||||
for (Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> future : futures) {
|
for (Future<Void> future : futures) {
|
||||||
long remainingNanos = deadlineNanos - System.nanoTime();
|
long remainingNanos = deadlineNanos - System.nanoTime();
|
||||||
if (remainingNanos <= 0) {
|
if (remainingNanos <= 0) {
|
||||||
timedOut = true;
|
timedOut = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
DnerDailyPowerOutageEventBatchService.SyncErrorInfo error =
|
future.get(remainingNanos, TimeUnit.NANOSECONDS);
|
||||||
future.get(remainingNanos, TimeUnit.NANOSECONDS);
|
|
||||||
if (error != null) {
|
|
||||||
allErrors.add(error);
|
|
||||||
}
|
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
timedOut = true;
|
timedOut = true;
|
||||||
break;
|
break;
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
|
log.error("日期 {} 同步被中断", date);
|
||||||
date, null, "同步被中断"));
|
|
||||||
timedOut = true;
|
timedOut = true;
|
||||||
break;
|
break;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
|
log.error("日期 {} 等待任务完成时发生异常: {}", date, e.getMessage(), e);
|
||||||
date, null, e.getMessage()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (timedOut) {
|
if (timedOut) {
|
||||||
log.error("日期 {} 同步超时", date);
|
log.error("日期 {} 同步超时,取消未完成任务", date);
|
||||||
for (Future<?> future : futures) {
|
for (Future<?> future : futures) {
|
||||||
if (!future.isDone()) {
|
if (!future.isDone()) {
|
||||||
future.cancel(true);
|
future.cancel(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
|
return false;
|
||||||
date, null, "同步超时"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
batchService.saveDailySyncRecord(date, allErrors, eventId);
|
log.info("日期 {} 所有批次处理完成", date);
|
||||||
|
return true;
|
||||||
return allErrors.isEmpty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -274,23 +262,6 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Long getLatestSyncRecordByDate(String date, Long eventId) {
|
|
||||||
try {
|
|
||||||
return this.lambdaQuery()
|
|
||||||
.eq(DnerDailyPowerOutageEventSync::getDateTime, date)
|
|
||||||
.eq(DnerDailyPowerOutageEventSync::getEventId, eventId)
|
|
||||||
.count();
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("查询日期 {} 同步记录失败: {}", date, e.getMessage(), e);
|
|
||||||
return 0L;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isSyncSuccess(DnerDailyPowerOutageEventSync record) {
|
|
||||||
String msg = record.getErrorMsg();
|
|
||||||
return msg == null || msg.trim().isEmpty() || "[]".equals(msg.trim());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取所有区域编码
|
* 获取所有区域编码
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -4,6 +4,7 @@
|
|||||||
|
|
||||||
<!-- namespace 必须 = Mapper 接口全类名 -->
|
<!-- namespace 必须 = Mapper 接口全类名 -->
|
||||||
<mapper namespace="com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper">
|
<mapper namespace="com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper">
|
||||||
|
|
||||||
<select id="selectListByConditions" resultType="com.southern.power.grid.entity.DnerDailyPowerOutageEvent">
|
<select id="selectListByConditions" resultType="com.southern.power.grid.entity.DnerDailyPowerOutageEvent">
|
||||||
select `id`, `org_code`, `data_time`, `hourly_precipitation`, `daily_precipitation`, `temperature`,
|
select `id`, `org_code`, `data_time`, `hourly_precipitation`, `daily_precipitation`, `temperature`,
|
||||||
`hourly_max_temperature`, `hourly_min_temperature`, `extreme_wind_speed_hourly`, `user_count`,
|
`hourly_max_temperature`, `hourly_min_temperature`, `extreme_wind_speed_hourly`, `user_count`,
|
||||||
|
|||||||
@ -5,4 +5,5 @@
|
|||||||
<!-- namespace 必须 = Mapper 接口全类名 -->
|
<!-- namespace 必须 = Mapper 接口全类名 -->
|
||||||
<mapper namespace="com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper">
|
<mapper namespace="com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper">
|
||||||
|
|
||||||
|
|
||||||
</mapper>
|
</mapper>
|
||||||
@ -236,7 +236,7 @@ create table dner_daily_power_outage_event_sync
|
|||||||
event_id BIGINT(20) NOT NULL COMMENT '关联事件ID',
|
event_id BIGINT(20) NOT NULL COMMENT '关联事件ID',
|
||||||
org_code varchar(64) not null comment '地区编码',
|
org_code varchar(64) not null comment '地区编码',
|
||||||
date_time varchar(32) not null comment '日期(yyyy-mm-dd)',
|
date_time varchar(32) not null comment '日期(yyyy-mm-dd)',
|
||||||
sync_status int default 1 not null comment '同步状态 1 全部同步 2部分同步',
|
sync_status int default 1 not null comment '同步状态 1 同步成功 2同步失败',
|
||||||
error_msg text null comment '同步失败原因',
|
error_msg text null comment '同步失败原因',
|
||||||
create_by varchar(64) null comment '创建人',
|
create_by varchar(64) null comment '创建人',
|
||||||
create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
|
create_time datetime default CURRENT_TIMESTAMP not null comment '创建时间',
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user