提交接口-同步区域气象数据根据起始和终止时间
This commit is contained in:
parent
64b8c996b3
commit
8ab7eeb801
40
.gitignore
vendored
Normal file
40
.gitignore
vendored
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
|
||||||
|
!.mvn/wrapper/maven-wrapper.jar
|
||||||
|
**/log/
|
||||||
|
**/logs/
|
||||||
|
**/target/
|
||||||
|
**/**/log/
|
||||||
|
**/**/logs/
|
||||||
|
**/**/target/
|
||||||
|
.fastRequest
|
||||||
|
.qoder
|
||||||
|
### IntelliJ IDEA ###
|
||||||
|
.idea
|
||||||
|
*.iws
|
||||||
|
*.iml
|
||||||
|
*.ipr
|
||||||
|
|
||||||
|
### Eclipse ###
|
||||||
|
.apt_generated
|
||||||
|
.classpath
|
||||||
|
.factorypath
|
||||||
|
.project
|
||||||
|
.settings
|
||||||
|
.springBeans
|
||||||
|
.sts4-cache
|
||||||
|
|
||||||
|
### NetBeans ###
|
||||||
|
/nbproject/private/
|
||||||
|
/nbbuild/
|
||||||
|
/dist/
|
||||||
|
/nbdist/
|
||||||
|
/.nb-gradle/
|
||||||
|
build/
|
||||||
|
!**/src/main/**/build/
|
||||||
|
!**/src/test/**/build/
|
||||||
|
|
||||||
|
### VS Code ###
|
||||||
|
.vscode/
|
||||||
|
|
||||||
|
### Mac OS ###
|
||||||
|
.DS_Store
|
||||||
@ -26,6 +26,7 @@ public class BusinessThreadPoolFactory {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取或创建业务线程池(线程安全)
|
* 获取或创建业务线程池(线程安全)
|
||||||
|
*
|
||||||
* @param businessCode 业务代码,用于标识业务类型
|
* @param businessCode 业务代码,用于标识业务类型
|
||||||
* @return 线程池
|
* @return 线程池
|
||||||
*/
|
*/
|
||||||
@ -73,6 +74,8 @@ public class BusinessThreadPoolFactory {
|
|||||||
switch (businessCode) {
|
switch (businessCode) {
|
||||||
case "DAILY_POWER":
|
case "DAILY_POWER":
|
||||||
return "Dner-Daily-Power-Async";
|
return "Dner-Daily-Power-Async";
|
||||||
|
case "REGION_WEATHER_OLD":
|
||||||
|
return "Region-Weather-Old-Async";
|
||||||
default:
|
default:
|
||||||
return "Biz-" + businessCode + "-Async";
|
return "Biz-" + businessCode + "-Async";
|
||||||
}
|
}
|
||||||
@ -135,6 +138,7 @@ public class BusinessThreadPoolFactory {
|
|||||||
* 业务代码常量
|
* 业务代码常量
|
||||||
*/
|
*/
|
||||||
public static class BusinessCodes {
|
public static class BusinessCodes {
|
||||||
public static final String DAILY_POWER = "DAILY_POWER";
|
public static final String DAILY_POWER = "DAILY_POWER";//K值同步
|
||||||
|
public static final String REGION_WEATHER_OLD = "REGION_WEATHER_OLD"; //区域气象历史数据同步
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,10 +2,7 @@ package com.southern.power.grid.controller;
|
|||||||
|
|
||||||
import com.southern.power.grid.common.Result;
|
import com.southern.power.grid.common.Result;
|
||||||
import com.southern.power.grid.entity.*;
|
import com.southern.power.grid.entity.*;
|
||||||
import com.southern.power.grid.service.IDnerDailyPowerOutageEventService;
|
import com.southern.power.grid.service.*;
|
||||||
import com.southern.power.grid.service.IDnerHourlyPowerOutageEventService;
|
|
||||||
import com.southern.power.grid.service.IDnerSiteAreaConfigurationService;
|
|
||||||
import com.southern.power.grid.service.IImportTaskService;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
@ -36,6 +33,9 @@ public class DnerController {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IDnerSiteAreaConfigurationService dnerSiteAreaConfigurationService;
|
private IDnerSiteAreaConfigurationService dnerSiteAreaConfigurationService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IRegionalWeatherDataService regionalWeatherDataService;
|
||||||
|
|
||||||
@PostMapping("/area-tree/query")
|
@PostMapping("/area-tree/query")
|
||||||
public Result<List<AreaTreeVO>> queryAreaTree(@RequestBody @Valid AreaTreeReq req) {
|
public Result<List<AreaTreeVO>> queryAreaTree(@RequestBody @Valid AreaTreeReq req) {
|
||||||
return Result.success(dnerSiteAreaConfigurationService.queryAreaTree(req));
|
return Result.success(dnerSiteAreaConfigurationService.queryAreaTree(req));
|
||||||
@ -90,4 +90,17 @@ public class DnerController {
|
|||||||
ImportTask task = importTaskService.getProgress(taskNo);
|
ImportTask task = importTaskService.getProgress(taskNo);
|
||||||
return Result.success(task);
|
return Result.success(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 同步区域气象数据
|
||||||
|
*
|
||||||
|
* @param startDate 开始日期(数据示例:2025-09-01 00:00:00)
|
||||||
|
* @param endDate 结束日期(数据示例:2025-09-01 00:00:00)
|
||||||
|
* @return 结果 <字符串>
|
||||||
|
*/
|
||||||
|
@GetMapping("/sync/regionWeatherData")
|
||||||
|
public Result<String> syncRegionWeatherData(@RequestParam String startDate, @RequestParam String endDate) {
|
||||||
|
regionalWeatherDataService.syncOldWeatherData(startDate, endDate);
|
||||||
|
return Result.success("success");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -6,5 +6,14 @@ public interface IRegionalWeatherDataService {
|
|||||||
* 定时同步天气数据
|
* 定时同步天气数据
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
void scheduleSyncWeatherData();
|
void scheduleSyncWeatherData(String prevHourDate);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 同步旧天气数据
|
||||||
|
*
|
||||||
|
* @param startDate 开始日期(yyyy-mm-dd hh:mm)整时时间
|
||||||
|
* @param endDate 结束日期(yyyy-mm-dd hh:mm)整时时间
|
||||||
|
*/
|
||||||
|
void syncOldWeatherData(String startDate, String endDate);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,25 +1,23 @@
|
|||||||
package com.southern.power.grid.service.impl;
|
package com.southern.power.grid.service.impl;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
import com.southern.power.grid.dao.NationalWeatherStationMapper;
|
import com.southern.power.grid.config.BusinessThreadPoolFactory;
|
||||||
import com.southern.power.grid.dao.RegionalWeatherDataMapper;
|
import com.southern.power.grid.dao.RegionalWeatherDataMapper;
|
||||||
import com.southern.power.grid.dao.RegionalWeatherStationMapper;
|
import com.southern.power.grid.entity.RegionalWeatherData;
|
||||||
import com.southern.power.grid.dao.WeatherSiteAreaConfigurationMapper;
|
|
||||||
import com.southern.power.grid.entity.*;
|
|
||||||
import com.southern.power.grid.service.IRegionalWeatherDataService;
|
import com.southern.power.grid.service.IRegionalWeatherDataService;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
|
||||||
import org.springframework.util.CollectionUtils;
|
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
|
||||||
import java.math.RoundingMode;
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
import java.util.stream.Collectors;
|
import java.util.List;
|
||||||
import java.util.stream.Stream;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -27,253 +25,105 @@ import java.util.stream.Stream;
|
|||||||
public class RegionalWeatherDataServiceImpl extends ServiceImpl<RegionalWeatherDataMapper, RegionalWeatherData>
|
public class RegionalWeatherDataServiceImpl extends ServiceImpl<RegionalWeatherDataMapper, RegionalWeatherData>
|
||||||
implements IRegionalWeatherDataService {
|
implements IRegionalWeatherDataService {
|
||||||
|
|
||||||
private final RegionalWeatherStationMapper regionalWeatherStationMapper;
|
|
||||||
private final NationalWeatherStationMapper nationalWeatherStationMapper;
|
private final BusinessThreadPoolFactory businessThreadPoolFactory;
|
||||||
private final WeatherSiteAreaConfigurationMapper weatherSiteAreaConfigurationMapper;
|
private final RegionalWeatherDataSyncService regionalWeatherDataSyncService;
|
||||||
private final Map<String, String> weatherAreaMap = new HashMap<>();
|
|
||||||
|
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00");
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional(rollbackFor = Exception.class)
|
public void scheduleSyncWeatherData(String dateTime) {
|
||||||
public void scheduleSyncWeatherData() {
|
regionalWeatherDataSyncService.loadWeatherAreaConfig();
|
||||||
|
regionalWeatherDataSyncService.syncWeatherData(dateTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Async
|
||||||
|
public void syncOldWeatherData(String startDate, String endDate) {
|
||||||
|
log.info("开始执行历史气象数据同步任务,起始时间: {},终止时间:{}", startDate, endDate);
|
||||||
|
LocalDateTime startTime;
|
||||||
try {
|
try {
|
||||||
//获取当前时间前一个整点时间
|
startTime = LocalDateTime.parse(startDate, DATE_TIME_FORMATTER);
|
||||||
String prevHourDate = LocalDateTime.now().minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
|
|
||||||
List<RegionalWeatherStation> regionalWeatherStations = regionalWeatherStationMapper.getSyncData(prevHourDate);
|
|
||||||
List<NationalWeatherStation> nationalWeatherStations = nationalWeatherStationMapper.getSyncData(prevHourDate);
|
|
||||||
if (CollectionUtils.isEmpty(regionalWeatherStations) && CollectionUtils.isEmpty(nationalWeatherStations)) {
|
|
||||||
log.warn("【区域天气数据同步任务】没有查询到天气数据:{}", prevHourDate);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
loadWeatherAreaConfig();
|
|
||||||
List<SyncWeatherDataVO> list = convertToSyncList(regionalWeatherStations, nationalWeatherStations);
|
|
||||||
if (CollectionUtils.isEmpty(list)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
List<RegionalWeatherData> dataList = convert(list, prevHourDate);
|
|
||||||
this.saveBatch(dataList, 700);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("定时同步区域气象数据失败:{}", e.getMessage(), e);
|
log.error("起始时间 传入的时间格式不正确,期望格式为 yyyy-MM-dd HH:mm,实际传入: {}", startDate, e);
|
||||||
throw new RuntimeException(e);
|
throw new IllegalArgumentException("起始时间 时间格式不正确");
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<RegionalWeatherData> convert(List<SyncWeatherDataVO> list, String prevHourDate) {
|
|
||||||
if (list == null || list.isEmpty()) {
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. 按 orgCode 分组
|
|
||||||
Map<String, List<SyncWeatherDataVO>> groupMap =
|
|
||||||
list.stream().collect(Collectors.groupingBy(SyncWeatherDataVO::getOrgCode));
|
|
||||||
|
|
||||||
// 2. 分组处理
|
|
||||||
List<RegionalWeatherData> result = new ArrayList<>();
|
|
||||||
|
|
||||||
for (Map.Entry<String, List<SyncWeatherDataVO>> entry : groupMap.entrySet()) {
|
|
||||||
String orgCode = entry.getKey();
|
|
||||||
List<SyncWeatherDataVO> groupList = entry.getValue();
|
|
||||||
|
|
||||||
int size = groupList.size();
|
|
||||||
|
|
||||||
BigDecimal sumHourlyPrecipitation = BigDecimal.ZERO;
|
|
||||||
BigDecimal sumTemperature = BigDecimal.ZERO;
|
|
||||||
|
|
||||||
BigDecimal maxTemp = null;
|
|
||||||
BigDecimal minTemp = null;
|
|
||||||
BigDecimal maxWind = null;
|
|
||||||
|
|
||||||
for (SyncWeatherDataVO vo : groupList) {
|
|
||||||
|
|
||||||
BigDecimal hourlyPre = parse(vo.getHourlyPrecipitation());
|
|
||||||
BigDecimal temp = parse(vo.getTemperature());
|
|
||||||
BigDecimal maxT = parse(vo.getHourlyMaxTemperature());
|
|
||||||
BigDecimal minT = parse(vo.getHourlyMinTemperature());
|
|
||||||
BigDecimal wind = parse(vo.getExtremeWindSpeedHourly());
|
|
||||||
|
|
||||||
// 累加
|
|
||||||
sumHourlyPrecipitation = sumHourlyPrecipitation.add(hourlyPre);
|
|
||||||
sumTemperature = sumTemperature.add(temp);
|
|
||||||
|
|
||||||
// 最大值
|
|
||||||
if (maxTemp == null || maxT.compareTo(maxTemp) > 0) {
|
|
||||||
maxTemp = maxT;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 最小值
|
|
||||||
if (minTemp == null || minT.compareTo(minTemp) < 0) {
|
|
||||||
minTemp = minT;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 最大风速
|
|
||||||
if (maxWind == null || wind.compareTo(maxWind) > 0) {
|
|
||||||
maxWind = wind;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 平均值(保留2位小数)
|
|
||||||
String avgHourlyPrecipitation = divide(sumHourlyPrecipitation, size);
|
|
||||||
String avgTemperature = divide(sumTemperature, size);
|
|
||||||
|
|
||||||
// 3. 构建实体
|
|
||||||
RegionalWeatherData data = new RegionalWeatherData();
|
|
||||||
data.setOrgCode(orgCode);
|
|
||||||
data.setDataTime(prevHourDate);
|
|
||||||
data.setHourlyPrecipitation(avgHourlyPrecipitation);
|
|
||||||
data.setDailyPrecipitation("0");
|
|
||||||
data.setTemperature(avgTemperature);
|
|
||||||
data.setHourlyMaxTemperature(toStr(maxTemp));
|
|
||||||
data.setHourlyMinTemperature(toStr(minTemp));
|
|
||||||
data.setExtremeWindSpeedHourly(toStr(maxWind));
|
|
||||||
|
|
||||||
result.add(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* String → BigDecimal(空值安全)
|
|
||||||
*/
|
|
||||||
private static BigDecimal parse(String val) {
|
|
||||||
if (val == null || val.trim().isEmpty()) {
|
|
||||||
return BigDecimal.ZERO;
|
|
||||||
}
|
}
|
||||||
|
LocalDateTime endTime;
|
||||||
try {
|
try {
|
||||||
return new BigDecimal(val.trim());
|
endTime = LocalDateTime.parse(endDate, DATE_TIME_FORMATTER);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return BigDecimal.ZERO;
|
log.error("终止时间 传入的时间格式不正确,期望格式为 yyyy-MM-dd HH:mm,实际传入: {}", endDate, e);
|
||||||
|
throw new IllegalArgumentException("终止时间 时间格式不正确");
|
||||||
}
|
}
|
||||||
}
|
// 安全校验:如果起始时间在终止时间之后,则无需处理
|
||||||
|
if (startTime.isAfter(endTime)) {
|
||||||
/**
|
log.warn("起始时间 {} 晚于终止时间 {},无需同步。", startDate, endDate);
|
||||||
* 平均值计算(保留2位小数)
|
return;
|
||||||
*/
|
|
||||||
private static String divide(BigDecimal sum, int size) {
|
|
||||||
if (size == 0) {
|
|
||||||
return "0";
|
|
||||||
}
|
}
|
||||||
return sum.divide(BigDecimal.valueOf(size), 2, RoundingMode.HALF_UP).toString();
|
// 获取业务专属线程池
|
||||||
}
|
ThreadPoolTaskExecutor executor = businessThreadPoolFactory.getOrCreateThreadPool(
|
||||||
|
BusinessThreadPoolFactory.BusinessCodes.REGION_WEATHER_OLD
|
||||||
/**
|
|
||||||
* BigDecimal → String
|
|
||||||
*/
|
|
||||||
private static String toStr(BigDecimal val) {
|
|
||||||
return val == null ? "0" : val.setScale(2, RoundingMode.HALF_UP).toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 加载气象区划配置表
|
|
||||||
*
|
|
||||||
**/
|
|
||||||
private void loadWeatherAreaConfig() {
|
|
||||||
weatherAreaMap.clear();
|
|
||||||
List<WeatherSiteAreaConfiguration> list = weatherSiteAreaConfigurationMapper.selectAll();
|
|
||||||
if (CollectionUtils.isEmpty(list)) return;
|
|
||||||
Map<String, String> tempMap = list.stream()
|
|
||||||
.filter(Objects::nonNull)
|
|
||||||
.filter(config -> config.getWeatherProvince() != null
|
|
||||||
&& config.getWeatherCity() != null
|
|
||||||
&& config.getWeatherDistrict() != null
|
|
||||||
&& config.getDistrictCode() != null)
|
|
||||||
.collect(Collectors.toMap(
|
|
||||||
config -> String.join(":",
|
|
||||||
config.getWeatherProvince(),
|
|
||||||
config.getWeatherCity(),
|
|
||||||
config.getWeatherDistrict()),
|
|
||||||
WeatherSiteAreaConfiguration::getDistrictCode,
|
|
||||||
(oldValue, newValue) -> oldValue
|
|
||||||
));
|
|
||||||
|
|
||||||
// 全部放入你的成员变量 Map
|
|
||||||
weatherAreaMap.putAll(tempMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<SyncWeatherDataVO> convertToSyncList(
|
|
||||||
List<RegionalWeatherStation> regionalList,
|
|
||||||
List<NationalWeatherStation> nationalList) {
|
|
||||||
|
|
||||||
Stream<SyncWeatherDataVO> regionalStream = regionalList.stream()
|
|
||||||
.map(item -> buildVO(
|
|
||||||
item.getProvince(),
|
|
||||||
item.getCity(),
|
|
||||||
item.getDistrict(),
|
|
||||||
item.getDataTime(),
|
|
||||||
item.getTemperature(),
|
|
||||||
item.getHourlyMaxTemperature(),
|
|
||||||
item.getHourlyMinTemperature(),
|
|
||||||
item.getHourlyPrecipitation(),
|
|
||||||
item.getDailyPrecipitation(),
|
|
||||||
item.getExtremeWindSpeedHourly(),
|
|
||||||
weatherAreaMap
|
|
||||||
))
|
|
||||||
.filter(Objects::nonNull);
|
|
||||||
|
|
||||||
Stream<SyncWeatherDataVO> nationalStream = nationalList.stream()
|
|
||||||
.map(item -> buildVO(
|
|
||||||
item.getProvince(),
|
|
||||||
item.getCity(),
|
|
||||||
item.getDistrict(),
|
|
||||||
item.getDataTime(),
|
|
||||||
item.getTemperature(),
|
|
||||||
item.getHourlyMaxTemperature(),
|
|
||||||
item.getHourlyMinTemperature(),
|
|
||||||
item.getHourlyPrecipitation(),
|
|
||||||
item.getDailyPrecipitation(),
|
|
||||||
item.getExtremeWindSpeedHourly(),
|
|
||||||
weatherAreaMap
|
|
||||||
))
|
|
||||||
.filter(Objects::nonNull);
|
|
||||||
|
|
||||||
return Stream.concat(regionalStream, nationalStream)
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static SyncWeatherDataVO buildVO(
|
|
||||||
String province,
|
|
||||||
String city,
|
|
||||||
String district,
|
|
||||||
String dataTime,
|
|
||||||
String temperature,
|
|
||||||
String hourlyMaxTemperature,
|
|
||||||
String hourlyMinTemperature,
|
|
||||||
String hourlyPrecipitation,
|
|
||||||
String dailyPrecipitation,
|
|
||||||
String extremeWindSpeedHourly,
|
|
||||||
Map<String, String> weatherAreaMap) {
|
|
||||||
|
|
||||||
String key = buildKey(province, city, district);
|
|
||||||
String orgCode = weatherAreaMap.get(key);
|
|
||||||
|
|
||||||
// 没匹配上区域编码,直接丢弃
|
|
||||||
if (orgCode == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncWeatherDataVO vo = new SyncWeatherDataVO();
|
|
||||||
vo.setOrgCode(orgCode);
|
|
||||||
vo.setDataTime(dataTime);
|
|
||||||
vo.setTemperature(temperature);
|
|
||||||
vo.setHourlyMaxTemperature(hourlyMaxTemperature);
|
|
||||||
vo.setHourlyMinTemperature(hourlyMinTemperature);
|
|
||||||
vo.setHourlyPrecipitation(hourlyPrecipitation);
|
|
||||||
vo.setDailyPrecipitation(dailyPrecipitation);
|
|
||||||
vo.setExtremeWindSpeedHourly(extremeWindSpeedHourly);
|
|
||||||
|
|
||||||
return vo;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String buildKey(String province, String city, String district) {
|
|
||||||
return String.join(":",
|
|
||||||
safe(province),
|
|
||||||
safe(city),
|
|
||||||
safe(district)
|
|
||||||
);
|
);
|
||||||
|
// 用于保存所有的异步任务引用,以便最后等待它们全部完成
|
||||||
|
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||||
|
|
||||||
|
// 【高安全设计】使用线程安全的队列来记录处理失败的时间点,便于后续重试或告警
|
||||||
|
ConcurrentLinkedQueue<String> failedTimePoints = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
|
// 3. 循环生成每一个小时的任务
|
||||||
|
LocalDateTime currentLoopTime = startTime;
|
||||||
|
|
||||||
|
regionalWeatherDataSyncService.loadWeatherAreaConfig();
|
||||||
|
|
||||||
|
Semaphore semaphore = new Semaphore(10);
|
||||||
|
while (!currentLoopTime.isAfter(endTime)) {
|
||||||
|
try {
|
||||||
|
// ✅ 提交前限流(关键优化点)
|
||||||
|
semaphore.acquire();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new RuntimeException("线程被中断", e);
|
||||||
|
}
|
||||||
|
String targetTimeStr = currentLoopTime.format(DATE_TIME_FORMATTER);
|
||||||
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
||||||
|
long start = System.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
log.info(">> 开始同步 [{}]", targetTimeStr);
|
||||||
|
// ✅ 调用事务方法
|
||||||
|
regionalWeatherDataSyncService.syncWeatherData(targetTimeStr);
|
||||||
|
log.info("<< 完成 [{}],耗时 {} ms",
|
||||||
|
targetTimeStr, System.currentTimeMillis() - start);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("!! 处理失败 [{}]", targetTimeStr, e);
|
||||||
|
failedTimePoints.add(targetTimeStr);
|
||||||
|
} finally {
|
||||||
|
// ✅ 必须释放(否则死锁)
|
||||||
|
semaphore.release();
|
||||||
|
}
|
||||||
|
}, executor);
|
||||||
|
futures.add(future);
|
||||||
|
currentLoopTime = currentLoopTime.plusHours(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. 等待所有派发的任务执行完毕
|
||||||
|
log.info("共生成了 {} 个整时同步任务,正在等待全部执行完成...", futures.size());
|
||||||
|
try {
|
||||||
|
// CompletableFuture.allOf 可以等待传入的所有 Future 执行完成
|
||||||
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("等待多线程任务执行时发生系统级异常", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. 任务结果汇总与告警处理
|
||||||
|
if (!failedTimePoints.isEmpty()) {
|
||||||
|
log.warn("=== 历史数据同步结束,但存在失败记录!===");
|
||||||
|
log.warn("总计失败次数: {},失败的时间点列表: {}", failedTimePoints.size(), failedTimePoints);
|
||||||
|
} else {
|
||||||
|
log.info("=== 历史气象数据同步完美收官!所有时间点均处理成功。总计处理条数:{} ===", futures.size());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String safe(String str) {
|
|
||||||
return str == null ? "" : str.trim();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,272 @@
|
|||||||
|
package com.southern.power.grid.service.impl;
|
||||||
|
|
||||||
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
|
import com.southern.power.grid.dao.NationalWeatherStationMapper;
|
||||||
|
import com.southern.power.grid.dao.RegionalWeatherDataMapper;
|
||||||
|
import com.southern.power.grid.dao.RegionalWeatherStationMapper;
|
||||||
|
import com.southern.power.grid.dao.WeatherSiteAreaConfigurationMapper;
|
||||||
|
import com.southern.power.grid.entity.*;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.math.RoundingMode;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class RegionalWeatherDataSyncService extends ServiceImpl<RegionalWeatherDataMapper, RegionalWeatherData> {
|
||||||
|
|
||||||
|
private final RegionalWeatherStationMapper regionalWeatherStationMapper;
|
||||||
|
private final NationalWeatherStationMapper nationalWeatherStationMapper;
|
||||||
|
private final Map<String, String> weatherAreaMap = new ConcurrentHashMap<>();
|
||||||
|
private final WeatherSiteAreaConfigurationMapper weatherSiteAreaConfigurationMapper;
|
||||||
|
|
||||||
|
|
||||||
|
@Transactional(rollbackFor = Exception.class)
|
||||||
|
public void syncWeatherData(String dateTime) {
|
||||||
|
try {
|
||||||
|
List<RegionalWeatherStation> regionalWeatherStations = regionalWeatherStationMapper.getSyncData(dateTime);
|
||||||
|
List<NationalWeatherStation> nationalWeatherStations = nationalWeatherStationMapper.getSyncData(dateTime);
|
||||||
|
if (CollectionUtils.isEmpty(regionalWeatherStations) && CollectionUtils.isEmpty(nationalWeatherStations)) {
|
||||||
|
log.warn("【区域天气数据同步任务】没有查询到天气数据:{}", dateTime);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<SyncWeatherDataVO> list = convertToSyncList(regionalWeatherStations, nationalWeatherStations);
|
||||||
|
if (CollectionUtils.isEmpty(list)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<RegionalWeatherData> dataList = convert(list, dateTime);
|
||||||
|
this.saveBatch(dataList, 700);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("定时同步区域气象数据失败:{}", e.getMessage(), e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<RegionalWeatherData> convert(List<SyncWeatherDataVO> list, String prevHourDate) {
|
||||||
|
if (list == null || list.isEmpty()) {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. 按 orgCode 分组
|
||||||
|
Map<String, List<SyncWeatherDataVO>> groupMap =
|
||||||
|
list.stream().collect(Collectors.groupingBy(SyncWeatherDataVO::getOrgCode));
|
||||||
|
|
||||||
|
// 2. 分组处理
|
||||||
|
List<RegionalWeatherData> result = new ArrayList<>();
|
||||||
|
|
||||||
|
for (Map.Entry<String, List<SyncWeatherDataVO>> entry : groupMap.entrySet()) {
|
||||||
|
String orgCode = entry.getKey();
|
||||||
|
List<SyncWeatherDataVO> groupList = entry.getValue();
|
||||||
|
|
||||||
|
int size = groupList.size();
|
||||||
|
|
||||||
|
BigDecimal sumHourlyPrecipitation = BigDecimal.ZERO;
|
||||||
|
BigDecimal sumTemperature = BigDecimal.ZERO;
|
||||||
|
|
||||||
|
BigDecimal maxTemp = null;
|
||||||
|
BigDecimal minTemp = null;
|
||||||
|
BigDecimal maxWind = null;
|
||||||
|
|
||||||
|
for (SyncWeatherDataVO vo : groupList) {
|
||||||
|
|
||||||
|
BigDecimal hourlyPre = parse(vo.getHourlyPrecipitation());
|
||||||
|
BigDecimal temp = parse(vo.getTemperature());
|
||||||
|
BigDecimal maxT = parse(vo.getHourlyMaxTemperature());
|
||||||
|
BigDecimal minT = parse(vo.getHourlyMinTemperature());
|
||||||
|
BigDecimal wind = parse(vo.getExtremeWindSpeedHourly());
|
||||||
|
|
||||||
|
// 累加
|
||||||
|
sumHourlyPrecipitation = sumHourlyPrecipitation.add(hourlyPre);
|
||||||
|
sumTemperature = sumTemperature.add(temp);
|
||||||
|
|
||||||
|
// 最大值
|
||||||
|
if (maxTemp == null || maxT.compareTo(maxTemp) > 0) {
|
||||||
|
maxTemp = maxT;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 最小值
|
||||||
|
if (minTemp == null || minT.compareTo(minTemp) < 0) {
|
||||||
|
minTemp = minT;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 最大风速
|
||||||
|
if (maxWind == null || wind.compareTo(maxWind) > 0) {
|
||||||
|
maxWind = wind;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 平均值(保留2位小数)
|
||||||
|
String avgHourlyPrecipitation = divide(sumHourlyPrecipitation, size);
|
||||||
|
String avgTemperature = divide(sumTemperature, size);
|
||||||
|
|
||||||
|
// 3. 构建实体
|
||||||
|
RegionalWeatherData data = new RegionalWeatherData();
|
||||||
|
data.setOrgCode(orgCode);
|
||||||
|
data.setDataTime(prevHourDate);
|
||||||
|
data.setHourlyPrecipitation(avgHourlyPrecipitation);
|
||||||
|
data.setDailyPrecipitation("0");
|
||||||
|
data.setTemperature(avgTemperature);
|
||||||
|
data.setHourlyMaxTemperature(toStr(maxTemp));
|
||||||
|
data.setHourlyMinTemperature(toStr(minTemp));
|
||||||
|
data.setExtremeWindSpeedHourly(toStr(maxWind));
|
||||||
|
|
||||||
|
result.add(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* String → BigDecimal(空值安全)
|
||||||
|
*/
|
||||||
|
private static BigDecimal parse(String val) {
|
||||||
|
if (val == null || val.trim().isEmpty()) {
|
||||||
|
return BigDecimal.ZERO;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return new BigDecimal(val.trim());
|
||||||
|
} catch (Exception e) {
|
||||||
|
return BigDecimal.ZERO;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 平均值计算(保留2位小数)
|
||||||
|
*/
|
||||||
|
private static String divide(BigDecimal sum, int size) {
|
||||||
|
if (size == 0) {
|
||||||
|
return "0";
|
||||||
|
}
|
||||||
|
return sum.divide(BigDecimal.valueOf(size), 2, RoundingMode.HALF_UP).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BigDecimal → String
|
||||||
|
*/
|
||||||
|
private static String toStr(BigDecimal val) {
|
||||||
|
return val == null ? "0" : val.setScale(2, RoundingMode.HALF_UP).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 加载气象区划配置表
|
||||||
|
*
|
||||||
|
**/
|
||||||
|
public void loadWeatherAreaConfig() {
|
||||||
|
weatherAreaMap.clear();
|
||||||
|
List<WeatherSiteAreaConfiguration> list = weatherSiteAreaConfigurationMapper.selectAll();
|
||||||
|
if (CollectionUtils.isEmpty(list)) return;
|
||||||
|
Map<String, String> tempMap = list.stream()
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.filter(config -> config.getWeatherProvince() != null
|
||||||
|
&& config.getWeatherCity() != null
|
||||||
|
&& config.getWeatherDistrict() != null
|
||||||
|
&& config.getDistrictCode() != null)
|
||||||
|
.collect(Collectors.toMap(
|
||||||
|
config -> String.join(":",
|
||||||
|
config.getWeatherProvince(),
|
||||||
|
config.getWeatherCity(),
|
||||||
|
config.getWeatherDistrict()),
|
||||||
|
WeatherSiteAreaConfiguration::getDistrictCode,
|
||||||
|
(oldValue, newValue) -> oldValue
|
||||||
|
));
|
||||||
|
|
||||||
|
// 全部放入你的成员变量 Map
|
||||||
|
weatherAreaMap.putAll(tempMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<SyncWeatherDataVO> convertToSyncList(
|
||||||
|
List<RegionalWeatherStation> regionalList,
|
||||||
|
List<NationalWeatherStation> nationalList) {
|
||||||
|
|
||||||
|
Stream<SyncWeatherDataVO> regionalStream = regionalList.stream()
|
||||||
|
.map(item -> buildVO(
|
||||||
|
item.getProvince(),
|
||||||
|
item.getCity(),
|
||||||
|
item.getDistrict(),
|
||||||
|
item.getDataTime(),
|
||||||
|
item.getTemperature(),
|
||||||
|
item.getHourlyMaxTemperature(),
|
||||||
|
item.getHourlyMinTemperature(),
|
||||||
|
item.getHourlyPrecipitation(),
|
||||||
|
item.getDailyPrecipitation(),
|
||||||
|
item.getExtremeWindSpeedHourly(),
|
||||||
|
weatherAreaMap
|
||||||
|
))
|
||||||
|
.filter(Objects::nonNull);
|
||||||
|
|
||||||
|
Stream<SyncWeatherDataVO> nationalStream = nationalList.stream()
|
||||||
|
.map(item -> buildVO(
|
||||||
|
item.getProvince(),
|
||||||
|
item.getCity(),
|
||||||
|
item.getDistrict(),
|
||||||
|
item.getDataTime(),
|
||||||
|
item.getTemperature(),
|
||||||
|
item.getHourlyMaxTemperature(),
|
||||||
|
item.getHourlyMinTemperature(),
|
||||||
|
item.getHourlyPrecipitation(),
|
||||||
|
item.getDailyPrecipitation(),
|
||||||
|
item.getExtremeWindSpeedHourly(),
|
||||||
|
weatherAreaMap
|
||||||
|
))
|
||||||
|
.filter(Objects::nonNull);
|
||||||
|
|
||||||
|
return Stream.concat(regionalStream, nationalStream)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SyncWeatherDataVO buildVO(
|
||||||
|
String province,
|
||||||
|
String city,
|
||||||
|
String district,
|
||||||
|
String dataTime,
|
||||||
|
String temperature,
|
||||||
|
String hourlyMaxTemperature,
|
||||||
|
String hourlyMinTemperature,
|
||||||
|
String hourlyPrecipitation,
|
||||||
|
String dailyPrecipitation,
|
||||||
|
String extremeWindSpeedHourly,
|
||||||
|
Map<String, String> weatherAreaMap) {
|
||||||
|
|
||||||
|
String key = buildKey(province, city, district);
|
||||||
|
String orgCode = weatherAreaMap.get(key);
|
||||||
|
|
||||||
|
// 没匹配上区域编码,直接丢弃
|
||||||
|
if (orgCode == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncWeatherDataVO vo = new SyncWeatherDataVO();
|
||||||
|
vo.setOrgCode(orgCode);
|
||||||
|
vo.setDataTime(dataTime);
|
||||||
|
vo.setTemperature(temperature);
|
||||||
|
vo.setHourlyMaxTemperature(hourlyMaxTemperature);
|
||||||
|
vo.setHourlyMinTemperature(hourlyMinTemperature);
|
||||||
|
vo.setHourlyPrecipitation(hourlyPrecipitation);
|
||||||
|
vo.setDailyPrecipitation(dailyPrecipitation);
|
||||||
|
vo.setExtremeWindSpeedHourly(extremeWindSpeedHourly);
|
||||||
|
|
||||||
|
return vo;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String buildKey(String province, String city, String district) {
|
||||||
|
return String.join(":",
|
||||||
|
safe(province),
|
||||||
|
safe(city),
|
||||||
|
safe(district)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String safe(String str) {
|
||||||
|
return str == null ? "" : str.trim();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,11 +1,14 @@
|
|||||||
package com.southern.power.grid.task;
|
package com.southern.power.grid.task;
|
||||||
|
|
||||||
|
import com.southern.power.grid.service.IDnerDailyPowerOutageEventSyncService;
|
||||||
import com.southern.power.grid.service.IRegionalWeatherDataService;
|
import com.southern.power.grid.service.IRegionalWeatherDataService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -15,15 +18,12 @@ public class WeatherDataScheduleTask {
|
|||||||
@Resource
|
@Resource
|
||||||
private IRegionalWeatherDataService regionalWeatherDataService;
|
private IRegionalWeatherDataService regionalWeatherDataService;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private IDnerDailyPowerOutageEventSyncService service;
|
||||||
|
|
||||||
// 防止并发执行
|
// 防止并发执行
|
||||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
|
|
||||||
/* @PostConstruct
|
|
||||||
public void init(){
|
|
||||||
log.info("【区域天气数据同步任务】项目启动触发");
|
|
||||||
execute();
|
|
||||||
}*/
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 每小时第20分钟执行
|
* 每小时第20分钟执行
|
||||||
*/
|
*/
|
||||||
@ -43,7 +43,9 @@ public class WeatherDataScheduleTask {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
regionalWeatherDataService.scheduleSyncWeatherData();
|
//获取当前时间前一个整点时间
|
||||||
|
String prevHourDate = LocalDateTime.now().minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
|
||||||
|
regionalWeatherDataService.scheduleSyncWeatherData(prevHourDate);
|
||||||
log.info("【区域天气数据同步任务】定时触发--结束");
|
log.info("【区域天气数据同步任务】定时触发--结束");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("【区域天气数据同步任务】执行异常", e);
|
log.error("【区域天气数据同步任务】执行异常", e);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user