提交接口-K值同步代码,调整

This commit is contained in:
yufengshuo 2026-03-19 17:59:22 +08:00
parent 9533fd3aee
commit bc332c4fcf
6 changed files with 93 additions and 76 deletions

View File

@ -33,6 +33,9 @@ public class DnerController {
@Autowired @Autowired
private IDnerSiteAreaConfigurationService dnerSiteAreaConfigurationService; private IDnerSiteAreaConfigurationService dnerSiteAreaConfigurationService;
@Autowired
private IDnerDailyPowerOutageEventSyncService dnerDailyPowerOutageEventSyncService;
@Autowired @Autowired
private IRegionalWeatherDataService regionalWeatherDataService; private IRegionalWeatherDataService regionalWeatherDataService;
@ -103,4 +106,17 @@ public class DnerController {
regionalWeatherDataService.syncOldWeatherData(startDate, endDate); regionalWeatherDataService.syncOldWeatherData(startDate, endDate);
return Result.success("success"); return Result.success("success");
} }
/**
* 同步每日停电事件
*
* @param startDate 开始日期(数据示例2025-09-01)
* @param endDate 结束日期(数据示例2025-09-01)
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置(数据示例2025-09-01)
* @return 结果 <布尔>
*/
@GetMapping("/sync/dailyPowerOutageEvent")
public Result<Boolean> syncDailyPowerOutageEvent(@RequestParam String startDate, @RequestParam String endDate, @RequestParam String setDate) {
return Result.success(dnerDailyPowerOutageEventSyncService.processingData(startDate, endDate, setDate));
}
} }

View File

@ -22,6 +22,7 @@ public interface DnerHourlyPowerOutageEventMapper extends BaseMapper<DnerHourlyP
/** /**
* 批量插入事件记录 * 批量插入事件记录
*
* @param list 待插入的数据列表 * @param list 待插入的数据列表
* @return 插入成功的条数 * @return 插入成功的条数
*/ */
@ -30,4 +31,8 @@ public interface DnerHourlyPowerOutageEventMapper extends BaseMapper<DnerHourlyP
List<DnerHourlyPowerOutageEvent> selectByDistrictsAndTime(@Param("districtCodes") List<String> districtCodes, List<DnerHourlyPowerOutageEvent> selectByDistrictsAndTime(@Param("districtCodes") List<String> districtCodes,
@Param("startTime") LocalDateTime startTime, @Param("startTime") LocalDateTime startTime,
@Param("endTime") LocalDateTime endTime); @Param("endTime") LocalDateTime endTime);
List<DnerHourlyPowerOutageEvent> selectByOrgCodesAndDataTime(@Param("orgCodes") List<String> orgCodes,
@Param("startDate") String startDate,
@Param("endDate") String endDate);
} }

View File

@ -8,4 +8,14 @@ public interface IDnerDailyPowerOutageEventSyncService {
* @return 布尔型 * @return 布尔型
*/ */
Boolean syncDnerDailyPowerOutageEvent(); Boolean syncDnerDailyPowerOutageEvent();
/**
* 处理数据
*
* @param startDate 开始日期
* @param endDate 结束日期
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
* @return 布尔型
*/
Boolean processingData(String startDate, String endDate, String setDate);
} }

View File

@ -14,8 +14,6 @@ import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -51,7 +49,7 @@ public class DnerDailyPowerOutageEventBatchService {
// 1. 删除dner_daily_power_outage_event表中对应日期的数据 // 1. 删除dner_daily_power_outage_event表中对应日期的数据
int deleteDailyCount = dailyPowerOutageEventMapper.delete( int deleteDailyCount = dailyPowerOutageEventMapper.delete(
new LambdaQueryWrapper<DnerDailyPowerOutageEvent>() new LambdaQueryWrapper<DnerDailyPowerOutageEvent>()
.apply("DATE(data_time) = {0}", date) .eq(DnerDailyPowerOutageEvent::getDataTime,date)
); );
log.info("日期 {} 删除日表 {} 条存量数据", date, deleteDailyCount); log.info("日期 {} 删除日表 {} 条存量数据", date, deleteDailyCount);
@ -148,16 +146,15 @@ public class DnerDailyPowerOutageEventBatchService {
* *
* @param date 同步日期 * @param date 同步日期
* @param errors 错误信息列表 * @param errors 错误信息列表
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
*/ */
public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors) { public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors,String setDate) {
try { try {
DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync(); DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync();
record.setDateTime(date); record.setDateTime(date);
// 判断同步状态 // 判断同步状态
// 如果是当前日期状态为2否则为1 int syncStatus = setDate != null && setDate.equals(date) ? 2 : 1;
String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
int syncStatus = today.equals(date) ? 2 : 1;
// 记录错误信息 // 记录错误信息
String errorMsg = null; String errorMsg = null;
@ -205,12 +202,7 @@ public class DnerDailyPowerOutageEventBatchService {
try { try {
String start = date + " 00:00:00"; String start = date + " 00:00:00";
String end = date + " 23:59:59"; String end = date + " 23:59:59";
return hourlyPowerOutageEventMapper.selectList( return hourlyPowerOutageEventMapper.selectByOrgCodesAndDataTime(orgCodes,start,end);
new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
.in(DnerHourlyPowerOutageEvent::getOrgCode, orgCodes)
.ge(DnerHourlyPowerOutageEvent::getDataTime, start)
.le(DnerHourlyPowerOutageEvent::getDataTime, end)
);
} catch (Exception e) { } catch (Exception e) {
log.error("批量查询小时数据失败 - 日期: {}, 区域数: {}, 错误: {}", date, orgCodes.size(), e.getMessage(), e); log.error("批量查询小时数据失败 - 日期: {}, 区域数: {}, 错误: {}", date, orgCodes.size(), e.getMessage(), e);
throw new RuntimeException("批量查询小时数据失败: " + e.getMessage(), e); throw new RuntimeException("批量查询小时数据失败: " + e.getMessage(), e);

View File

@ -14,6 +14,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@ -27,7 +28,7 @@ import java.util.stream.Collectors;
* 日K线停电事件同步服务实现类 * 日K线停电事件同步服务实现类
* 功能从dner_hourly_power_outage_event表查询数据经过处理后同步到dner_daily_power_outage_event表 * 功能从dner_hourly_power_outage_event表查询数据经过处理后同步到dner_daily_power_outage_event表
* 特性多线程处理事务支持通过独立事务类错误回滚完善日志 * 特性多线程处理事务支持通过独立事务类错误回滚完善日志
* * <p>
* {@code @author:} System * {@code @author:} System
* {@code @date:} 2026/3/17 * {@code @date:} 2026/3/17
*/ */
@ -41,9 +42,18 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
private final BusinessThreadPoolFactory businessThreadPoolFactory; private final BusinessThreadPoolFactory businessThreadPoolFactory;
private final DnerDailyPowerOutageEventBatchService batchService; private final DnerDailyPowerOutageEventBatchService batchService;
/** 每批次处理的区域数量 */ /**
* 每批次处理的区域数量
*/
private static final int ORG_BATCH_SIZE = 100; private static final int ORG_BATCH_SIZE = 100;
@Override
public Boolean syncDnerDailyPowerOutageEvent() {
String startDate = getStartDate();
String endDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
return processingData(startDate, endDate, endDate);
}
/** /**
* 主同步方法 * 主同步方法
* 按日期顺序同步数据每个日期同步完成后保存一条同步记录 * 按日期顺序同步数据每个日期同步完成后保存一条同步记录
@ -51,14 +61,11 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
* @return 同步结果 * @return 同步结果
*/ */
@Override @Override
public Boolean syncDnerDailyPowerOutageEvent() { public Boolean processingData(String startDate, String endDate, String setDate) {
log.info("================== 开始日K线停电事件同步任务 =================="); log.info("================== 开始日K线停电事件同步任务 ==================");
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
String startDate = getStartDate();
String endDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
log.info("开始日期: {}, 结束日期: {}", startDate, endDate); log.info("开始日期: {}, 结束日期: {}", startDate, endDate);
List<String> allDates = getAllDates(startDate, endDate); List<String> allDates = getAllDates(startDate, endDate);
@ -80,29 +87,27 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
); );
boolean allSuccess = true; boolean allSuccess = true;
String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
for (String date : allDates) { for (String date : allDates) {
try { try {
log.info("开始处理日期:{}", date); log.info("开始处理日期:{}", date);
// 检查是否已存在成功同步记录 // 先删除startDate的旧数据
DnerDailyPowerOutageEventSync existing = getLatestSyncRecordByDate(date); if (date.equals(startDate)) {
boolean isToday = date.equals(today);
// 非今日且已同步成功则跳过
if (existing != null && isSyncSuccess(existing) && !isToday) {
log.info("日期 {} 已存在成功同步记录,跳过处理", date);
continue;
}
// 今日数据或之前失败的记录先删除旧数据
if (existing != null || isToday) {
log.info("日期 {} 存在旧数据,先删除", date); log.info("日期 {} 存在旧数据,先删除", date);
batchService.deleteDateData(date); batchService.deleteDateData(date);
} }
// 检查是否已存在成功同步记录
Long recordByDate = getLatestSyncRecordByDate(date);
// 非今日且已同步成功则跳过
if (recordByDate != 0) {
log.info("日期 {} 已存在成功同步记录,跳过处理", date);
continue;
}
// 处理该日期的数据 // 处理该日期的数据
boolean success = processDateData(executor, date, orgCodeList); boolean success = processDateData(executor, date, orgCodeList, setDate);
if (success) { if (success) {
log.info("日期 {} 处理完成", date); log.info("日期 {} 处理完成", date);
@ -133,12 +138,13 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
* 处理单个日期的数据 * 处理单个日期的数据
* 如果是开始日期先删除存量数据然后多线程同步所有区域数据最后保存一条同步记录 * 如果是开始日期先删除存量数据然后多线程同步所有区域数据最后保存一条同步记录
* *
* @param executor 线程池 * @param executor 线程池
* @param date 日期 * @param date 日期
* @param orgCodeList 所有区域编码列表 * @param orgCodeList 所有区域编码列表
* @param setDate 设置哪个日期的同步记录状态为2 部分同步 为null则不设置
* @return 处理结果 * @return 处理结果
*/ */
private boolean processDateData(ThreadPoolTaskExecutor executor, String date, List<String> orgCodeList) { private boolean processDateData(ThreadPoolTaskExecutor executor, String date, List<String> orgCodeList, String setDate) {
List<List<String>> orgCodeBatches = partitionList(orgCodeList, ORG_BATCH_SIZE); List<List<String>> orgCodeBatches = partitionList(orgCodeList, ORG_BATCH_SIZE);
log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size()); log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size());
log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor)); log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor));
@ -210,7 +216,7 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
date, null, "同步超时")); date, null, "同步超时"));
} }
batchService.saveDailySyncRecord(date, allErrors); batchService.saveDailySyncRecord(date, allErrors, setDate);
return allErrors.isEmpty(); return allErrors.isEmpty();
} }
@ -223,35 +229,16 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
private String getStartDate() { private String getStartDate() {
try { try {
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd"); DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd");
String today = LocalDate.now().format(f);
LocalDate fallback = LocalDate.now().minusDays(8); LocalDate fallback = LocalDate.now().minusDays(8);
DnerDailyPowerOutageEventSync firstError = this.lambdaQuery() DnerDailyPowerOutageEventSync eventSync = this.lambdaQuery()
.le(DnerDailyPowerOutageEventSync::getDateTime, today) .eq(DnerDailyPowerOutageEventSync::getSyncStatus, 2)
.and(q -> q.isNotNull(DnerDailyPowerOutageEventSync::getErrorMsg)
.ne(DnerDailyPowerOutageEventSync::getErrorMsg, "")
.ne(DnerDailyPowerOutageEventSync::getErrorMsg, "[]"))
.orderByAsc(DnerDailyPowerOutageEventSync::getDateTime) .orderByAsc(DnerDailyPowerOutageEventSync::getDateTime)
.last("limit 1") .last("limit 1")
.one(); .one();
if (firstError != null) { if (!ObjectUtils.isEmpty(eventSync)) {
LocalDate candidate = LocalDate.parse(firstError.getDateTime(), f); LocalDate candidate = LocalDate.parse(eventSync.getDateTime(), f);
return maxDate(fallback, candidate).format(f); return candidate.format(f);
}
DnerDailyPowerOutageEventSync lastSuccessBeforeToday = this.lambdaQuery()
.lt(DnerDailyPowerOutageEventSync::getDateTime, today)
.and(q -> q.isNull(DnerDailyPowerOutageEventSync::getErrorMsg)
.or()
.eq(DnerDailyPowerOutageEventSync::getErrorMsg, "")
.or()
.eq(DnerDailyPowerOutageEventSync::getErrorMsg, "[]"))
.orderByDesc(DnerDailyPowerOutageEventSync::getDateTime)
.last("limit 1")
.one();
if (lastSuccessBeforeToday != null) {
LocalDate candidate = LocalDate.parse(lastSuccessBeforeToday.getDateTime(), f).plusDays(1);
return maxDate(fallback, candidate).format(f);
} }
return fallback.format(f); return fallback.format(f);
@ -261,20 +248,15 @@ public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDa
} }
} }
private LocalDate maxDate(LocalDate a, LocalDate b) {
return a.isAfter(b) ? a : b;
}
private DnerDailyPowerOutageEventSync getLatestSyncRecordByDate(String date) { private Long getLatestSyncRecordByDate(String date) {
try { try {
return this.lambdaQuery() return this.lambdaQuery()
.eq(DnerDailyPowerOutageEventSync::getDateTime, date) .eq(DnerDailyPowerOutageEventSync::getDateTime, date)
.orderByDesc(DnerDailyPowerOutageEventSync::getId) .count();
.last("limit 1")
.one();
} catch (Exception e) { } catch (Exception e) {
log.warn("查询日期 {} 同步记录失败: {}", date, e.getMessage(), e); log.warn("查询日期 {} 同步记录失败: {}", date, e.getMessage(), e);
return null; return 0L;
} }
} }

View File

@ -4,7 +4,7 @@
<!-- namespace 必须 = Mapper 接口全类名 --> <!-- namespace 必须 = Mapper 接口全类名 -->
<mapper namespace="com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper"> <mapper namespace="com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper">
<select id="selectListByConditions"> <select id="selectListByConditions" resultType="com.southern.power.grid.entity.DnerHourlyPowerOutageEvent">
select `id`, `org_code`, `data_time`, `hourly_precipitation`, `daily_precipitation`, `temperature`, select `id`, `org_code`, `data_time`, `hourly_precipitation`, `daily_precipitation`, `temperature`,
`hourly_max_temperature`, `hourly_min_temperature`, `extreme_wind_speed_hourly`, `user_count`, `hourly_max_temperature`, `hourly_min_temperature`, `extreme_wind_speed_hourly`, `user_count`,
`outage_state`, `outage_type`, `create_by`, `create_time`, `update_by`, `update_time` `outage_state`, `outage_type`, `create_by`, `create_time`, `update_by`, `update_time`
@ -50,4 +50,16 @@
#{code} #{code}
</foreach> </foreach>
</select> </select>
<select id="selectByOrgCodesAndDataTime" resultType="com.southern.power.grid.entity.DnerHourlyPowerOutageEvent">
select id, org_code, data_time, hourly_precipitation, daily_precipitation, temperature, hourly_max_temperature,
hourly_min_temperature, extreme_wind_speed_hourly, user_count, outage_state, outage_type from
dner_hourly_power_outage_event
WHERE org_code IN
<foreach collection="orgCodes" item="code" open="(" separator="," close=")">
#{code}
</foreach>
and data_time <![CDATA[ >= ]]> #{startDate}
and data_time <![CDATA[ <= ]]> #{endDate}
</select>
</mapper> </mapper>