diff --git a/src/main/java/com/southern/power/grid/config/BusinessThreadPoolFactory.java b/src/main/java/com/southern/power/grid/config/BusinessThreadPoolFactory.java new file mode 100644 index 0000000..b2f8474 --- /dev/null +++ b/src/main/java/com/southern/power/grid/config/BusinessThreadPoolFactory.java @@ -0,0 +1,140 @@ +package com.southern.power.grid.config; + +import com.southern.power.grid.utils.ThreadPoolUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 业务线程池工厂 + * 提供标准化的业务线程池创建方法 + */ +@Component +@Slf4j +public class BusinessThreadPoolFactory { + + @Autowired + private ThreadPoolUtil threadPoolUtil; + + // 缓存已创建的线程池 + private final ConcurrentMap threadPoolCache = new ConcurrentHashMap<>(); + + /** + * 获取或创建业务线程池(线程安全) + * @param businessCode 业务代码,用于标识业务类型 + * @return 线程池 + */ + public ThreadPoolTaskExecutor getOrCreateThreadPool(String businessCode) { + ThreadPoolTaskExecutor existing = threadPoolCache.get(businessCode); + if (existing != null) { + ThreadPoolExecutor tpe = existing.getThreadPoolExecutor(); + if (tpe.isShutdown() || tpe.isTerminated()) { + threadPoolCache.remove(businessCode); + existing = null; + } + } + if (existing != null) { + return existing; + } + String threadNamePrefix = generateThreadNamePrefix(businessCode); + log.info("创建业务线程池 - 业务代码: {}, 线程前缀: {}", businessCode, threadNamePrefix); + ThreadPoolTaskExecutor created = threadPoolUtil.createThreadPool(threadNamePrefix); + threadPoolCache.put(businessCode, created); + return created; + } + + /** + * 获取或创建业务线程池(可自定义参数) + */ + public ThreadPoolTaskExecutor getOrCreateThreadPool(String businessCode, + int corePoolSize, + int maxPoolSize, + int queueCapacity) { + String cacheKey = String.format("%s_%d_%d_%d", businessCode, corePoolSize, maxPoolSize, queueCapacity); + + return threadPoolCache.computeIfAbsent(cacheKey, key -> { + String threadNamePrefix = generateThreadNamePrefix(businessCode); + log.info("创建自定义参数业务线程池 - 业务代码: {}, 线程前缀: {}, 核心线程: {}, 最大线程: {}, 队列容量: {}", + businessCode, threadNamePrefix, corePoolSize, maxPoolSize, queueCapacity); + return threadPoolUtil.createThreadPool(threadNamePrefix, corePoolSize, maxPoolSize, queueCapacity); + }); + } + + /** + * 根据业务代码生成线程名称前缀 + */ + private String generateThreadNamePrefix(String businessCode) { + // 业务代码到线程前缀的映射 + switch (businessCode) { + case "DAILY_POWER": + return "Dner-Daily-Power-Async"; + default: + return "Biz-" + businessCode + "-Async"; + } + } + + /** + * 获取所有线程池的监控信息 + */ + public String getAllThreadPoolMonitorInfo() { + StringBuilder sb = new StringBuilder(); + sb.append("=== 线程池监控信息 ===\n"); + + threadPoolCache.forEach((key, executor) -> { + sb.append("业务代码: ").append(key).append("\n"); + sb.append(" 状态: ").append(ThreadPoolUtil.getThreadPoolMonitorInfo(executor)).append("\n"); + }); + + return sb.toString(); + } + + /** + * 优雅关闭所有线程池 + */ + public void shutdownAllThreadPools() { + log.info("开始关闭所有业务线程池,共 {} 个", threadPoolCache.size()); + + // 复制一份键值对,避免 ConcurrentModificationException + new java.util.HashSet<>(threadPoolCache.keySet()).forEach(key -> { + ThreadPoolTaskExecutor executor = threadPoolCache.remove(key); + if (executor != null) { + try { + String threadNamePrefix = executor.getThreadNamePrefix(); + ThreadPoolUtil.shutdownThreadPool(executor, threadNamePrefix); + } catch (Exception e) { + log.error("关闭线程池失败 - 业务代码:{}", key, e); + } + } + }); + + threadPoolCache.clear(); + log.info("所有业务线程池已关闭"); + } + + /** + * 关闭指定业务的线程池 + */ + public void shutdownThreadPool(String businessCode) { + ThreadPoolTaskExecutor executor = threadPoolCache.remove(businessCode); + if (executor != null) { + try { + String threadNamePrefix = executor.getThreadNamePrefix(); + ThreadPoolUtil.shutdownThreadPool(executor, threadNamePrefix); + } catch (Exception e) { + log.error("关闭线程池失败 - 业务代码:{}", businessCode, e); + } + } + } + + /** + * 业务代码常量 + */ + public static class BusinessCodes { + public static final String DAILY_POWER = "DAILY_POWER"; + } +} diff --git a/src/main/java/com/southern/power/grid/config/ThreadPoolProperties.java b/src/main/java/com/southern/power/grid/config/ThreadPoolProperties.java new file mode 100644 index 0000000..fc35baf --- /dev/null +++ b/src/main/java/com/southern/power/grid/config/ThreadPoolProperties.java @@ -0,0 +1,16 @@ +package com.southern.power.grid.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "async.thread-pool") +@Data +public class ThreadPoolProperties { + private int corePoolSize = 10; + private int maxPoolSize = 20; + private int queueCapacity = 1000; + private int keepAliveSeconds = 60; + private int awaitTerminationSeconds = 60; +} diff --git a/src/main/java/com/southern/power/grid/dao/DnerDailyPowerOutageEventSyncMapper.java b/src/main/java/com/southern/power/grid/dao/DnerDailyPowerOutageEventSyncMapper.java new file mode 100644 index 0000000..9bf9477 --- /dev/null +++ b/src/main/java/com/southern/power/grid/dao/DnerDailyPowerOutageEventSyncMapper.java @@ -0,0 +1,17 @@ +package com.southern.power.grid.dao; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.southern.power.grid.entity.DnerDailyPowerOutageEventSync; +import org.apache.ibatis.annotations.Mapper; + + +/** + * 日K线停电事件同步 + * + * @author fsyud + * @date 2026/03/16 + */ +@Mapper +public interface DnerDailyPowerOutageEventSyncMapper extends BaseMapper { + +} diff --git a/src/main/java/com/southern/power/grid/entity/DnerDailyPowerOutageEventSync.java b/src/main/java/com/southern/power/grid/entity/DnerDailyPowerOutageEventSync.java new file mode 100644 index 0000000..82778d7 --- /dev/null +++ b/src/main/java/com/southern/power/grid/entity/DnerDailyPowerOutageEventSync.java @@ -0,0 +1,54 @@ +package com.southern.power.grid.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.time.LocalDateTime; + +@Data +@TableName("dner_daily_power_outage_event_sync") +public class DnerDailyPowerOutageEventSync { + + /** + * 主键ID + */ + @TableId(type = IdType.AUTO) + private Long id; + + /** + * 日期(yyyy-mm-dd) + */ + private String dateTime; + + /** + * 同步状态 1 非当日调用 2 当日调用 + */ + private Integer syncStatus; + + /** + * 同步失败信息 + */ + private String errorMsg; + + /** + * 创建人 + */ + private String createBy; + + /** + * 创建时间 + */ + private LocalDateTime createTime; + + /** + * 修改人 + */ + private String updateBy; + + /** + * 修改时间 + */ + private LocalDateTime updateTime; +} diff --git a/src/main/java/com/southern/power/grid/service/IDnerDailyPowerOutageEventSyncService.java b/src/main/java/com/southern/power/grid/service/IDnerDailyPowerOutageEventSyncService.java new file mode 100644 index 0000000..e22764d --- /dev/null +++ b/src/main/java/com/southern/power/grid/service/IDnerDailyPowerOutageEventSyncService.java @@ -0,0 +1,11 @@ +package com.southern.power.grid.service; + +public interface IDnerDailyPowerOutageEventSyncService { + + /** + * 日K线停电事件同步 + * + * @return 布尔型 + */ + Boolean syncDnerDailyPowerOutageEvent(); +} diff --git a/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventBatchService.java b/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventBatchService.java new file mode 100644 index 0000000..cb94fad --- /dev/null +++ b/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventBatchService.java @@ -0,0 +1,301 @@ +package com.southern.power.grid.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper; +import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper; +import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper; +import com.southern.power.grid.entity.DnerDailyPowerOutageEvent; +import com.southern.power.grid.entity.DnerDailyPowerOutageEventSync; +import com.southern.power.grid.entity.DnerHourlyPowerOutageEvent; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.stream.Collectors; + +/** + * 日K线停电事件批量处理服务 + * 专门处理事务性的批量同步操作,解决@Transactional自调用问题 + * + * @author: System + * @date: 2026/3/17 + */ +@Service +@Slf4j +@RequiredArgsConstructor +public class DnerDailyPowerOutageEventBatchService { + + private final DnerDailyPowerOutageEventMapper dailyPowerOutageEventMapper; + private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper; + private final DnerDailyPowerOutageEventSyncMapper syncMapper; + private final ObjectMapper mapper = new ObjectMapper(); + + private static final int BATCH_SIZE = 600; + + /** + * 删除指定日期的存量数据 + * 删除dner_daily_power_outage_event(对应data_time字段)和dner_daily_power_outage_event_sync(对应date_time字段)中的对应数据 + * + * @param date 日期 + */ + public void deleteDateData(String date) { + try { + log.info("开始删除日期 {} 的存量数据", date); + + // 1. 删除dner_daily_power_outage_event表中对应日期的数据 + int deleteDailyCount = dailyPowerOutageEventMapper.delete( + new LambdaQueryWrapper() + .apply("DATE(data_time) = {0}", date) + ); + log.info("日期 {} 删除日表 {} 条存量数据", date, deleteDailyCount); + + // 2. 删除dner_daily_power_outage_event_sync表中对应日期的同步记录 + int deleteSyncCount = syncMapper.delete( + new LambdaQueryWrapper() + .eq(DnerDailyPowerOutageEventSync::getDateTime, date) + ); + log.info("日期 {} 删除同步记录表 {} 条存量数据", date, deleteSyncCount); + + } catch (Exception e) { + log.error("日期 {} 删除存量数据失败: {}", date, e.getMessage(), e); + throw new RuntimeException("删除日期 " + date + " 存量数据失败: " + e.getMessage(), e); + } + } + + /** + * 同步指定日期的一批区域数据(带事务) + * + * @param date 日期 + * @param orgCodeList 区域编码列表(批次) + * @param batchIndex 批次索引 + */ + public void syncDateDataForBatch(String date, List orgCodeList, int batchIndex) { + log.info("开始同步日期 {} 批次 {},区域数量: {}", date, batchIndex, orgCodeList.size()); + + List allData = new ArrayList<>(); + + List allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date); + if (CollectionUtils.isEmpty(allHourlyEvents)) { + log.info("日期 {} 批次 {} 无小时数据需要处理", date, batchIndex); + return; + } + Map> orgCodeToHourlyEvents = allHourlyEvents.stream().collect(Collectors.groupingBy(DnerHourlyPowerOutageEvent::getOrgCode)); + + for (String orgCode : orgCodeList) { + try { + List eventList = orgCodeToHourlyEvents.get(orgCode); + + if (CollectionUtils.isEmpty(eventList)) { + log.warn("日期 {} 区域 {} 无小时数据", date, orgCode); + continue; + } + + // 2. 转换为日数据 + DnerDailyPowerOutageEvent dailyPowerOutageEvent = convertToDailyEvent(eventList, date, orgCode); + if (ObjectUtils.isEmpty(dailyPowerOutageEvent)) { + log.warn("转换日K线数据失败:日期:{}, 区域编号:{}", date, orgCode); + continue; + } + + // 3. 保存两条停电状态不同的数据(停电中=2,已复电=3) + DnerDailyPowerOutageEvent copyEvent = deepCopyByJackson(dailyPowerOutageEvent, DnerDailyPowerOutageEvent.class); + dailyPowerOutageEvent.setOutageState(2); + copyEvent.setOutageState(3); + + allData.add(dailyPowerOutageEvent); + allData.add(copyEvent); + + } catch (Exception e) { + log.error("处理区域 {} 日期 {} 时发生异常: {}", orgCode, date, e.getMessage(), e); + } + } + + // 批量保存数据 + if (!CollectionUtils.isEmpty(allData)) { + try { + // 分段插入,每段最多 600 条 + int totalInserted = 0; + for (int i = 0; i < allData.size(); i += BATCH_SIZE) { + int end = Math.min(i + BATCH_SIZE, allData.size()); + List subList = allData.subList(i, end); + + for (DnerDailyPowerOutageEvent event : subList) { + dailyPowerOutageEventMapper.insert(event); + } + totalInserted += subList.size(); + log.debug("日期 {} 批次 {} 已插入 {}/{} 条记录", date, batchIndex, totalInserted, allData.size()); + } + + log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted); + } catch (Exception e) { + log.error("日期 {} 批次 {} 保存数据时发生异常:{}", date, batchIndex, e.getMessage(), e); + throw new RuntimeException("保存日 K 线数据失败:" + e.getMessage(), e); + } + } else { + log.info("日期 {} 批次 {} 无数据需要保存", date, batchIndex); + } + } + + /** + * 保存每日同步记录 + * 一个日期的所有数据同步完成后,保存一条同步记录 + * + * @param date 同步日期 + * @param errors 错误信息列表 + */ + public void saveDailySyncRecord(String date, List errors) { + try { + DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync(); + record.setDateTime(date); + + // 判断同步状态 + // 如果是当前日期,状态为2;否则为1 + String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + int syncStatus = today.equals(date) ? 2 : 1; + + // 记录错误信息 + String errorMsg = null; + if (!CollectionUtils.isEmpty(errors)) { + errorMsg = mapper.writeValueAsString(errors); + } + + record.setSyncStatus(syncStatus); + record.setErrorMsg(errorMsg); + + syncMapper.insert(record); + + log.info("日期 {} 同步记录已保存,状态: {}, 错误数: {}", + date, syncStatus, CollectionUtils.isEmpty(errors) ? 0 : errors.size()); + + } catch (Exception e) { + log.error("日期 {} 保存同步记录失败: {}", date, e.getMessage(), e); + // 不抛出异常,避免影响主流程 + } + } + + /** + * 根据日期查询小时数据 + */ + private List queryHourlyDataByDate(String orgCode, String date) { + try { + String start = date + " 00:00:00"; + String end = date + " 23:59:59"; + return hourlyPowerOutageEventMapper.selectList( + new LambdaQueryWrapper() + .eq(DnerHourlyPowerOutageEvent::getOrgCode, orgCode) + .ge(DnerHourlyPowerOutageEvent::getDataTime, start) + .le(DnerHourlyPowerOutageEvent::getDataTime, end) + ); + } catch (Exception e) { + log.error("查询小时数据失败 - 区域: {}, 日期: {}, 错误: {}", orgCode, date, e.getMessage(), e); + throw new RuntimeException("查询小时数据失败: " + e.getMessage(), e); + } + } + + private List queryHourlyDataByDateAndOrgCodes(List orgCodes, String date) { + if (CollectionUtils.isEmpty(orgCodes)) { + return new ArrayList<>(); + } + try { + String start = date + " 00:00:00"; + String end = date + " 23:59:59"; + return hourlyPowerOutageEventMapper.selectList( + new LambdaQueryWrapper() + .in(DnerHourlyPowerOutageEvent::getOrgCode, orgCodes) + .ge(DnerHourlyPowerOutageEvent::getDataTime, start) + .le(DnerHourlyPowerOutageEvent::getDataTime, end) + ); + } catch (Exception e) { + log.error("批量查询小时数据失败 - 日期: {}, 区域数: {}, 错误: {}", date, orgCodes.size(), e.getMessage(), e); + throw new RuntimeException("批量查询小时数据失败: " + e.getMessage(), e); + } + } + + /** + * 转换为每日事件 + */ + private DnerDailyPowerOutageEvent convertToDailyEvent(List list, String date, String orgCode) { + if (list == null || list.isEmpty()) { + return null; + } + + try { + DnerDailyPowerOutageEvent daily = new DnerDailyPowerOutageEvent(); + daily.setOrgCode(orgCode); + daily.setDataTime(date); + + // 用户数统计 + IntSummaryStatistics userStats = list.stream() + .mapToInt(e -> e.getUserCount() == null ? 0 : e.getUserCount()) + .summaryStatistics(); + daily.setUserCount((int) userStats.getSum()); + daily.setMaxUserCount(userStats.getMax()); + daily.setMinUserCount(userStats.getMin()); + + // 起始和结束用户数 + daily.setStarUserCount(list.stream().min(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime)) + .map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0)); + daily.setEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime)) + .map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0)); + + // 气象数据统计 + daily.setDailyPrecipitation(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getDailyPrecipitation())).sum())); + daily.setTemperature(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getTemperature())).average().orElse(0.0))); + daily.setHourlyMaxTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMaxTemperature())).max().orElse(0.0))); + daily.setHourlyMinTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMinTemperature())).min().orElse(0.0))); + daily.setExtremeWindSpeedHourly(fmt(list.stream().mapToDouble(e -> toDouble(e.getExtremeWindSpeedHourly())).max().orElse(0.0))); + daily.setPowerOutageDuration(fmt(list.stream().mapToDouble(e -> toDouble(e.getPowerOutageDuration())).max().orElse(0.0))); + + return daily; + } catch (Exception e) { + log.error("转换日K线数据失败 - 日期: {}, 区域: {}, 错误: {}", date, orgCode, e.getMessage(), e); + return null; + } + } + + private double toDouble(String val) { + try { + return (val == null || val.trim().isEmpty()) ? 0.0 : Double.parseDouble(val); + } catch (NumberFormatException e) { + return 0.0; + } + } + + private String fmt(double val) { + return String.valueOf(val).replaceAll("\\.0$", ""); + } + + /** + * 深拷贝 + */ + public T deepCopyByJackson(T source, Class clazz) { + if (source == null) { + return null; + } + try { + String json = mapper.writeValueAsString(source); + return mapper.readValue(json, clazz); + } catch (Exception e) { + log.error("Jackson深拷贝失败: {}", e.getMessage(), e); + throw new RuntimeException("Jackson深拷贝失败", e); + } + } + + /** + * 同步错误信息 + */ + @lombok.Data + @lombok.AllArgsConstructor + @lombok.NoArgsConstructor + public static class SyncErrorInfo { + private String dateTime; + private String orgCode; + private String message; + } +} diff --git a/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventSyncServiceImpl.java b/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventSyncServiceImpl.java new file mode 100644 index 0000000..1d7f672 --- /dev/null +++ b/src/main/java/com/southern/power/grid/service/impl/DnerDailyPowerOutageEventSyncServiceImpl.java @@ -0,0 +1,344 @@ +package com.southern.power.grid.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.southern.power.grid.config.BusinessThreadPoolFactory; +import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper; +import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper; +import com.southern.power.grid.entity.DnerDailyPowerOutageEventSync; +import com.southern.power.grid.entity.DnerHourlyPowerOutageEvent; +import com.southern.power.grid.service.IDnerDailyPowerOutageEventSyncService; +import com.southern.power.grid.utils.ThreadPoolUtil; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * 日K线停电事件同步服务实现类 + * 功能:从dner_hourly_power_outage_event表查询数据,经过处理后同步到dner_daily_power_outage_event表 + * 特性:多线程处理、事务支持(通过独立事务类)、错误回滚、完善日志 + * + * {@code @author:} System + * {@code @date:} 2026/3/17 + */ +@Service +@Slf4j +@RequiredArgsConstructor +public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl + implements IDnerDailyPowerOutageEventSyncService { + + private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper; + private final BusinessThreadPoolFactory businessThreadPoolFactory; + private final DnerDailyPowerOutageEventBatchService batchService; + + /** 每批次处理的区域数量 */ + private static final int ORG_BATCH_SIZE = 100; + + /** + * 主同步方法 + * 按日期顺序同步数据,每个日期同步完成后保存一条同步记录 + * + * @return 同步结果 + */ + @Override + public Boolean syncDnerDailyPowerOutageEvent() { + log.info("================== 开始日K线停电事件同步任务 =================="); + long startTime = System.currentTimeMillis(); + + try { + String startDate = getStartDate(); + String endDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + + log.info("开始日期: {}, 结束日期: {}", startDate, endDate); + + List allDates = getAllDates(startDate, endDate); + if (CollectionUtils.isEmpty(allDates)) { + log.warn("日期范围为空,同步任务结束"); + return true; + } + log.info("共需要处理 {} 个日期", allDates.size()); + + List orgCodeList = getOrgCodeList(startDate, endDate); + if (CollectionUtils.isEmpty(orgCodeList)) { + log.warn("在指定日期范围内未找到任何区域编码,同步任务结束"); + return true; + } + log.info("共获取到 {} 个区域编码需要处理", orgCodeList.size()); + + ThreadPoolTaskExecutor executor = businessThreadPoolFactory.getOrCreateThreadPool( + BusinessThreadPoolFactory.BusinessCodes.DAILY_POWER + ); + + boolean allSuccess = true; + String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + for (String date : allDates) { + try { + log.info("开始处理日期:{}", date); + + // 检查是否已存在成功同步记录 + DnerDailyPowerOutageEventSync existing = getLatestSyncRecordByDate(date); + boolean isToday = date.equals(today); + + // 非今日且已同步成功,则跳过 + if (existing != null && isSyncSuccess(existing) && !isToday) { + log.info("日期 {} 已存在成功同步记录,跳过处理", date); + continue; + } + + // 今日数据或之前失败的记录,先删除旧数据 + if (existing != null || isToday) { + log.info("日期 {} 存在旧数据,先删除", date); + batchService.deleteDateData(date); + } + + // 处理该日期的数据 + boolean success = processDateData(executor, date, orgCodeList); + + if (success) { + log.info("日期 {} 处理完成", date); + } else { + log.error("日期 {} 处理失败", date); + allSuccess = false; + } + + } catch (Exception e) { + log.error("处理日期 {} 时发生异常:{}", date, e.getMessage(), e); + allSuccess = false; + } + } + + long endTime = System.currentTimeMillis(); + log.info("================== 日K线停电事件同步任务完成 =================="); + log.info("总耗时: {} ms", endTime - startTime); + + return allSuccess; + + } catch (Exception e) { + log.error("同步日K线停电事件数据发生严重异常: {}", e.getMessage(), e); + return false; + } + } + + /** + * 处理单个日期的数据 + * 如果是开始日期,先删除存量数据,然后多线程同步所有区域数据,最后保存一条同步记录 + * + * @param executor 线程池 + * @param date 日期 + * @param orgCodeList 所有区域编码列表 + * @return 处理结果 + */ + private boolean processDateData(ThreadPoolTaskExecutor executor, String date, List orgCodeList) { + List> orgCodeBatches = partitionList(orgCodeList, ORG_BATCH_SIZE); + log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size()); + log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor)); + + List allErrors = new ArrayList<>(); + List> futures = new ArrayList<>(orgCodeBatches.size()); + + for (int i = 0; i < orgCodeBatches.size(); i++) { + final List batchOrgCodes = orgCodeBatches.get(i); + final int batchIndex = i + 1; + + try { + Future future = executor.submit(() -> { + try { + log.info("日期 {} 批次 {} 开始执行,区域数量: {}", date, batchIndex, batchOrgCodes.size()); + batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex); + return null; + } catch (Exception e) { + log.error("日期 {} 批次 {} 处理失败: {}", date, batchIndex, e.getMessage(), e); + return new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( + date, "批次" + batchIndex, e.getMessage()); + } + }); + futures.add(future); + } catch (Exception e) { + log.error("日期 {} 批次 {} 提交任务失败: {}", date, batchIndex, e.getMessage(), e); + allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( + date, "批次" + batchIndex, e.getMessage())); + } + } + log.info("日期 {} 已提交任务数: {}", date, futures.size()); + + long deadlineNanos = System.nanoTime() + TimeUnit.HOURS.toNanos(2); + boolean timedOut = false; + for (Future future : futures) { + long remainingNanos = deadlineNanos - System.nanoTime(); + if (remainingNanos <= 0) { + timedOut = true; + break; + } + try { + DnerDailyPowerOutageEventBatchService.SyncErrorInfo error = + future.get(remainingNanos, TimeUnit.NANOSECONDS); + if (error != null) { + allErrors.add(error); + } + } catch (TimeoutException e) { + timedOut = true; + break; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( + date, null, "同步被中断")); + timedOut = true; + break; + } catch (Exception e) { + allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( + date, null, e.getMessage())); + } + } + if (timedOut) { + log.error("日期 {} 同步超时", date); + for (Future future : futures) { + if (!future.isDone()) { + future.cancel(true); + } + } + allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo( + date, null, "同步超时")); + } + + batchService.saveDailySyncRecord(date, allErrors); + + return allErrors.isEmpty(); + } + + /** + * 获取同步的开始日期 + * 默认开始日期为当前日期往前8天 + * 如果存在部分同步的日期,则从该日期开始 + */ + private String getStartDate() { + try { + DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + String today = LocalDate.now().format(f); + LocalDate fallback = LocalDate.now().minusDays(8); + + DnerDailyPowerOutageEventSync firstError = this.lambdaQuery() + .le(DnerDailyPowerOutageEventSync::getDateTime, today) + .and(q -> q.isNotNull(DnerDailyPowerOutageEventSync::getErrorMsg) + .ne(DnerDailyPowerOutageEventSync::getErrorMsg, "") + .ne(DnerDailyPowerOutageEventSync::getErrorMsg, "[]")) + .orderByAsc(DnerDailyPowerOutageEventSync::getDateTime) + .last("limit 1") + .one(); + if (firstError != null) { + LocalDate candidate = LocalDate.parse(firstError.getDateTime(), f); + return maxDate(fallback, candidate).format(f); + } + + DnerDailyPowerOutageEventSync lastSuccessBeforeToday = this.lambdaQuery() + .lt(DnerDailyPowerOutageEventSync::getDateTime, today) + .and(q -> q.isNull(DnerDailyPowerOutageEventSync::getErrorMsg) + .or() + .eq(DnerDailyPowerOutageEventSync::getErrorMsg, "") + .or() + .eq(DnerDailyPowerOutageEventSync::getErrorMsg, "[]")) + .orderByDesc(DnerDailyPowerOutageEventSync::getDateTime) + .last("limit 1") + .one(); + if (lastSuccessBeforeToday != null) { + LocalDate candidate = LocalDate.parse(lastSuccessBeforeToday.getDateTime(), f).plusDays(1); + return maxDate(fallback, candidate).format(f); + } + + return fallback.format(f); + } catch (Exception e) { + log.error("获取开始日期失败: {}", e.getMessage(), e); + return LocalDate.now().minusDays(8).format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); + } + } + + private LocalDate maxDate(LocalDate a, LocalDate b) { + return a.isAfter(b) ? a : b; + } + + private DnerDailyPowerOutageEventSync getLatestSyncRecordByDate(String date) { + try { + return this.lambdaQuery() + .eq(DnerDailyPowerOutageEventSync::getDateTime, date) + .orderByDesc(DnerDailyPowerOutageEventSync::getId) + .last("limit 1") + .one(); + } catch (Exception e) { + log.warn("查询日期 {} 同步记录失败: {}", date, e.getMessage(), e); + return null; + } + } + + private boolean isSyncSuccess(DnerDailyPowerOutageEventSync record) { + String msg = record.getErrorMsg(); + return msg == null || msg.trim().isEmpty() || "[]".equals(msg.trim()); + } + + /** + * 获取所有区域编码 + */ + private List getOrgCodeList(String startDate, String endDate) { + try { + List eventList = hourlyPowerOutageEventMapper.selectList( + new LambdaQueryWrapper() + .ge(DnerHourlyPowerOutageEvent::getDataTime, startDate + " 00:00:00") + .le(DnerHourlyPowerOutageEvent::getDataTime, endDate + " 23:59:59") + .select(DnerHourlyPowerOutageEvent::getOrgCode) + .last("GROUP BY org_code") + ); + + if (CollectionUtils.isEmpty(eventList)) { + return new ArrayList<>(); + } + + return eventList.stream() + .map(DnerHourlyPowerOutageEvent::getOrgCode) + .distinct() + .collect(Collectors.toList()); + + } catch (Exception e) { + log.error("获取区域编码列表失败: {}", e.getMessage(), e); + throw new RuntimeException("获取区域编码列表失败: " + e.getMessage(), e); + } + } + + /** + * 获取所有需要处理的日期列表 + */ + private List getAllDates(String startDate, String endDate) { + DateTimeFormatter f = DateTimeFormatter.ISO_LOCAL_DATE; + LocalDate start = LocalDate.parse(startDate, f); + LocalDate end = LocalDate.parse(endDate, f); + List list = new ArrayList<>(); + + for (; !start.isAfter(end); start = start.plusDays(1)) { + list.add(start.format(f)); + } + return list; + } + + /** + * 将列表分批 + */ + private List> partitionList(List list, int size) { + List> result = new ArrayList<>(); + if (CollectionUtils.isEmpty(list) || size <= 0) { + return result; + } + + for (int i = 0; i < list.size(); i += size) { + int end = Math.min(i + size, list.size()); + result.add(list.subList(i, end)); + } + return result; + } +} diff --git a/src/main/java/com/southern/power/grid/utils/ThreadPoolUtil.java b/src/main/java/com/southern/power/grid/utils/ThreadPoolUtil.java new file mode 100644 index 0000000..b94736b --- /dev/null +++ b/src/main/java/com/southern/power/grid/utils/ThreadPoolUtil.java @@ -0,0 +1,280 @@ +package com.southern.power.grid.utils; + +import com.southern.power.grid.config.ThreadPoolProperties; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +@Component +@Slf4j +public class ThreadPoolUtil { + + @Autowired + private ThreadPoolProperties properties; + + /** + * 创建线程池(业务端传入线程名称前缀) + * @param threadNamePrefix 线程名称前缀,由业务端根据具体场景传入 + * @return 自定义线程池 + */ + public ThreadPoolTaskExecutor createThreadPool(String threadNamePrefix) { + return createThreadPool(threadNamePrefix, null, null, null); + } + + /** + * 创建完全自定义的线程池 + * @param threadNamePrefix 线程名称前缀,由业务端根据具体场景传入 + * @param corePoolSize 核心线程数(为null时使用配置文件默认值) + * @param maxPoolSize 最大线程数(为null时使用配置文件默认值) + * @param queueCapacity 队列容量(为null时使用配置文件默认值) + * @return 自定义线程池 + */ + public ThreadPoolTaskExecutor createThreadPool(String threadNamePrefix, + Integer corePoolSize, + Integer maxPoolSize, + Integer queueCapacity) { + + if (threadNamePrefix == null || threadNamePrefix.trim().isEmpty()) { + throw new IllegalArgumentException("线程名称前缀不能为空"); + } + + // 确保线程名称前缀以"-"结尾,符合规范 + final String finalThreadNamePrefix = threadNamePrefix.endsWith("-") ? + threadNamePrefix : threadNamePrefix + "-"; + + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + + // 设置线程池参数 + executor.setCorePoolSize(corePoolSize != null ? corePoolSize : properties.getCorePoolSize()); + executor.setMaxPoolSize(maxPoolSize != null ? maxPoolSize : properties.getMaxPoolSize()); + executor.setQueueCapacity(queueCapacity != null ? queueCapacity : properties.getQueueCapacity()); + executor.setKeepAliveSeconds(properties.getKeepAliveSeconds()); + + // 设置线程名称前缀(由业务端传入) + executor.setThreadNamePrefix(finalThreadNamePrefix); + + // 设置拒绝策略:CallerRunsPolicy 保证任务不丢失 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + + // 设置异常处理处理器 + executor.setRejectedExecutionHandler((r, exec) -> { + log.error("线程池任务被拒绝 - 线程前缀:{}, 队列已满,执行 CallerRuns 策略", finalThreadNamePrefix); + new ThreadPoolExecutor.CallerRunsPolicy().rejectedExecution(r, exec); + }); + + // 优雅停机与线程回收 + executor.setAllowCoreThreadTimeOut(true); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds()); + + // 任务装饰器:支持上下文传递和异常捕获 + executor.setTaskDecorator(runnable -> { + // 这里可以传递主线程的上下文(如TraceId、用户信息等) + return () -> { + String threadName = Thread.currentThread().getName(); + log.debug("线程池任务开始执行 - 线程前缀: {}, 线程: {}", finalThreadNamePrefix, threadName); + try { + runnable.run(); + } catch (Exception e) { + log.error("线程池任务执行异常 - 线程前缀: {}, 线程: {}", finalThreadNamePrefix, threadName, e); + // 根据业务需求决定是否重新抛出异常 + throw e; + } finally { + log.debug("线程池任务执行完成 - 线程前缀: {}, 线程: {}", finalThreadNamePrefix, threadName); + // 清理线程上下文,防止内存泄漏 + } + }; + }); + + // 初始化线程池 + executor.initialize(); + + log.info("创建线程池成功 - 线程前缀: {}, 核心线程: {}, 最大线程: {}, 队列容量: {}", + finalThreadNamePrefix, + executor.getCorePoolSize(), + executor.getMaxPoolSize(), + executor.getQueueCapacity()); + + return executor; + } + + /** + * 创建CPU密集型任务的线程池 + * (核心线程数 = CPU核心数,最大线程数 = CPU核心数 * 2) + * @param threadNamePrefix 线程名称前缀,由业务端根据具体场景传入 + */ + public ThreadPoolTaskExecutor createCpuIntensivePool(String threadNamePrefix) { + int cpuCores = Runtime.getRuntime().availableProcessors(); + return createThreadPool(threadNamePrefix, cpuCores, cpuCores * 2, 100); + } + + /** + * 创建IO密集型任务的线程池 + * (核心线程数 = CPU核心数 * 2,最大线程数 = CPU核心数 * 4) + * @param threadNamePrefix 线程名称前缀,由业务端根据具体场景传入 + */ + public ThreadPoolTaskExecutor createIoIntensivePool(String threadNamePrefix) { + int cpuCores = Runtime.getRuntime().availableProcessors(); + return createThreadPool(threadNamePrefix, cpuCores * 2, cpuCores * 4, 200); + } + + /** + * 创建定时任务线程池(ScheduledThreadPoolExecutor) + * @param threadNamePrefix 线程名称前缀,由业务端根据具体场景传入 + * @param corePoolSize 核心线程数 + */ + public ScheduledThreadPoolExecutor createScheduledThreadPool(String threadNamePrefix, int corePoolSize) { + // 确保线程名称前缀以"-"结尾,符合规范 + final String finalThreadNamePrefix = threadNamePrefix.endsWith("-") ? + threadNamePrefix : threadNamePrefix + "-"; + + return new ScheduledThreadPoolExecutor(corePoolSize, new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, finalThreadNamePrefix + threadNumber.getAndIncrement()); + thread.setDaemon(false); + thread.setPriority(Thread.NORM_PRIORITY); + return thread; + } + }, new ThreadPoolExecutor.CallerRunsPolicy()); + } + + /** + * 创建固定大小的线程池 + * @param threadNamePrefix 线程名称前缀,由业务端根据具体场景传入 + * @param nThreads 线程数量 + */ + public ExecutorService createFixedThreadPool(String threadNamePrefix, int nThreads) { + // 确保线程名称前缀以"-"结尾,符合规范 + final String finalThreadNamePrefix = threadNamePrefix.endsWith("-") ? + threadNamePrefix : threadNamePrefix + "-"; + + ThreadFactory threadFactory = new ThreadFactory() { + private final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, finalThreadNamePrefix + threadNumber.getAndIncrement()); + thread.setDaemon(false); + thread.setPriority(Thread.NORM_PRIORITY); + return thread; + } + }; + + return Executors.newFixedThreadPool(nThreads, threadFactory); + } + + /** + * 线程池监控信息 + */ + public static String getThreadPoolMonitorInfo(ThreadPoolTaskExecutor executor) { + if (executor == null) { + return "ThreadPool is null"; + } + + ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor(); + return String.format( + "线程池状态 - 活跃线程: %d, 池大小: %d, 核心线程: %d, 最大线程: %d, 队列大小: %d, 已完成任务: %d, 总任务数: %d", + threadPoolExecutor.getActiveCount(), + threadPoolExecutor.getPoolSize(), + threadPoolExecutor.getCorePoolSize(), + threadPoolExecutor.getMaximumPoolSize(), + threadPoolExecutor.getQueue().size(), + threadPoolExecutor.getCompletedTaskCount(), + threadPoolExecutor.getTaskCount() + ); + } + + /** + * 安全关闭线程池 + * @param executor 要关闭的线程池 + * @param threadNamePrefix 线程名称前缀,用于日志标识 + */ + public static void shutdownThreadPool(ThreadPoolTaskExecutor executor, String threadNamePrefix) { + if (executor != null) { + log.info("开始关闭线程池 - 线程前缀: {}", threadNamePrefix); + executor.shutdown(); + try { + if (!executor.getThreadPoolExecutor().awaitTermination(60, TimeUnit.SECONDS)) { + log.warn("线程池未在指定时间内完全关闭 - 线程前缀: {}", threadNamePrefix); + executor.getThreadPoolExecutor().shutdownNow(); + } + } catch (InterruptedException e) { + log.error("线程池关闭被中断 - 线程前缀: {}", threadNamePrefix, e); + executor.getThreadPoolExecutor().shutdownNow(); + Thread.currentThread().interrupt(); + } + log.info("线程池已关闭 - 线程前缀: {}", threadNamePrefix); + } + } + + /** + * 示例:业务端如何使用动态线程名称前缀 + */ + public static class BusinessUsageExample { + + private ThreadPoolUtil threadPoolUtil; + + public BusinessUsageExample(ThreadPoolUtil threadPoolUtil) { + this.threadPoolUtil = threadPoolUtil; + } + + /** + * 日停电事件处理业务 + */ + public void processDailyPowerOutage() { + // 业务端根据自身场景传入线程名称前缀 + ThreadPoolTaskExecutor dailyPowerExecutor = threadPoolUtil + .createThreadPool("Dner-Daily-Power-Async"); + + dailyPowerExecutor.execute(() -> { + // 日停电事件处理逻辑 + System.out.println("处理日停电事件数据..."); + }); + } + + /** + * Excel 数据导入业务 + */ + public void importExcelData() { + // 业务端根据自身场景传入线程名称前缀 + ThreadPoolTaskExecutor excelImportExecutor = threadPoolUtil + .createThreadPool("Excel-Data-Import-Async"); + + excelImportExecutor.execute(() -> { + // Excel 数据导入逻辑 + System.out.println("导入 Excel 数据..."); + }); + } + + /** + * 天气数据分析业务 + */ + public void analyzeWeatherData() { + // 业务端根据自身场景传入线程名称前缀 + ThreadPoolTaskExecutor weatherAnalysisExecutor = threadPoolUtil + .createCpuIntensivePool("Weather-Analysis-Async"); + + weatherAnalysisExecutor.execute(() -> { + // 天气数据分析逻辑 + System.out.println("分析天气数据..."); + }); + } + } + + /** + * 安全关闭线程池(重载方法) + */ + public void shutdownThreadPool(ThreadPoolTaskExecutor executor) { + if (executor != null) { + String threadNamePrefix = executor.getThreadNamePrefix(); + shutdownThreadPool(executor, threadNamePrefix); + } + } +} \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f48c746..1dd3098 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -31,3 +31,13 @@ mybatis-plus: mapper-locations: classpath*:mapper/**/*.xml type-aliases-package: com.jinshan.mybatistest.entity +# 自定义线程池配置(用于分时停电事件表数据同步到日K线停电事件表) +async: + thread-pool: + core-pool-size: 10 # 核心线程数(常驻线程) + max-pool-size: 20 # 最大线程数(处理突发流量) + queue-capacity: 1000 # 阻塞队列容量(根据业务响应时间要求设置,过大容易OOM) + keep-alive-seconds: 60 # 空闲线程存活时间(秒),实现线程回收 + await-termination-seconds: 60 # 优雅停机:等待任务完成的最大时长(秒) + + diff --git a/src/main/resources/mapper/DnerDailyPowerOutageEventSyncMapper.xml b/src/main/resources/mapper/DnerDailyPowerOutageEventSyncMapper.xml new file mode 100644 index 0000000..8fef4ff --- /dev/null +++ b/src/main/resources/mapper/DnerDailyPowerOutageEventSyncMapper.xml @@ -0,0 +1,8 @@ + + + + + + + \ No newline at end of file diff --git a/src/main/resources/mapper/DnerHourlyPowerOutageEventMapper.xml b/src/main/resources/mapper/DnerHourlyPowerOutageEventMapper.xml index c82647f..e6b2396 100644 --- a/src/main/resources/mapper/DnerHourlyPowerOutageEventMapper.xml +++ b/src/main/resources/mapper/DnerHourlyPowerOutageEventMapper.xml @@ -50,4 +50,4 @@ #{code} - \ No newline at end of file +