diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0f90e7f --- /dev/null +++ b/.gitignore @@ -0,0 +1,40 @@ + +!.mvn/wrapper/maven-wrapper.jar +**/log/ +**/logs/ +**/target/ +**/**/log/ +**/**/logs/ +**/**/target/ +.fastRequest +.qoder +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/src/main/java/com/southern/power/grid/config/BusinessThreadPoolFactory.java b/src/main/java/com/southern/power/grid/config/BusinessThreadPoolFactory.java index b2f8474..3a5dc8e 100644 --- a/src/main/java/com/southern/power/grid/config/BusinessThreadPoolFactory.java +++ b/src/main/java/com/southern/power/grid/config/BusinessThreadPoolFactory.java @@ -20,12 +20,13 @@ public class BusinessThreadPoolFactory { @Autowired private ThreadPoolUtil threadPoolUtil; - + // 缓存已创建的线程池 private final ConcurrentMap threadPoolCache = new ConcurrentHashMap<>(); - + /** * 获取或创建业务线程池(线程安全) + * * @param businessCode 业务代码,用于标识业务类型 * @return 线程池 */ @@ -47,16 +48,16 @@ public class BusinessThreadPoolFactory { threadPoolCache.put(businessCode, created); return created; } - + /** * 获取或创建业务线程池(可自定义参数) */ - public ThreadPoolTaskExecutor getOrCreateThreadPool(String businessCode, - int corePoolSize, - int maxPoolSize, + public ThreadPoolTaskExecutor getOrCreateThreadPool(String businessCode, + int corePoolSize, + int maxPoolSize, int queueCapacity) { String cacheKey = String.format("%s_%d_%d_%d", businessCode, corePoolSize, maxPoolSize, queueCapacity); - + return threadPoolCache.computeIfAbsent(cacheKey, key -> { String threadNamePrefix = generateThreadNamePrefix(businessCode); log.info("创建自定义参数业务线程池 - 业务代码: {}, 线程前缀: {}, 核心线程: {}, 最大线程: {}, 队列容量: {}", @@ -64,7 +65,7 @@ public class BusinessThreadPoolFactory { return threadPoolUtil.createThreadPool(threadNamePrefix, corePoolSize, maxPoolSize, queueCapacity); }); } - + /** * 根据业务代码生成线程名称前缀 */ @@ -73,32 +74,34 @@ public class BusinessThreadPoolFactory { switch (businessCode) { case "DAILY_POWER": return "Dner-Daily-Power-Async"; + case "REGION_WEATHER_OLD": + return "Region-Weather-Old-Async"; default: return "Biz-" + businessCode + "-Async"; } } - + /** * 获取所有线程池的监控信息 */ public String getAllThreadPoolMonitorInfo() { StringBuilder sb = new StringBuilder(); sb.append("=== 线程池监控信息 ===\n"); - + threadPoolCache.forEach((key, executor) -> { sb.append("业务代码: ").append(key).append("\n"); sb.append(" 状态: ").append(ThreadPoolUtil.getThreadPoolMonitorInfo(executor)).append("\n"); }); - + return sb.toString(); } - + /** * 优雅关闭所有线程池 */ public void shutdownAllThreadPools() { log.info("开始关闭所有业务线程池,共 {} 个", threadPoolCache.size()); - + // 复制一份键值对,避免 ConcurrentModificationException new java.util.HashSet<>(threadPoolCache.keySet()).forEach(key -> { ThreadPoolTaskExecutor executor = threadPoolCache.remove(key); @@ -111,11 +114,11 @@ public class BusinessThreadPoolFactory { } } }); - + threadPoolCache.clear(); log.info("所有业务线程池已关闭"); } - + /** * 关闭指定业务的线程池 */ @@ -130,11 +133,12 @@ public class BusinessThreadPoolFactory { } } } - + /** * 业务代码常量 */ public static class BusinessCodes { - public static final String DAILY_POWER = "DAILY_POWER"; + public static final String DAILY_POWER = "DAILY_POWER";//K值同步 + public static final String REGION_WEATHER_OLD = "REGION_WEATHER_OLD"; //区域气象历史数据同步 } } diff --git a/src/main/java/com/southern/power/grid/controller/DnerController.java b/src/main/java/com/southern/power/grid/controller/DnerController.java index 83b88b2..0cf5ffa 100644 --- a/src/main/java/com/southern/power/grid/controller/DnerController.java +++ b/src/main/java/com/southern/power/grid/controller/DnerController.java @@ -2,10 +2,7 @@ package com.southern.power.grid.controller; import com.southern.power.grid.common.Result; import com.southern.power.grid.entity.*; -import com.southern.power.grid.service.IDnerDailyPowerOutageEventService; -import com.southern.power.grid.service.IDnerHourlyPowerOutageEventService; -import com.southern.power.grid.service.IDnerSiteAreaConfigurationService; -import com.southern.power.grid.service.IImportTaskService; +import com.southern.power.grid.service.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -36,6 +33,9 @@ public class DnerController { @Autowired private IDnerSiteAreaConfigurationService dnerSiteAreaConfigurationService; + @Autowired + private IRegionalWeatherDataService regionalWeatherDataService; + @PostMapping("/area-tree/query") public Result> queryAreaTree(@RequestBody @Valid AreaTreeReq req) { return Result.success(dnerSiteAreaConfigurationService.queryAreaTree(req)); @@ -90,4 +90,17 @@ public class DnerController { ImportTask task = importTaskService.getProgress(taskNo); return Result.success(task); } + + /** + * 同步区域气象数据 + * + * @param startDate 开始日期(数据示例:2025-09-01 00:00:00) + * @param endDate 结束日期(数据示例:2025-09-01 00:00:00) + * @return 结果 <字符串> + */ + @GetMapping("/sync/regionWeatherData") + public Result syncRegionWeatherData(@RequestParam String startDate, @RequestParam String endDate) { + regionalWeatherDataService.syncOldWeatherData(startDate, endDate); + return Result.success("success"); + } } diff --git a/src/main/java/com/southern/power/grid/service/IRegionalWeatherDataService.java b/src/main/java/com/southern/power/grid/service/IRegionalWeatherDataService.java index 173f529..37cf74e 100644 --- a/src/main/java/com/southern/power/grid/service/IRegionalWeatherDataService.java +++ b/src/main/java/com/southern/power/grid/service/IRegionalWeatherDataService.java @@ -6,5 +6,14 @@ public interface IRegionalWeatherDataService { * 定时同步天气数据 * */ - void scheduleSyncWeatherData(); + void scheduleSyncWeatherData(String prevHourDate); + + + /** + * 同步旧天气数据 + * + * @param startDate 开始日期(yyyy-mm-dd hh:mm)整时时间 + * @param endDate 结束日期(yyyy-mm-dd hh:mm)整时时间 + */ + void syncOldWeatherData(String startDate, String endDate); } diff --git a/src/main/java/com/southern/power/grid/service/impl/RegionalWeatherDataServiceImpl.java b/src/main/java/com/southern/power/grid/service/impl/RegionalWeatherDataServiceImpl.java index eb3a230..cb7c35e 100644 --- a/src/main/java/com/southern/power/grid/service/impl/RegionalWeatherDataServiceImpl.java +++ b/src/main/java/com/southern/power/grid/service/impl/RegionalWeatherDataServiceImpl.java @@ -1,25 +1,23 @@ package com.southern.power.grid.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.southern.power.grid.dao.NationalWeatherStationMapper; +import com.southern.power.grid.config.BusinessThreadPoolFactory; import com.southern.power.grid.dao.RegionalWeatherDataMapper; -import com.southern.power.grid.dao.RegionalWeatherStationMapper; -import com.southern.power.grid.dao.WeatherSiteAreaConfigurationMapper; -import com.southern.power.grid.entity.*; +import com.southern.power.grid.entity.RegionalWeatherData; import com.southern.power.grid.service.IRegionalWeatherDataService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.util.CollectionUtils; -import java.math.BigDecimal; -import java.math.RoundingMode; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; @Service @Slf4j @@ -27,253 +25,105 @@ import java.util.stream.Stream; public class RegionalWeatherDataServiceImpl extends ServiceImpl implements IRegionalWeatherDataService { - private final RegionalWeatherStationMapper regionalWeatherStationMapper; - private final NationalWeatherStationMapper nationalWeatherStationMapper; - private final WeatherSiteAreaConfigurationMapper weatherSiteAreaConfigurationMapper; - private final Map weatherAreaMap = new HashMap<>(); + + private final BusinessThreadPoolFactory businessThreadPoolFactory; + private final RegionalWeatherDataSyncService regionalWeatherDataSyncService; + + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"); @Override - @Transactional(rollbackFor = Exception.class) - public void scheduleSyncWeatherData() { + public void scheduleSyncWeatherData(String dateTime) { + regionalWeatherDataSyncService.loadWeatherAreaConfig(); + regionalWeatherDataSyncService.syncWeatherData(dateTime); + } + + + @Override + @Async + public void syncOldWeatherData(String startDate, String endDate) { + log.info("开始执行历史气象数据同步任务,起始时间: {},终止时间:{}", startDate, endDate); + LocalDateTime startTime; try { - //获取当前时间前一个整点时间 - String prevHourDate = LocalDateTime.now().minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00")); - List regionalWeatherStations = regionalWeatherStationMapper.getSyncData(prevHourDate); - List nationalWeatherStations = nationalWeatherStationMapper.getSyncData(prevHourDate); - if (CollectionUtils.isEmpty(regionalWeatherStations) && CollectionUtils.isEmpty(nationalWeatherStations)) { - log.warn("【区域天气数据同步任务】没有查询到天气数据:{}", prevHourDate); - return; - } - loadWeatherAreaConfig(); - List list = convertToSyncList(regionalWeatherStations, nationalWeatherStations); - if (CollectionUtils.isEmpty(list)) { - return; - } - List dataList = convert(list, prevHourDate); - this.saveBatch(dataList, 700); + startTime = LocalDateTime.parse(startDate, DATE_TIME_FORMATTER); } catch (Exception e) { - log.error("定时同步区域气象数据失败:{}", e.getMessage(), e); - throw new RuntimeException(e); - } - } - - public List convert(List list, String prevHourDate) { - if (list == null || list.isEmpty()) { - return Collections.emptyList(); - } - - // 1. 按 orgCode 分组 - Map> groupMap = - list.stream().collect(Collectors.groupingBy(SyncWeatherDataVO::getOrgCode)); - - // 2. 分组处理 - List result = new ArrayList<>(); - - for (Map.Entry> entry : groupMap.entrySet()) { - String orgCode = entry.getKey(); - List groupList = entry.getValue(); - - int size = groupList.size(); - - BigDecimal sumHourlyPrecipitation = BigDecimal.ZERO; - BigDecimal sumTemperature = BigDecimal.ZERO; - - BigDecimal maxTemp = null; - BigDecimal minTemp = null; - BigDecimal maxWind = null; - - for (SyncWeatherDataVO vo : groupList) { - - BigDecimal hourlyPre = parse(vo.getHourlyPrecipitation()); - BigDecimal temp = parse(vo.getTemperature()); - BigDecimal maxT = parse(vo.getHourlyMaxTemperature()); - BigDecimal minT = parse(vo.getHourlyMinTemperature()); - BigDecimal wind = parse(vo.getExtremeWindSpeedHourly()); - - // 累加 - sumHourlyPrecipitation = sumHourlyPrecipitation.add(hourlyPre); - sumTemperature = sumTemperature.add(temp); - - // 最大值 - if (maxTemp == null || maxT.compareTo(maxTemp) > 0) { - maxTemp = maxT; - } - - // 最小值 - if (minTemp == null || minT.compareTo(minTemp) < 0) { - minTemp = minT; - } - - // 最大风速 - if (maxWind == null || wind.compareTo(maxWind) > 0) { - maxWind = wind; - } - } - - // 平均值(保留2位小数) - String avgHourlyPrecipitation = divide(sumHourlyPrecipitation, size); - String avgTemperature = divide(sumTemperature, size); - - // 3. 构建实体 - RegionalWeatherData data = new RegionalWeatherData(); - data.setOrgCode(orgCode); - data.setDataTime(prevHourDate); - data.setHourlyPrecipitation(avgHourlyPrecipitation); - data.setDailyPrecipitation("0"); - data.setTemperature(avgTemperature); - data.setHourlyMaxTemperature(toStr(maxTemp)); - data.setHourlyMinTemperature(toStr(minTemp)); - data.setExtremeWindSpeedHourly(toStr(maxWind)); - - result.add(data); - } - - return result; - } - - /** - * String → BigDecimal(空值安全) - */ - private static BigDecimal parse(String val) { - if (val == null || val.trim().isEmpty()) { - return BigDecimal.ZERO; + log.error("起始时间 传入的时间格式不正确,期望格式为 yyyy-MM-dd HH:mm,实际传入: {}", startDate, e); + throw new IllegalArgumentException("起始时间 时间格式不正确"); } + LocalDateTime endTime; try { - return new BigDecimal(val.trim()); + endTime = LocalDateTime.parse(endDate, DATE_TIME_FORMATTER); } catch (Exception e) { - return BigDecimal.ZERO; + log.error("终止时间 传入的时间格式不正确,期望格式为 yyyy-MM-dd HH:mm,实际传入: {}", endDate, e); + throw new IllegalArgumentException("终止时间 时间格式不正确"); } - } - - /** - * 平均值计算(保留2位小数) - */ - private static String divide(BigDecimal sum, int size) { - if (size == 0) { - return "0"; + // 安全校验:如果起始时间在终止时间之后,则无需处理 + if (startTime.isAfter(endTime)) { + log.warn("起始时间 {} 晚于终止时间 {},无需同步。", startDate, endDate); + return; } - return sum.divide(BigDecimal.valueOf(size), 2, RoundingMode.HALF_UP).toString(); - } - - /** - * BigDecimal → String - */ - private static String toStr(BigDecimal val) { - return val == null ? "0" : val.setScale(2, RoundingMode.HALF_UP).toString(); - } - - - /** - * 加载气象区划配置表 - * - **/ - private void loadWeatherAreaConfig() { - weatherAreaMap.clear(); - List list = weatherSiteAreaConfigurationMapper.selectAll(); - if (CollectionUtils.isEmpty(list)) return; - Map tempMap = list.stream() - .filter(Objects::nonNull) - .filter(config -> config.getWeatherProvince() != null - && config.getWeatherCity() != null - && config.getWeatherDistrict() != null - && config.getDistrictCode() != null) - .collect(Collectors.toMap( - config -> String.join(":", - config.getWeatherProvince(), - config.getWeatherCity(), - config.getWeatherDistrict()), - WeatherSiteAreaConfiguration::getDistrictCode, - (oldValue, newValue) -> oldValue - )); - - // 全部放入你的成员变量 Map - weatherAreaMap.putAll(tempMap); - } - - public List convertToSyncList( - List regionalList, - List nationalList) { - - Stream regionalStream = regionalList.stream() - .map(item -> buildVO( - item.getProvince(), - item.getCity(), - item.getDistrict(), - item.getDataTime(), - item.getTemperature(), - item.getHourlyMaxTemperature(), - item.getHourlyMinTemperature(), - item.getHourlyPrecipitation(), - item.getDailyPrecipitation(), - item.getExtremeWindSpeedHourly(), - weatherAreaMap - )) - .filter(Objects::nonNull); - - Stream nationalStream = nationalList.stream() - .map(item -> buildVO( - item.getProvince(), - item.getCity(), - item.getDistrict(), - item.getDataTime(), - item.getTemperature(), - item.getHourlyMaxTemperature(), - item.getHourlyMinTemperature(), - item.getHourlyPrecipitation(), - item.getDailyPrecipitation(), - item.getExtremeWindSpeedHourly(), - weatherAreaMap - )) - .filter(Objects::nonNull); - - return Stream.concat(regionalStream, nationalStream) - .collect(Collectors.toList()); - } - - private static SyncWeatherDataVO buildVO( - String province, - String city, - String district, - String dataTime, - String temperature, - String hourlyMaxTemperature, - String hourlyMinTemperature, - String hourlyPrecipitation, - String dailyPrecipitation, - String extremeWindSpeedHourly, - Map weatherAreaMap) { - - String key = buildKey(province, city, district); - String orgCode = weatherAreaMap.get(key); - - // 没匹配上区域编码,直接丢弃 - if (orgCode == null) { - return null; - } - - SyncWeatherDataVO vo = new SyncWeatherDataVO(); - vo.setOrgCode(orgCode); - vo.setDataTime(dataTime); - vo.setTemperature(temperature); - vo.setHourlyMaxTemperature(hourlyMaxTemperature); - vo.setHourlyMinTemperature(hourlyMinTemperature); - vo.setHourlyPrecipitation(hourlyPrecipitation); - vo.setDailyPrecipitation(dailyPrecipitation); - vo.setExtremeWindSpeedHourly(extremeWindSpeedHourly); - - return vo; - } - - private static String buildKey(String province, String city, String district) { - return String.join(":", - safe(province), - safe(city), - safe(district) + // 获取业务专属线程池 + ThreadPoolTaskExecutor executor = businessThreadPoolFactory.getOrCreateThreadPool( + BusinessThreadPoolFactory.BusinessCodes.REGION_WEATHER_OLD ); + // 用于保存所有的异步任务引用,以便最后等待它们全部完成 + List> futures = new ArrayList<>(); + + // 【高安全设计】使用线程安全的队列来记录处理失败的时间点,便于后续重试或告警 + ConcurrentLinkedQueue failedTimePoints = new ConcurrentLinkedQueue<>(); + + // 3. 循环生成每一个小时的任务 + LocalDateTime currentLoopTime = startTime; + + regionalWeatherDataSyncService.loadWeatherAreaConfig(); + + Semaphore semaphore = new Semaphore(10); + while (!currentLoopTime.isAfter(endTime)) { + try { + // ✅ 提交前限流(关键优化点) + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("线程被中断", e); + } + String targetTimeStr = currentLoopTime.format(DATE_TIME_FORMATTER); + CompletableFuture future = CompletableFuture.runAsync(() -> { + long start = System.currentTimeMillis(); + try { + log.info(">> 开始同步 [{}]", targetTimeStr); + // ✅ 调用事务方法 + regionalWeatherDataSyncService.syncWeatherData(targetTimeStr); + log.info("<< 完成 [{}],耗时 {} ms", + targetTimeStr, System.currentTimeMillis() - start); + } catch (Exception e) { + log.error("!! 处理失败 [{}]", targetTimeStr, e); + failedTimePoints.add(targetTimeStr); + } finally { + // ✅ 必须释放(否则死锁) + semaphore.release(); + } + }, executor); + futures.add(future); + currentLoopTime = currentLoopTime.plusHours(1); + } + + // 4. 等待所有派发的任务执行完毕 + log.info("共生成了 {} 个整时同步任务,正在等待全部执行完成...", futures.size()); + try { + // CompletableFuture.allOf 可以等待传入的所有 Future 执行完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } catch (Exception e) { + log.error("等待多线程任务执行时发生系统级异常", e); + } + + // 5. 任务结果汇总与告警处理 + if (!failedTimePoints.isEmpty()) { + log.warn("=== 历史数据同步结束,但存在失败记录!==="); + log.warn("总计失败次数: {},失败的时间点列表: {}", failedTimePoints.size(), failedTimePoints); + } else { + log.info("=== 历史气象数据同步完美收官!所有时间点均处理成功。总计处理条数:{} ===", futures.size()); + } } - private static String safe(String str) { - return str == null ? "" : str.trim(); - } + } diff --git a/src/main/java/com/southern/power/grid/service/impl/RegionalWeatherDataSyncService.java b/src/main/java/com/southern/power/grid/service/impl/RegionalWeatherDataSyncService.java new file mode 100644 index 0000000..cf17b29 --- /dev/null +++ b/src/main/java/com/southern/power/grid/service/impl/RegionalWeatherDataSyncService.java @@ -0,0 +1,272 @@ +package com.southern.power.grid.service.impl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.southern.power.grid.dao.NationalWeatherStationMapper; +import com.southern.power.grid.dao.RegionalWeatherDataMapper; +import com.southern.power.grid.dao.RegionalWeatherStationMapper; +import com.southern.power.grid.dao.WeatherSiteAreaConfigurationMapper; +import com.southern.power.grid.entity.*; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.CollectionUtils; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Service +@Slf4j +@RequiredArgsConstructor +public class RegionalWeatherDataSyncService extends ServiceImpl { + + private final RegionalWeatherStationMapper regionalWeatherStationMapper; + private final NationalWeatherStationMapper nationalWeatherStationMapper; + private final Map weatherAreaMap = new ConcurrentHashMap<>(); + private final WeatherSiteAreaConfigurationMapper weatherSiteAreaConfigurationMapper; + + + @Transactional(rollbackFor = Exception.class) + public void syncWeatherData(String dateTime) { + try { + List regionalWeatherStations = regionalWeatherStationMapper.getSyncData(dateTime); + List nationalWeatherStations = nationalWeatherStationMapper.getSyncData(dateTime); + if (CollectionUtils.isEmpty(regionalWeatherStations) && CollectionUtils.isEmpty(nationalWeatherStations)) { + log.warn("【区域天气数据同步任务】没有查询到天气数据:{}", dateTime); + return; + } + List list = convertToSyncList(regionalWeatherStations, nationalWeatherStations); + if (CollectionUtils.isEmpty(list)) { + return; + } + List dataList = convert(list, dateTime); + this.saveBatch(dataList, 700); + } catch (Exception e) { + log.error("定时同步区域气象数据失败:{}", e.getMessage(), e); + throw new RuntimeException(e); + } + } + + public List convert(List list, String prevHourDate) { + if (list == null || list.isEmpty()) { + return Collections.emptyList(); + } + + // 1. 按 orgCode 分组 + Map> groupMap = + list.stream().collect(Collectors.groupingBy(SyncWeatherDataVO::getOrgCode)); + + // 2. 分组处理 + List result = new ArrayList<>(); + + for (Map.Entry> entry : groupMap.entrySet()) { + String orgCode = entry.getKey(); + List groupList = entry.getValue(); + + int size = groupList.size(); + + BigDecimal sumHourlyPrecipitation = BigDecimal.ZERO; + BigDecimal sumTemperature = BigDecimal.ZERO; + + BigDecimal maxTemp = null; + BigDecimal minTemp = null; + BigDecimal maxWind = null; + + for (SyncWeatherDataVO vo : groupList) { + + BigDecimal hourlyPre = parse(vo.getHourlyPrecipitation()); + BigDecimal temp = parse(vo.getTemperature()); + BigDecimal maxT = parse(vo.getHourlyMaxTemperature()); + BigDecimal minT = parse(vo.getHourlyMinTemperature()); + BigDecimal wind = parse(vo.getExtremeWindSpeedHourly()); + + // 累加 + sumHourlyPrecipitation = sumHourlyPrecipitation.add(hourlyPre); + sumTemperature = sumTemperature.add(temp); + + // 最大值 + if (maxTemp == null || maxT.compareTo(maxTemp) > 0) { + maxTemp = maxT; + } + + // 最小值 + if (minTemp == null || minT.compareTo(minTemp) < 0) { + minTemp = minT; + } + + // 最大风速 + if (maxWind == null || wind.compareTo(maxWind) > 0) { + maxWind = wind; + } + } + + // 平均值(保留2位小数) + String avgHourlyPrecipitation = divide(sumHourlyPrecipitation, size); + String avgTemperature = divide(sumTemperature, size); + + // 3. 构建实体 + RegionalWeatherData data = new RegionalWeatherData(); + data.setOrgCode(orgCode); + data.setDataTime(prevHourDate); + data.setHourlyPrecipitation(avgHourlyPrecipitation); + data.setDailyPrecipitation("0"); + data.setTemperature(avgTemperature); + data.setHourlyMaxTemperature(toStr(maxTemp)); + data.setHourlyMinTemperature(toStr(minTemp)); + data.setExtremeWindSpeedHourly(toStr(maxWind)); + + result.add(data); + } + + return result; + } + + /** + * String → BigDecimal(空值安全) + */ + private static BigDecimal parse(String val) { + if (val == null || val.trim().isEmpty()) { + return BigDecimal.ZERO; + } + try { + return new BigDecimal(val.trim()); + } catch (Exception e) { + return BigDecimal.ZERO; + } + } + + /** + * 平均值计算(保留2位小数) + */ + private static String divide(BigDecimal sum, int size) { + if (size == 0) { + return "0"; + } + return sum.divide(BigDecimal.valueOf(size), 2, RoundingMode.HALF_UP).toString(); + } + + /** + * BigDecimal → String + */ + private static String toStr(BigDecimal val) { + return val == null ? "0" : val.setScale(2, RoundingMode.HALF_UP).toString(); + } + + + /** + * 加载气象区划配置表 + * + **/ + public void loadWeatherAreaConfig() { + weatherAreaMap.clear(); + List list = weatherSiteAreaConfigurationMapper.selectAll(); + if (CollectionUtils.isEmpty(list)) return; + Map tempMap = list.stream() + .filter(Objects::nonNull) + .filter(config -> config.getWeatherProvince() != null + && config.getWeatherCity() != null + && config.getWeatherDistrict() != null + && config.getDistrictCode() != null) + .collect(Collectors.toMap( + config -> String.join(":", + config.getWeatherProvince(), + config.getWeatherCity(), + config.getWeatherDistrict()), + WeatherSiteAreaConfiguration::getDistrictCode, + (oldValue, newValue) -> oldValue + )); + + // 全部放入你的成员变量 Map + weatherAreaMap.putAll(tempMap); + } + + public List convertToSyncList( + List regionalList, + List nationalList) { + + Stream regionalStream = regionalList.stream() + .map(item -> buildVO( + item.getProvince(), + item.getCity(), + item.getDistrict(), + item.getDataTime(), + item.getTemperature(), + item.getHourlyMaxTemperature(), + item.getHourlyMinTemperature(), + item.getHourlyPrecipitation(), + item.getDailyPrecipitation(), + item.getExtremeWindSpeedHourly(), + weatherAreaMap + )) + .filter(Objects::nonNull); + + Stream nationalStream = nationalList.stream() + .map(item -> buildVO( + item.getProvince(), + item.getCity(), + item.getDistrict(), + item.getDataTime(), + item.getTemperature(), + item.getHourlyMaxTemperature(), + item.getHourlyMinTemperature(), + item.getHourlyPrecipitation(), + item.getDailyPrecipitation(), + item.getExtremeWindSpeedHourly(), + weatherAreaMap + )) + .filter(Objects::nonNull); + + return Stream.concat(regionalStream, nationalStream) + .collect(Collectors.toList()); + } + + private static SyncWeatherDataVO buildVO( + String province, + String city, + String district, + String dataTime, + String temperature, + String hourlyMaxTemperature, + String hourlyMinTemperature, + String hourlyPrecipitation, + String dailyPrecipitation, + String extremeWindSpeedHourly, + Map weatherAreaMap) { + + String key = buildKey(province, city, district); + String orgCode = weatherAreaMap.get(key); + + // 没匹配上区域编码,直接丢弃 + if (orgCode == null) { + return null; + } + + SyncWeatherDataVO vo = new SyncWeatherDataVO(); + vo.setOrgCode(orgCode); + vo.setDataTime(dataTime); + vo.setTemperature(temperature); + vo.setHourlyMaxTemperature(hourlyMaxTemperature); + vo.setHourlyMinTemperature(hourlyMinTemperature); + vo.setHourlyPrecipitation(hourlyPrecipitation); + vo.setDailyPrecipitation(dailyPrecipitation); + vo.setExtremeWindSpeedHourly(extremeWindSpeedHourly); + + return vo; + } + + private static String buildKey(String province, String city, String district) { + return String.join(":", + safe(province), + safe(city), + safe(district) + ); + } + + private static String safe(String str) { + return str == null ? "" : str.trim(); + } +} diff --git a/src/main/java/com/southern/power/grid/task/WeatherDataScheduleTask.java b/src/main/java/com/southern/power/grid/task/WeatherDataScheduleTask.java index debb879..bd0b0a2 100644 --- a/src/main/java/com/southern/power/grid/task/WeatherDataScheduleTask.java +++ b/src/main/java/com/southern/power/grid/task/WeatherDataScheduleTask.java @@ -1,11 +1,14 @@ package com.southern.power.grid.task; +import com.southern.power.grid.service.IDnerDailyPowerOutageEventSyncService; import com.southern.power.grid.service.IRegionalWeatherDataService; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @@ -15,15 +18,12 @@ public class WeatherDataScheduleTask { @Resource private IRegionalWeatherDataService regionalWeatherDataService; + @Resource + private IDnerDailyPowerOutageEventSyncService service; + // 防止并发执行 private final AtomicBoolean running = new AtomicBoolean(false); -/* @PostConstruct - public void init(){ - log.info("【区域天气数据同步任务】项目启动触发"); - execute(); - }*/ - /** * 每小时第20分钟执行 */ @@ -43,7 +43,9 @@ public class WeatherDataScheduleTask { } try { - regionalWeatherDataService.scheduleSyncWeatherData(); + //获取当前时间前一个整点时间 + String prevHourDate = LocalDateTime.now().minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00")); + regionalWeatherDataService.scheduleSyncWeatherData(prevHourDate); log.info("【区域天气数据同步任务】定时触发--结束"); } catch (Exception e) { log.error("【区域天气数据同步任务】执行异常", e);