提交RegionalWeatherData-定时同步代码(还没自测)
This commit is contained in:
parent
8fb5a164fa
commit
600242462e
@ -3,6 +3,7 @@ package com.southern.power.grid;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
/**
|
||||
* MyBatis-Plus测试应用启动类
|
||||
@ -11,6 +12,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableAsync
|
||||
@EnableScheduling
|
||||
public class SouthernPowerGridApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@ -0,0 +1,16 @@
|
||||
package com.southern.power.grid.entity;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class SyncWeatherDataVO {
|
||||
private String orgCode; //区域编码
|
||||
private String dataTime; // 资料时次
|
||||
private String temperature; // 气温
|
||||
private String hourlyMaxTemperature; // 小时内最高气温
|
||||
private String hourlyMinTemperature; // 小时内最低气温
|
||||
private String hourlyPrecipitation; // 小时降水量
|
||||
private String dailyPrecipitation; // 日累计降水量
|
||||
private String extremeWindSpeedHourly; // 小时内极大风速
|
||||
|
||||
}
|
||||
@ -1,4 +1,10 @@
|
||||
package com.southern.power.grid.service;
|
||||
|
||||
public interface IRegionalWeatherDataService {
|
||||
|
||||
/**
|
||||
* 定时同步天气数据
|
||||
*
|
||||
*/
|
||||
void scheduleSyncWeatherData();
|
||||
}
|
||||
|
||||
@ -1,16 +1,281 @@
|
||||
package com.southern.power.grid.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import com.southern.power.grid.dao.NationalWeatherStationMapper;
|
||||
import com.southern.power.grid.dao.RegionalWeatherDataMapper;
|
||||
import com.southern.power.grid.entity.RegionalWeatherData;
|
||||
import com.southern.power.grid.dao.RegionalWeatherStationMapper;
|
||||
import com.southern.power.grid.dao.WeatherSiteAreaConfigurationMapper;
|
||||
import com.southern.power.grid.entity.*;
|
||||
import com.southern.power.grid.service.IRegionalWeatherDataService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class RegionalWeatherDataServiceImpl extends ServiceImpl<RegionalWeatherDataMapper, RegionalWeatherData>
|
||||
implements IRegionalWeatherDataService {
|
||||
|
||||
private final RegionalWeatherStationMapper regionalWeatherStationMapper;
|
||||
private final NationalWeatherStationMapper nationalWeatherStationMapper;
|
||||
private final WeatherSiteAreaConfigurationMapper weatherSiteAreaConfigurationMapper;
|
||||
private final Map<String, String> weatherAreaMap = new HashMap<>();
|
||||
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void scheduleSyncWeatherData() {
|
||||
try {
|
||||
//获取当前时间前一个整点时间
|
||||
String prevHourDate = LocalDateTime.now().minusHours(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:00:00"));
|
||||
List<RegionalWeatherStation> regionalWeatherStations = regionalWeatherStationMapper.selectList(new LambdaQueryWrapper<RegionalWeatherStation>()
|
||||
.eq(RegionalWeatherStation::getDataTime, prevHourDate));
|
||||
List<NationalWeatherStation> nationalWeatherStations = nationalWeatherStationMapper.selectList(new LambdaQueryWrapper<NationalWeatherStation>()
|
||||
.eq(NationalWeatherStation::getDataTime, prevHourDate));
|
||||
if (CollectionUtils.isEmpty(regionalWeatherStations) && CollectionUtils.isEmpty(nationalWeatherStations)) {
|
||||
return;
|
||||
}
|
||||
loadWeatherAreaConfig();
|
||||
List<SyncWeatherDataVO> list = convertToSyncList(regionalWeatherStations, nationalWeatherStations);
|
||||
if (CollectionUtils.isEmpty(list)) {
|
||||
return;
|
||||
}
|
||||
List<RegionalWeatherData> dataList = convert(list, prevHourDate);
|
||||
this.saveBatch(dataList, 700);
|
||||
} catch (Exception e) {
|
||||
log.error("定时同步区域气象数据失败:{}", e.getMessage(), e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<RegionalWeatherData> convert(List<SyncWeatherDataVO> list, String prevHourDate) {
|
||||
if (list == null || list.isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// 1. 按 orgCode 分组
|
||||
Map<String, List<SyncWeatherDataVO>> groupMap =
|
||||
list.stream().collect(Collectors.groupingBy(SyncWeatherDataVO::getOrgCode));
|
||||
|
||||
// 2. 分组处理
|
||||
List<RegionalWeatherData> result = new ArrayList<>();
|
||||
|
||||
for (Map.Entry<String, List<SyncWeatherDataVO>> entry : groupMap.entrySet()) {
|
||||
String orgCode = entry.getKey();
|
||||
List<SyncWeatherDataVO> groupList = entry.getValue();
|
||||
|
||||
int size = groupList.size();
|
||||
|
||||
BigDecimal sumHourlyPrecipitation = BigDecimal.ZERO;
|
||||
BigDecimal sumTemperature = BigDecimal.ZERO;
|
||||
|
||||
BigDecimal maxTemp = null;
|
||||
BigDecimal minTemp = null;
|
||||
BigDecimal maxWind = null;
|
||||
|
||||
for (SyncWeatherDataVO vo : groupList) {
|
||||
|
||||
BigDecimal hourlyPre = parse(vo.getHourlyPrecipitation());
|
||||
BigDecimal temp = parse(vo.getTemperature());
|
||||
BigDecimal maxT = parse(vo.getHourlyMaxTemperature());
|
||||
BigDecimal minT = parse(vo.getHourlyMinTemperature());
|
||||
BigDecimal wind = parse(vo.getExtremeWindSpeedHourly());
|
||||
|
||||
// 累加
|
||||
sumHourlyPrecipitation = sumHourlyPrecipitation.add(hourlyPre);
|
||||
sumTemperature = sumTemperature.add(temp);
|
||||
|
||||
// 最大值
|
||||
if (maxTemp == null || maxT.compareTo(maxTemp) > 0) {
|
||||
maxTemp = maxT;
|
||||
}
|
||||
|
||||
// 最小值
|
||||
if (minTemp == null || minT.compareTo(minTemp) < 0) {
|
||||
minTemp = minT;
|
||||
}
|
||||
|
||||
// 最大风速
|
||||
if (maxWind == null || wind.compareTo(maxWind) > 0) {
|
||||
maxWind = wind;
|
||||
}
|
||||
}
|
||||
|
||||
// 平均值(保留2位小数)
|
||||
String avgHourlyPrecipitation = divide(sumHourlyPrecipitation, size);
|
||||
String avgTemperature = divide(sumTemperature, size);
|
||||
|
||||
// 3. 构建实体
|
||||
RegionalWeatherData data = new RegionalWeatherData();
|
||||
data.setOrgCode(orgCode);
|
||||
data.setDataTime(prevHourDate);
|
||||
data.setHourlyPrecipitation(avgHourlyPrecipitation);
|
||||
data.setDailyPrecipitation("0");
|
||||
data.setTemperature(avgTemperature);
|
||||
data.setHourlyMaxTemperature(toStr(maxTemp));
|
||||
data.setHourlyMinTemperature(toStr(minTemp));
|
||||
data.setExtremeWindSpeedHourly(toStr(maxWind));
|
||||
|
||||
result.add(data);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* String → BigDecimal(空值安全)
|
||||
*/
|
||||
private static BigDecimal parse(String val) {
|
||||
if (val == null || val.trim().isEmpty()) {
|
||||
return BigDecimal.ZERO;
|
||||
}
|
||||
try {
|
||||
return new BigDecimal(val.trim());
|
||||
} catch (Exception e) {
|
||||
return BigDecimal.ZERO;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 平均值计算(保留2位小数)
|
||||
*/
|
||||
private static String divide(BigDecimal sum, int size) {
|
||||
if (size == 0) {
|
||||
return "0";
|
||||
}
|
||||
return sum.divide(BigDecimal.valueOf(size), 2, RoundingMode.HALF_UP).toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* BigDecimal → String
|
||||
*/
|
||||
private static String toStr(BigDecimal val) {
|
||||
return val == null ? "0" : val.setScale(2, RoundingMode.HALF_UP).toString();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 加载气象区划配置表
|
||||
*
|
||||
**/
|
||||
private void loadWeatherAreaConfig() {
|
||||
weatherAreaMap.clear();
|
||||
List<WeatherSiteAreaConfiguration> list = weatherSiteAreaConfigurationMapper.selectAll();
|
||||
if (CollectionUtils.isEmpty(list)) return;
|
||||
Map<String, String> tempMap = list.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.filter(config -> config.getWeatherProvince() != null
|
||||
&& config.getWeatherCity() != null
|
||||
&& config.getWeatherDistrict() != null
|
||||
&& config.getDistrictCode() != null)
|
||||
.collect(Collectors.toMap(
|
||||
config -> String.join(":",
|
||||
config.getWeatherProvince(),
|
||||
config.getWeatherCity(),
|
||||
config.getWeatherDistrict()),
|
||||
WeatherSiteAreaConfiguration::getDistrictCode,
|
||||
(oldValue, newValue) -> oldValue
|
||||
));
|
||||
|
||||
// 全部放入你的成员变量 Map
|
||||
weatherAreaMap.putAll(tempMap);
|
||||
}
|
||||
|
||||
public List<SyncWeatherDataVO> convertToSyncList(
|
||||
List<RegionalWeatherStation> regionalList,
|
||||
List<NationalWeatherStation> nationalList) {
|
||||
|
||||
Stream<SyncWeatherDataVO> regionalStream = regionalList.stream()
|
||||
.map(item -> buildVO(
|
||||
item.getProvince(),
|
||||
item.getCity(),
|
||||
item.getDistrict(),
|
||||
item.getDataTime(),
|
||||
item.getTemperature(),
|
||||
item.getHourlyMaxTemperature(),
|
||||
item.getHourlyMinTemperature(),
|
||||
item.getHourlyPrecipitation(),
|
||||
item.getDailyPrecipitation(),
|
||||
item.getExtremeWindSpeedHourly(),
|
||||
weatherAreaMap
|
||||
))
|
||||
.filter(Objects::nonNull);
|
||||
|
||||
Stream<SyncWeatherDataVO> nationalStream = nationalList.stream()
|
||||
.map(item -> buildVO(
|
||||
item.getProvince(),
|
||||
item.getCity(),
|
||||
item.getDistrict(),
|
||||
item.getDataTime(),
|
||||
item.getTemperature(),
|
||||
item.getHourlyMaxTemperature(),
|
||||
item.getHourlyMinTemperature(),
|
||||
item.getHourlyPrecipitation(),
|
||||
item.getDailyPrecipitation(),
|
||||
item.getExtremeWindSpeedHourly(),
|
||||
weatherAreaMap
|
||||
))
|
||||
.filter(Objects::nonNull);
|
||||
|
||||
return Stream.concat(regionalStream, nationalStream)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static SyncWeatherDataVO buildVO(
|
||||
String province,
|
||||
String city,
|
||||
String district,
|
||||
String dataTime,
|
||||
String temperature,
|
||||
String hourlyMaxTemperature,
|
||||
String hourlyMinTemperature,
|
||||
String hourlyPrecipitation,
|
||||
String dailyPrecipitation,
|
||||
String extremeWindSpeedHourly,
|
||||
Map<String, String> weatherAreaMap) {
|
||||
|
||||
String key = buildKey(province, city, district);
|
||||
String orgCode = weatherAreaMap.get(key);
|
||||
|
||||
// 没匹配上区域编码,直接丢弃
|
||||
if (orgCode == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
SyncWeatherDataVO vo = new SyncWeatherDataVO();
|
||||
vo.setOrgCode(orgCode);
|
||||
vo.setDataTime(dataTime);
|
||||
vo.setTemperature(temperature);
|
||||
vo.setHourlyMaxTemperature(hourlyMaxTemperature);
|
||||
vo.setHourlyMinTemperature(hourlyMinTemperature);
|
||||
vo.setHourlyPrecipitation(hourlyPrecipitation);
|
||||
vo.setDailyPrecipitation(dailyPrecipitation);
|
||||
vo.setExtremeWindSpeedHourly(extremeWindSpeedHourly);
|
||||
|
||||
return vo;
|
||||
}
|
||||
|
||||
private static String buildKey(String province, String city, String district) {
|
||||
return String.join(":",
|
||||
safe(province),
|
||||
safe(city),
|
||||
safe(district)
|
||||
);
|
||||
}
|
||||
|
||||
private static String safe(String str) {
|
||||
return str == null ? "" : str.trim();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,46 @@
|
||||
package com.southern.power.grid.task;
|
||||
|
||||
import com.southern.power.grid.service.IRegionalWeatherDataService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class WeatherDataScheduleTask {
|
||||
|
||||
@Resource
|
||||
private IRegionalWeatherDataService regionalWeatherDataService;
|
||||
|
||||
// 防止并发执行
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
|
||||
/**
|
||||
* 每小时第20分钟执行
|
||||
*/
|
||||
//@Scheduled(cron = "0 20 * * * ?")
|
||||
public void schedule() {
|
||||
log.info("【区域天气数据同步任务】定时触发");
|
||||
execute();
|
||||
}
|
||||
|
||||
/**
|
||||
* 实际执行逻辑
|
||||
*/
|
||||
private void execute() {
|
||||
if (!running.compareAndSet(false, true)) {
|
||||
log.warn("任务正在执行,跳过本次触发");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
regionalWeatherDataService.scheduleSyncWeatherData();
|
||||
} catch (Exception e) {
|
||||
log.error("【区域天气数据同步任务】执行异常", e);
|
||||
} finally {
|
||||
running.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user