提交同步K值代码

This commit is contained in:
yufengshuo 2026-03-18 17:58:54 +08:00
parent d7d837241f
commit 64b8c996b3
11 changed files with 1182 additions and 1 deletions

View File

@ -0,0 +1,140 @@
package com.southern.power.grid.config;
import com.southern.power.grid.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 业务线程池工厂
* 提供标准化的业务线程池创建方法
*/
@Component
@Slf4j
public class BusinessThreadPoolFactory {
@Autowired
private ThreadPoolUtil threadPoolUtil;
// 缓存已创建的线程池
private final ConcurrentMap<String, ThreadPoolTaskExecutor> threadPoolCache = new ConcurrentHashMap<>();
/**
* 获取或创建业务线程池线程安全
* @param businessCode 业务代码用于标识业务类型
* @return 线程池
*/
public ThreadPoolTaskExecutor getOrCreateThreadPool(String businessCode) {
ThreadPoolTaskExecutor existing = threadPoolCache.get(businessCode);
if (existing != null) {
ThreadPoolExecutor tpe = existing.getThreadPoolExecutor();
if (tpe.isShutdown() || tpe.isTerminated()) {
threadPoolCache.remove(businessCode);
existing = null;
}
}
if (existing != null) {
return existing;
}
String threadNamePrefix = generateThreadNamePrefix(businessCode);
log.info("创建业务线程池 - 业务代码: {}, 线程前缀: {}", businessCode, threadNamePrefix);
ThreadPoolTaskExecutor created = threadPoolUtil.createThreadPool(threadNamePrefix);
threadPoolCache.put(businessCode, created);
return created;
}
/**
* 获取或创建业务线程池可自定义参数
*/
public ThreadPoolTaskExecutor getOrCreateThreadPool(String businessCode,
int corePoolSize,
int maxPoolSize,
int queueCapacity) {
String cacheKey = String.format("%s_%d_%d_%d", businessCode, corePoolSize, maxPoolSize, queueCapacity);
return threadPoolCache.computeIfAbsent(cacheKey, key -> {
String threadNamePrefix = generateThreadNamePrefix(businessCode);
log.info("创建自定义参数业务线程池 - 业务代码: {}, 线程前缀: {}, 核心线程: {}, 最大线程: {}, 队列容量: {}",
businessCode, threadNamePrefix, corePoolSize, maxPoolSize, queueCapacity);
return threadPoolUtil.createThreadPool(threadNamePrefix, corePoolSize, maxPoolSize, queueCapacity);
});
}
/**
* 根据业务代码生成线程名称前缀
*/
private String generateThreadNamePrefix(String businessCode) {
// 业务代码到线程前缀的映射
switch (businessCode) {
case "DAILY_POWER":
return "Dner-Daily-Power-Async";
default:
return "Biz-" + businessCode + "-Async";
}
}
/**
* 获取所有线程池的监控信息
*/
public String getAllThreadPoolMonitorInfo() {
StringBuilder sb = new StringBuilder();
sb.append("=== 线程池监控信息 ===\n");
threadPoolCache.forEach((key, executor) -> {
sb.append("业务代码: ").append(key).append("\n");
sb.append(" 状态: ").append(ThreadPoolUtil.getThreadPoolMonitorInfo(executor)).append("\n");
});
return sb.toString();
}
/**
* 优雅关闭所有线程池
*/
public void shutdownAllThreadPools() {
log.info("开始关闭所有业务线程池,共 {} 个", threadPoolCache.size());
// 复制一份键值对避免 ConcurrentModificationException
new java.util.HashSet<>(threadPoolCache.keySet()).forEach(key -> {
ThreadPoolTaskExecutor executor = threadPoolCache.remove(key);
if (executor != null) {
try {
String threadNamePrefix = executor.getThreadNamePrefix();
ThreadPoolUtil.shutdownThreadPool(executor, threadNamePrefix);
} catch (Exception e) {
log.error("关闭线程池失败 - 业务代码:{}", key, e);
}
}
});
threadPoolCache.clear();
log.info("所有业务线程池已关闭");
}
/**
* 关闭指定业务的线程池
*/
public void shutdownThreadPool(String businessCode) {
ThreadPoolTaskExecutor executor = threadPoolCache.remove(businessCode);
if (executor != null) {
try {
String threadNamePrefix = executor.getThreadNamePrefix();
ThreadPoolUtil.shutdownThreadPool(executor, threadNamePrefix);
} catch (Exception e) {
log.error("关闭线程池失败 - 业务代码:{}", businessCode, e);
}
}
}
/**
* 业务代码常量
*/
public static class BusinessCodes {
public static final String DAILY_POWER = "DAILY_POWER";
}
}

View File

@ -0,0 +1,16 @@
package com.southern.power.grid.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "async.thread-pool")
@Data
public class ThreadPoolProperties {
private int corePoolSize = 10;
private int maxPoolSize = 20;
private int queueCapacity = 1000;
private int keepAliveSeconds = 60;
private int awaitTerminationSeconds = 60;
}

View File

@ -0,0 +1,17 @@
package com.southern.power.grid.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.southern.power.grid.entity.DnerDailyPowerOutageEventSync;
import org.apache.ibatis.annotations.Mapper;
/**
* 日K线停电事件同步
*
* @author fsyud
* &#064;date 2026/03/16
*/
@Mapper
public interface DnerDailyPowerOutageEventSyncMapper extends BaseMapper<DnerDailyPowerOutageEventSync> {
}

View File

@ -0,0 +1,54 @@
package com.southern.power.grid.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@TableName("dner_daily_power_outage_event_sync")
public class DnerDailyPowerOutageEventSync {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 日期(yyyy-mm-dd)
*/
private String dateTime;
/**
* 同步状态 1 非当日调用 2 当日调用
*/
private Integer syncStatus;
/**
* 同步失败信息
*/
private String errorMsg;
/**
* 创建人
*/
private String createBy;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 修改人
*/
private String updateBy;
/**
* 修改时间
*/
private LocalDateTime updateTime;
}

View File

@ -0,0 +1,11 @@
package com.southern.power.grid.service;
public interface IDnerDailyPowerOutageEventSyncService {
/**
* 日K线停电事件同步
*
* @return 布尔型
*/
Boolean syncDnerDailyPowerOutageEvent();
}

View File

@ -0,0 +1,301 @@
package com.southern.power.grid.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.southern.power.grid.dao.DnerDailyPowerOutageEventMapper;
import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper;
import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper;
import com.southern.power.grid.entity.DnerDailyPowerOutageEvent;
import com.southern.power.grid.entity.DnerDailyPowerOutageEventSync;
import com.southern.power.grid.entity.DnerHourlyPowerOutageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
/**
* 日K线停电事件批量处理服务
* 专门处理事务性的批量同步操作解决@Transactional自调用问题
*
* @author: System
* @date: 2026/3/17
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class DnerDailyPowerOutageEventBatchService {
private final DnerDailyPowerOutageEventMapper dailyPowerOutageEventMapper;
private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper;
private final DnerDailyPowerOutageEventSyncMapper syncMapper;
private final ObjectMapper mapper = new ObjectMapper();
private static final int BATCH_SIZE = 600;
/**
* 删除指定日期的存量数据
* 删除dner_daily_power_outage_event(对应data_time字段)和dner_daily_power_outage_event_sync(对应date_time字段)中的对应数据
*
* @param date 日期
*/
public void deleteDateData(String date) {
try {
log.info("开始删除日期 {} 的存量数据", date);
// 1. 删除dner_daily_power_outage_event表中对应日期的数据
int deleteDailyCount = dailyPowerOutageEventMapper.delete(
new LambdaQueryWrapper<DnerDailyPowerOutageEvent>()
.apply("DATE(data_time) = {0}", date)
);
log.info("日期 {} 删除日表 {} 条存量数据", date, deleteDailyCount);
// 2. 删除dner_daily_power_outage_event_sync表中对应日期的同步记录
int deleteSyncCount = syncMapper.delete(
new LambdaQueryWrapper<DnerDailyPowerOutageEventSync>()
.eq(DnerDailyPowerOutageEventSync::getDateTime, date)
);
log.info("日期 {} 删除同步记录表 {} 条存量数据", date, deleteSyncCount);
} catch (Exception e) {
log.error("日期 {} 删除存量数据失败: {}", date, e.getMessage(), e);
throw new RuntimeException("删除日期 " + date + " 存量数据失败: " + e.getMessage(), e);
}
}
/**
* 同步指定日期的一批区域数据带事务
*
* @param date 日期
* @param orgCodeList 区域编码列表批次
* @param batchIndex 批次索引
*/
public void syncDateDataForBatch(String date, List<String> orgCodeList, int batchIndex) {
log.info("开始同步日期 {} 批次 {},区域数量: {}", date, batchIndex, orgCodeList.size());
List<DnerDailyPowerOutageEvent> allData = new ArrayList<>();
List<DnerHourlyPowerOutageEvent> allHourlyEvents = queryHourlyDataByDateAndOrgCodes(orgCodeList, date);
if (CollectionUtils.isEmpty(allHourlyEvents)) {
log.info("日期 {} 批次 {} 无小时数据需要处理", date, batchIndex);
return;
}
Map<String, List<DnerHourlyPowerOutageEvent>> orgCodeToHourlyEvents = allHourlyEvents.stream().collect(Collectors.groupingBy(DnerHourlyPowerOutageEvent::getOrgCode));
for (String orgCode : orgCodeList) {
try {
List<DnerHourlyPowerOutageEvent> eventList = orgCodeToHourlyEvents.get(orgCode);
if (CollectionUtils.isEmpty(eventList)) {
log.warn("日期 {} 区域 {} 无小时数据", date, orgCode);
continue;
}
// 2. 转换为日数据
DnerDailyPowerOutageEvent dailyPowerOutageEvent = convertToDailyEvent(eventList, date, orgCode);
if (ObjectUtils.isEmpty(dailyPowerOutageEvent)) {
log.warn("转换日K线数据失败日期{}, 区域编号:{}", date, orgCode);
continue;
}
// 3. 保存两条停电状态不同的数据停电中=2已复电=3
DnerDailyPowerOutageEvent copyEvent = deepCopyByJackson(dailyPowerOutageEvent, DnerDailyPowerOutageEvent.class);
dailyPowerOutageEvent.setOutageState(2);
copyEvent.setOutageState(3);
allData.add(dailyPowerOutageEvent);
allData.add(copyEvent);
} catch (Exception e) {
log.error("处理区域 {} 日期 {} 时发生异常: {}", orgCode, date, e.getMessage(), e);
}
}
// 批量保存数据
if (!CollectionUtils.isEmpty(allData)) {
try {
// 分段插入每段最多 600
int totalInserted = 0;
for (int i = 0; i < allData.size(); i += BATCH_SIZE) {
int end = Math.min(i + BATCH_SIZE, allData.size());
List<DnerDailyPowerOutageEvent> subList = allData.subList(i, end);
for (DnerDailyPowerOutageEvent event : subList) {
dailyPowerOutageEventMapper.insert(event);
}
totalInserted += subList.size();
log.debug("日期 {} 批次 {} 已插入 {}/{} 条记录", date, batchIndex, totalInserted, allData.size());
}
log.info("日期 {} 批次 {} 保存完成,共 {} 条记录", date, batchIndex, totalInserted);
} catch (Exception e) {
log.error("日期 {} 批次 {} 保存数据时发生异常:{}", date, batchIndex, e.getMessage(), e);
throw new RuntimeException("保存日 K 线数据失败:" + e.getMessage(), e);
}
} else {
log.info("日期 {} 批次 {} 无数据需要保存", date, batchIndex);
}
}
/**
* 保存每日同步记录
* 一个日期的所有数据同步完成后保存一条同步记录
*
* @param date 同步日期
* @param errors 错误信息列表
*/
public void saveDailySyncRecord(String date, List<SyncErrorInfo> errors) {
try {
DnerDailyPowerOutageEventSync record = new DnerDailyPowerOutageEventSync();
record.setDateTime(date);
// 判断同步状态
// 如果是当前日期状态为2否则为1
String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
int syncStatus = today.equals(date) ? 2 : 1;
// 记录错误信息
String errorMsg = null;
if (!CollectionUtils.isEmpty(errors)) {
errorMsg = mapper.writeValueAsString(errors);
}
record.setSyncStatus(syncStatus);
record.setErrorMsg(errorMsg);
syncMapper.insert(record);
log.info("日期 {} 同步记录已保存,状态: {}, 错误数: {}",
date, syncStatus, CollectionUtils.isEmpty(errors) ? 0 : errors.size());
} catch (Exception e) {
log.error("日期 {} 保存同步记录失败: {}", date, e.getMessage(), e);
// 不抛出异常避免影响主流程
}
}
/**
* 根据日期查询小时数据
*/
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDate(String orgCode, String date) {
try {
String start = date + " 00:00:00";
String end = date + " 23:59:59";
return hourlyPowerOutageEventMapper.selectList(
new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
.eq(DnerHourlyPowerOutageEvent::getOrgCode, orgCode)
.ge(DnerHourlyPowerOutageEvent::getDataTime, start)
.le(DnerHourlyPowerOutageEvent::getDataTime, end)
);
} catch (Exception e) {
log.error("查询小时数据失败 - 区域: {}, 日期: {}, 错误: {}", orgCode, date, e.getMessage(), e);
throw new RuntimeException("查询小时数据失败: " + e.getMessage(), e);
}
}
private List<DnerHourlyPowerOutageEvent> queryHourlyDataByDateAndOrgCodes(List<String> orgCodes, String date) {
if (CollectionUtils.isEmpty(orgCodes)) {
return new ArrayList<>();
}
try {
String start = date + " 00:00:00";
String end = date + " 23:59:59";
return hourlyPowerOutageEventMapper.selectList(
new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
.in(DnerHourlyPowerOutageEvent::getOrgCode, orgCodes)
.ge(DnerHourlyPowerOutageEvent::getDataTime, start)
.le(DnerHourlyPowerOutageEvent::getDataTime, end)
);
} catch (Exception e) {
log.error("批量查询小时数据失败 - 日期: {}, 区域数: {}, 错误: {}", date, orgCodes.size(), e.getMessage(), e);
throw new RuntimeException("批量查询小时数据失败: " + e.getMessage(), e);
}
}
/**
* 转换为每日事件
*/
private DnerDailyPowerOutageEvent convertToDailyEvent(List<DnerHourlyPowerOutageEvent> list, String date, String orgCode) {
if (list == null || list.isEmpty()) {
return null;
}
try {
DnerDailyPowerOutageEvent daily = new DnerDailyPowerOutageEvent();
daily.setOrgCode(orgCode);
daily.setDataTime(date);
// 用户数统计
IntSummaryStatistics userStats = list.stream()
.mapToInt(e -> e.getUserCount() == null ? 0 : e.getUserCount())
.summaryStatistics();
daily.setUserCount((int) userStats.getSum());
daily.setMaxUserCount(userStats.getMax());
daily.setMinUserCount(userStats.getMin());
// 起始和结束用户数
daily.setStarUserCount(list.stream().min(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0));
daily.setEndUserCount(list.stream().max(Comparator.comparing(DnerHourlyPowerOutageEvent::getDataTime))
.map(DnerHourlyPowerOutageEvent::getUserCount).orElse(0));
// 气象数据统计
daily.setDailyPrecipitation(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getDailyPrecipitation())).sum()));
daily.setTemperature(String.format("%.2f", list.stream().mapToDouble(e -> toDouble(e.getTemperature())).average().orElse(0.0)));
daily.setHourlyMaxTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMaxTemperature())).max().orElse(0.0)));
daily.setHourlyMinTemperature(fmt(list.stream().mapToDouble(e -> toDouble(e.getHourlyMinTemperature())).min().orElse(0.0)));
daily.setExtremeWindSpeedHourly(fmt(list.stream().mapToDouble(e -> toDouble(e.getExtremeWindSpeedHourly())).max().orElse(0.0)));
daily.setPowerOutageDuration(fmt(list.stream().mapToDouble(e -> toDouble(e.getPowerOutageDuration())).max().orElse(0.0)));
return daily;
} catch (Exception e) {
log.error("转换日K线数据失败 - 日期: {}, 区域: {}, 错误: {}", date, orgCode, e.getMessage(), e);
return null;
}
}
private double toDouble(String val) {
try {
return (val == null || val.trim().isEmpty()) ? 0.0 : Double.parseDouble(val);
} catch (NumberFormatException e) {
return 0.0;
}
}
private String fmt(double val) {
return String.valueOf(val).replaceAll("\\.0$", "");
}
/**
* 深拷贝
*/
public <T> T deepCopyByJackson(T source, Class<T> clazz) {
if (source == null) {
return null;
}
try {
String json = mapper.writeValueAsString(source);
return mapper.readValue(json, clazz);
} catch (Exception e) {
log.error("Jackson深拷贝失败: {}", e.getMessage(), e);
throw new RuntimeException("Jackson深拷贝失败", e);
}
}
/**
* 同步错误信息
*/
@lombok.Data
@lombok.AllArgsConstructor
@lombok.NoArgsConstructor
public static class SyncErrorInfo {
private String dateTime;
private String orgCode;
private String message;
}
}

View File

@ -0,0 +1,344 @@
package com.southern.power.grid.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.southern.power.grid.config.BusinessThreadPoolFactory;
import com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper;
import com.southern.power.grid.dao.DnerHourlyPowerOutageEventMapper;
import com.southern.power.grid.entity.DnerDailyPowerOutageEventSync;
import com.southern.power.grid.entity.DnerHourlyPowerOutageEvent;
import com.southern.power.grid.service.IDnerDailyPowerOutageEventSyncService;
import com.southern.power.grid.utils.ThreadPoolUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* 日K线停电事件同步服务实现类
* 功能从dner_hourly_power_outage_event表查询数据经过处理后同步到dner_daily_power_outage_event表
* 特性多线程处理事务支持通过独立事务类错误回滚完善日志
*
* {@code @author:} System
* {@code @date:} 2026/3/17
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class DnerDailyPowerOutageEventSyncServiceImpl extends ServiceImpl<DnerDailyPowerOutageEventSyncMapper, DnerDailyPowerOutageEventSync>
implements IDnerDailyPowerOutageEventSyncService {
private final DnerHourlyPowerOutageEventMapper hourlyPowerOutageEventMapper;
private final BusinessThreadPoolFactory businessThreadPoolFactory;
private final DnerDailyPowerOutageEventBatchService batchService;
/** 每批次处理的区域数量 */
private static final int ORG_BATCH_SIZE = 100;
/**
* 主同步方法
* 按日期顺序同步数据每个日期同步完成后保存一条同步记录
*
* @return 同步结果
*/
@Override
public Boolean syncDnerDailyPowerOutageEvent() {
log.info("================== 开始日K线停电事件同步任务 ==================");
long startTime = System.currentTimeMillis();
try {
String startDate = getStartDate();
String endDate = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
log.info("开始日期: {}, 结束日期: {}", startDate, endDate);
List<String> allDates = getAllDates(startDate, endDate);
if (CollectionUtils.isEmpty(allDates)) {
log.warn("日期范围为空,同步任务结束");
return true;
}
log.info("共需要处理 {} 个日期", allDates.size());
List<String> orgCodeList = getOrgCodeList(startDate, endDate);
if (CollectionUtils.isEmpty(orgCodeList)) {
log.warn("在指定日期范围内未找到任何区域编码,同步任务结束");
return true;
}
log.info("共获取到 {} 个区域编码需要处理", orgCodeList.size());
ThreadPoolTaskExecutor executor = businessThreadPoolFactory.getOrCreateThreadPool(
BusinessThreadPoolFactory.BusinessCodes.DAILY_POWER
);
boolean allSuccess = true;
String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
for (String date : allDates) {
try {
log.info("开始处理日期:{}", date);
// 检查是否已存在成功同步记录
DnerDailyPowerOutageEventSync existing = getLatestSyncRecordByDate(date);
boolean isToday = date.equals(today);
// 非今日且已同步成功则跳过
if (existing != null && isSyncSuccess(existing) && !isToday) {
log.info("日期 {} 已存在成功同步记录,跳过处理", date);
continue;
}
// 今日数据或之前失败的记录先删除旧数据
if (existing != null || isToday) {
log.info("日期 {} 存在旧数据,先删除", date);
batchService.deleteDateData(date);
}
// 处理该日期的数据
boolean success = processDateData(executor, date, orgCodeList);
if (success) {
log.info("日期 {} 处理完成", date);
} else {
log.error("日期 {} 处理失败", date);
allSuccess = false;
}
} catch (Exception e) {
log.error("处理日期 {} 时发生异常:{}", date, e.getMessage(), e);
allSuccess = false;
}
}
long endTime = System.currentTimeMillis();
log.info("================== 日K线停电事件同步任务完成 ==================");
log.info("总耗时: {} ms", endTime - startTime);
return allSuccess;
} catch (Exception e) {
log.error("同步日K线停电事件数据发生严重异常: {}", e.getMessage(), e);
return false;
}
}
/**
* 处理单个日期的数据
* 如果是开始日期先删除存量数据然后多线程同步所有区域数据最后保存一条同步记录
*
* @param executor 线程池
* @param date 日期
* @param orgCodeList 所有区域编码列表
* @return 处理结果
*/
private boolean processDateData(ThreadPoolTaskExecutor executor, String date, List<String> orgCodeList) {
List<List<String>> orgCodeBatches = partitionList(orgCodeList, ORG_BATCH_SIZE);
log.info("日期 {} 共分 {} 批进行处理", date, orgCodeBatches.size());
log.info("日期 {} 线程池状态: {}", date, ThreadPoolUtil.getThreadPoolMonitorInfo(executor));
List<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> allErrors = new ArrayList<>();
List<Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo>> futures = new ArrayList<>(orgCodeBatches.size());
for (int i = 0; i < orgCodeBatches.size(); i++) {
final List<String> batchOrgCodes = orgCodeBatches.get(i);
final int batchIndex = i + 1;
try {
Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> future = executor.submit(() -> {
try {
log.info("日期 {} 批次 {} 开始执行,区域数量: {}", date, batchIndex, batchOrgCodes.size());
batchService.syncDateDataForBatch(date, batchOrgCodes, batchIndex);
return null;
} catch (Exception e) {
log.error("日期 {} 批次 {} 处理失败: {}", date, batchIndex, e.getMessage(), e);
return new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
date, "批次" + batchIndex, e.getMessage());
}
});
futures.add(future);
} catch (Exception e) {
log.error("日期 {} 批次 {} 提交任务失败: {}", date, batchIndex, e.getMessage(), e);
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
date, "批次" + batchIndex, e.getMessage()));
}
}
log.info("日期 {} 已提交任务数: {}", date, futures.size());
long deadlineNanos = System.nanoTime() + TimeUnit.HOURS.toNanos(2);
boolean timedOut = false;
for (Future<DnerDailyPowerOutageEventBatchService.SyncErrorInfo> future : futures) {
long remainingNanos = deadlineNanos - System.nanoTime();
if (remainingNanos <= 0) {
timedOut = true;
break;
}
try {
DnerDailyPowerOutageEventBatchService.SyncErrorInfo error =
future.get(remainingNanos, TimeUnit.NANOSECONDS);
if (error != null) {
allErrors.add(error);
}
} catch (TimeoutException e) {
timedOut = true;
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
date, null, "同步被中断"));
timedOut = true;
break;
} catch (Exception e) {
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
date, null, e.getMessage()));
}
}
if (timedOut) {
log.error("日期 {} 同步超时", date);
for (Future<?> future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
allErrors.add(new DnerDailyPowerOutageEventBatchService.SyncErrorInfo(
date, null, "同步超时"));
}
batchService.saveDailySyncRecord(date, allErrors);
return allErrors.isEmpty();
}
/**
* 获取同步的开始日期
* 默认开始日期为当前日期往前8天
* 如果存在部分同步的日期则从该日期开始
*/
private String getStartDate() {
try {
DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd");
String today = LocalDate.now().format(f);
LocalDate fallback = LocalDate.now().minusDays(8);
DnerDailyPowerOutageEventSync firstError = this.lambdaQuery()
.le(DnerDailyPowerOutageEventSync::getDateTime, today)
.and(q -> q.isNotNull(DnerDailyPowerOutageEventSync::getErrorMsg)
.ne(DnerDailyPowerOutageEventSync::getErrorMsg, "")
.ne(DnerDailyPowerOutageEventSync::getErrorMsg, "[]"))
.orderByAsc(DnerDailyPowerOutageEventSync::getDateTime)
.last("limit 1")
.one();
if (firstError != null) {
LocalDate candidate = LocalDate.parse(firstError.getDateTime(), f);
return maxDate(fallback, candidate).format(f);
}
DnerDailyPowerOutageEventSync lastSuccessBeforeToday = this.lambdaQuery()
.lt(DnerDailyPowerOutageEventSync::getDateTime, today)
.and(q -> q.isNull(DnerDailyPowerOutageEventSync::getErrorMsg)
.or()
.eq(DnerDailyPowerOutageEventSync::getErrorMsg, "")
.or()
.eq(DnerDailyPowerOutageEventSync::getErrorMsg, "[]"))
.orderByDesc(DnerDailyPowerOutageEventSync::getDateTime)
.last("limit 1")
.one();
if (lastSuccessBeforeToday != null) {
LocalDate candidate = LocalDate.parse(lastSuccessBeforeToday.getDateTime(), f).plusDays(1);
return maxDate(fallback, candidate).format(f);
}
return fallback.format(f);
} catch (Exception e) {
log.error("获取开始日期失败: {}", e.getMessage(), e);
return LocalDate.now().minusDays(8).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
}
}
private LocalDate maxDate(LocalDate a, LocalDate b) {
return a.isAfter(b) ? a : b;
}
private DnerDailyPowerOutageEventSync getLatestSyncRecordByDate(String date) {
try {
return this.lambdaQuery()
.eq(DnerDailyPowerOutageEventSync::getDateTime, date)
.orderByDesc(DnerDailyPowerOutageEventSync::getId)
.last("limit 1")
.one();
} catch (Exception e) {
log.warn("查询日期 {} 同步记录失败: {}", date, e.getMessage(), e);
return null;
}
}
private boolean isSyncSuccess(DnerDailyPowerOutageEventSync record) {
String msg = record.getErrorMsg();
return msg == null || msg.trim().isEmpty() || "[]".equals(msg.trim());
}
/**
* 获取所有区域编码
*/
private List<String> getOrgCodeList(String startDate, String endDate) {
try {
List<DnerHourlyPowerOutageEvent> eventList = hourlyPowerOutageEventMapper.selectList(
new LambdaQueryWrapper<DnerHourlyPowerOutageEvent>()
.ge(DnerHourlyPowerOutageEvent::getDataTime, startDate + " 00:00:00")
.le(DnerHourlyPowerOutageEvent::getDataTime, endDate + " 23:59:59")
.select(DnerHourlyPowerOutageEvent::getOrgCode)
.last("GROUP BY org_code")
);
if (CollectionUtils.isEmpty(eventList)) {
return new ArrayList<>();
}
return eventList.stream()
.map(DnerHourlyPowerOutageEvent::getOrgCode)
.distinct()
.collect(Collectors.toList());
} catch (Exception e) {
log.error("获取区域编码列表失败: {}", e.getMessage(), e);
throw new RuntimeException("获取区域编码列表失败: " + e.getMessage(), e);
}
}
/**
* 获取所有需要处理的日期列表
*/
private List<String> getAllDates(String startDate, String endDate) {
DateTimeFormatter f = DateTimeFormatter.ISO_LOCAL_DATE;
LocalDate start = LocalDate.parse(startDate, f);
LocalDate end = LocalDate.parse(endDate, f);
List<String> list = new ArrayList<>();
for (; !start.isAfter(end); start = start.plusDays(1)) {
list.add(start.format(f));
}
return list;
}
/**
* 将列表分批
*/
private <T> List<List<T>> partitionList(List<T> list, int size) {
List<List<T>> result = new ArrayList<>();
if (CollectionUtils.isEmpty(list) || size <= 0) {
return result;
}
for (int i = 0; i < list.size(); i += size) {
int end = Math.min(i + size, list.size());
result.add(list.subList(i, end));
}
return result;
}
}

View File

@ -0,0 +1,280 @@
package com.southern.power.grid.utils;
import com.southern.power.grid.config.ThreadPoolProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@Slf4j
public class ThreadPoolUtil {
@Autowired
private ThreadPoolProperties properties;
/**
* 创建线程池业务端传入线程名称前缀
* @param threadNamePrefix 线程名称前缀由业务端根据具体场景传入
* @return 自定义线程池
*/
public ThreadPoolTaskExecutor createThreadPool(String threadNamePrefix) {
return createThreadPool(threadNamePrefix, null, null, null);
}
/**
* 创建完全自定义的线程池
* @param threadNamePrefix 线程名称前缀由业务端根据具体场景传入
* @param corePoolSize 核心线程数为null时使用配置文件默认值
* @param maxPoolSize 最大线程数为null时使用配置文件默认值
* @param queueCapacity 队列容量为null时使用配置文件默认值
* @return 自定义线程池
*/
public ThreadPoolTaskExecutor createThreadPool(String threadNamePrefix,
Integer corePoolSize,
Integer maxPoolSize,
Integer queueCapacity) {
if (threadNamePrefix == null || threadNamePrefix.trim().isEmpty()) {
throw new IllegalArgumentException("线程名称前缀不能为空");
}
// 确保线程名称前缀以"-"结尾符合规范
final String finalThreadNamePrefix = threadNamePrefix.endsWith("-") ?
threadNamePrefix : threadNamePrefix + "-";
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置线程池参数
executor.setCorePoolSize(corePoolSize != null ? corePoolSize : properties.getCorePoolSize());
executor.setMaxPoolSize(maxPoolSize != null ? maxPoolSize : properties.getMaxPoolSize());
executor.setQueueCapacity(queueCapacity != null ? queueCapacity : properties.getQueueCapacity());
executor.setKeepAliveSeconds(properties.getKeepAliveSeconds());
// 设置线程名称前缀由业务端传入
executor.setThreadNamePrefix(finalThreadNamePrefix);
// 设置拒绝策略CallerRunsPolicy 保证任务不丢失
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 设置异常处理处理器
executor.setRejectedExecutionHandler((r, exec) -> {
log.error("线程池任务被拒绝 - 线程前缀:{}, 队列已满,执行 CallerRuns 策略", finalThreadNamePrefix);
new ThreadPoolExecutor.CallerRunsPolicy().rejectedExecution(r, exec);
});
// 优雅停机与线程回收
executor.setAllowCoreThreadTimeOut(true);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds());
// 任务装饰器支持上下文传递和异常捕获
executor.setTaskDecorator(runnable -> {
// 这里可以传递主线程的上下文如TraceId用户信息等
return () -> {
String threadName = Thread.currentThread().getName();
log.debug("线程池任务开始执行 - 线程前缀: {}, 线程: {}", finalThreadNamePrefix, threadName);
try {
runnable.run();
} catch (Exception e) {
log.error("线程池任务执行异常 - 线程前缀: {}, 线程: {}", finalThreadNamePrefix, threadName, e);
// 根据业务需求决定是否重新抛出异常
throw e;
} finally {
log.debug("线程池任务执行完成 - 线程前缀: {}, 线程: {}", finalThreadNamePrefix, threadName);
// 清理线程上下文防止内存泄漏
}
};
});
// 初始化线程池
executor.initialize();
log.info("创建线程池成功 - 线程前缀: {}, 核心线程: {}, 最大线程: {}, 队列容量: {}",
finalThreadNamePrefix,
executor.getCorePoolSize(),
executor.getMaxPoolSize(),
executor.getQueueCapacity());
return executor;
}
/**
* 创建CPU密集型任务的线程池
* 核心线程数 = CPU核心数最大线程数 = CPU核心数 * 2
* @param threadNamePrefix 线程名称前缀由业务端根据具体场景传入
*/
public ThreadPoolTaskExecutor createCpuIntensivePool(String threadNamePrefix) {
int cpuCores = Runtime.getRuntime().availableProcessors();
return createThreadPool(threadNamePrefix, cpuCores, cpuCores * 2, 100);
}
/**
* 创建IO密集型任务的线程池
* 核心线程数 = CPU核心数 * 2最大线程数 = CPU核心数 * 4
* @param threadNamePrefix 线程名称前缀由业务端根据具体场景传入
*/
public ThreadPoolTaskExecutor createIoIntensivePool(String threadNamePrefix) {
int cpuCores = Runtime.getRuntime().availableProcessors();
return createThreadPool(threadNamePrefix, cpuCores * 2, cpuCores * 4, 200);
}
/**
* 创建定时任务线程池ScheduledThreadPoolExecutor
* @param threadNamePrefix 线程名称前缀由业务端根据具体场景传入
* @param corePoolSize 核心线程数
*/
public ScheduledThreadPoolExecutor createScheduledThreadPool(String threadNamePrefix, int corePoolSize) {
// 确保线程名称前缀以"-"结尾符合规范
final String finalThreadNamePrefix = threadNamePrefix.endsWith("-") ?
threadNamePrefix : threadNamePrefix + "-";
return new ScheduledThreadPoolExecutor(corePoolSize, new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, finalThreadNamePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 创建固定大小的线程池
* @param threadNamePrefix 线程名称前缀由业务端根据具体场景传入
* @param nThreads 线程数量
*/
public ExecutorService createFixedThreadPool(String threadNamePrefix, int nThreads) {
// 确保线程名称前缀以"-"结尾符合规范
final String finalThreadNamePrefix = threadNamePrefix.endsWith("-") ?
threadNamePrefix : threadNamePrefix + "-";
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, finalThreadNamePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};
return Executors.newFixedThreadPool(nThreads, threadFactory);
}
/**
* 线程池监控信息
*/
public static String getThreadPoolMonitorInfo(ThreadPoolTaskExecutor executor) {
if (executor == null) {
return "ThreadPool is null";
}
ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();
return String.format(
"线程池状态 - 活跃线程: %d, 池大小: %d, 核心线程: %d, 最大线程: %d, 队列大小: %d, 已完成任务: %d, 总任务数: %d",
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getCorePoolSize(),
threadPoolExecutor.getMaximumPoolSize(),
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getTaskCount()
);
}
/**
* 安全关闭线程池
* @param executor 要关闭的线程池
* @param threadNamePrefix 线程名称前缀用于日志标识
*/
public static void shutdownThreadPool(ThreadPoolTaskExecutor executor, String threadNamePrefix) {
if (executor != null) {
log.info("开始关闭线程池 - 线程前缀: {}", threadNamePrefix);
executor.shutdown();
try {
if (!executor.getThreadPoolExecutor().awaitTermination(60, TimeUnit.SECONDS)) {
log.warn("线程池未在指定时间内完全关闭 - 线程前缀: {}", threadNamePrefix);
executor.getThreadPoolExecutor().shutdownNow();
}
} catch (InterruptedException e) {
log.error("线程池关闭被中断 - 线程前缀: {}", threadNamePrefix, e);
executor.getThreadPoolExecutor().shutdownNow();
Thread.currentThread().interrupt();
}
log.info("线程池已关闭 - 线程前缀: {}", threadNamePrefix);
}
}
/**
* 示例业务端如何使用动态线程名称前缀
*/
public static class BusinessUsageExample {
private ThreadPoolUtil threadPoolUtil;
public BusinessUsageExample(ThreadPoolUtil threadPoolUtil) {
this.threadPoolUtil = threadPoolUtil;
}
/**
* 日停电事件处理业务
*/
public void processDailyPowerOutage() {
// 业务端根据自身场景传入线程名称前缀
ThreadPoolTaskExecutor dailyPowerExecutor = threadPoolUtil
.createThreadPool("Dner-Daily-Power-Async");
dailyPowerExecutor.execute(() -> {
// 日停电事件处理逻辑
System.out.println("处理日停电事件数据...");
});
}
/**
* Excel 数据导入业务
*/
public void importExcelData() {
// 业务端根据自身场景传入线程名称前缀
ThreadPoolTaskExecutor excelImportExecutor = threadPoolUtil
.createThreadPool("Excel-Data-Import-Async");
excelImportExecutor.execute(() -> {
// Excel 数据导入逻辑
System.out.println("导入 Excel 数据...");
});
}
/**
* 天气数据分析业务
*/
public void analyzeWeatherData() {
// 业务端根据自身场景传入线程名称前缀
ThreadPoolTaskExecutor weatherAnalysisExecutor = threadPoolUtil
.createCpuIntensivePool("Weather-Analysis-Async");
weatherAnalysisExecutor.execute(() -> {
// 天气数据分析逻辑
System.out.println("分析天气数据...");
});
}
}
/**
* 安全关闭线程池重载方法
*/
public void shutdownThreadPool(ThreadPoolTaskExecutor executor) {
if (executor != null) {
String threadNamePrefix = executor.getThreadNamePrefix();
shutdownThreadPool(executor, threadNamePrefix);
}
}
}

View File

@ -31,3 +31,13 @@ mybatis-plus:
mapper-locations: classpath*:mapper/**/*.xml mapper-locations: classpath*:mapper/**/*.xml
type-aliases-package: com.jinshan.mybatistest.entity type-aliases-package: com.jinshan.mybatistest.entity
# 自定义线程池配置(用于分时停电事件表数据同步到日K线停电事件表)
async:
thread-pool:
core-pool-size: 10 # 核心线程数(常驻线程)
max-pool-size: 20 # 最大线程数(处理突发流量)
queue-capacity: 1000 # 阻塞队列容量根据业务响应时间要求设置过大容易OOM
keep-alive-seconds: 60 # 空闲线程存活时间(秒),实现线程回收
await-termination-seconds: 60 # 优雅停机:等待任务完成的最大时长(秒)

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- namespace 必须 = Mapper 接口全类名 -->
<mapper namespace="com.southern.power.grid.dao.DnerDailyPowerOutageEventSyncMapper">
</mapper>