Commit 95136c08 authored by 王宦普's avatar 王宦普

whp:init

parents
Pipeline #9999 failed with stages
[
{
"pythonType": "CI",
"pythonFile": "./config/python/CI/run_cmd.py"
},
{
"pythonType": "CF",
"pythonFile": "./config/python/"
},
{
"pythonType": "CCB",
"pythonFile": "./config/python/"
},
{
"pythonType": "SCWW",
"pythonFile": "./config/python/"
},
{
"pythonType": "SCWS",
"pythonFile": "./config/python/"
},
{
"pythonType": "6",
"pythonFile": "./config/python/"
},
{
"pythonType": "LIS",
"pythonFile": "./config/python/"
},
{
"pythonType": "LIM",
"pythonFile": "./config/python/"
},
{
"pythonType": "LCD",
"pythonFile": "./config/python/"
},
{
"pythonType": "DE",
"pythonFile": "./config/python/"
},
{
"pythonType": "DP",
"pythonFile": "./config/python/"
},
{
"pythonType": "12",
"pythonFile": "./config/python/"
}
]
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.jianju</groupId>
<artifactId>HunanConsumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>HunanConsumer</name>
<description>HunanConsumer</description>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.version>3.8.1</maven.compiler.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.8</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!--打包时去除第三方依赖-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<includeSystemScope>true</includeSystemScope>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.version}</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
package com.jianju.hunanconsumer;
import com.jianju.hunanconsumer.config.MqConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
@EnableConfigurationProperties({MqConfig.class})
public class HunanConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(HunanConsumerApplication.class, args);
}
}
package com.jianju.hunanconsumer.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
@ConfigurationProperties(prefix = "config")
@Data
public class MqConfig {
private Map<String,Object> mq;
}
package com.jianju.hunanconsumer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
package com.jianju.hunanconsumer.factory;
import java.util.HashMap;
import java.util.Map;
public interface AnalysisData {
Integer analysisType();
boolean analysisData(Map<String,Object> Params);
}
//package com.jianju.hunanconsumer.factory.impl;
//
//import com.jianju.hunanconsumer.factory.AnalysisData;
//import org.springframework.stereotype.Component;
//
//import java.io.IOException;
//import java.util.Map;
//@Component
//public class ConvectionAnalysis implements AnalysisData {
// @Override
// public Integer analysisType() {
// return 1;
// }
//
// @Override
// public boolean analysisData(Map<String, Object> params) {
// String pythonPath = (String) params.get("pythonPath");
// String fileInput = (String) params.get("data");
//
// ProcessBuilder pb = new ProcessBuilder(
// "python3", pythonPath,
// "-i", fileInput
// );
//
// try {
// pb.redirectErrorStream(true);
// Process process = pb.start();
// int exitCode = process.waitFor();
// if (exitCode == 0) {
// return true;
// }
//
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// return false;
// }
//}
package com.jianju.hunanconsumer.mq;
import com.alibaba.fastjson.JSONObject;
import com.jianju.hunanconsumer.config.MqConfig;
import com.jianju.hunanconsumer.util.AnalysisDataUtil;
import com.jianju.hunanconsumer.util.ApiRequest;
import com.jianju.hunanconsumer.util.MqFactory;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.websocket.OnClose;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@Component
@Slf4j
public class MqConsumer {
@Autowired
private RestTemplate restTemplate;
List<Map> configList;
@Resource
private MqConfig mqConfig;
@Value("${config.python}")
String pythonPath;
@Value("${config.apiConfig}")
String apiPath;
@Value("${config.otherApiConfig}")
String otherApiPath;
Map<Integer,Object> analysisDataMap = new HashMap<>();
@PostConstruct
public void init() throws IOException, TimeoutException {
Path path = Paths.get(pythonPath);
String dataInfo = new String(Files.readAllBytes(path));
configList = JSONObject.parseArray(dataInfo, Map.class);
consumerMq();
}
public void consumerMq() throws IOException, TimeoutException {
Map<String, Object> mqConfigMap = mqConfig.getMq();
Channel channelTask = MqFactory.getOrCreateChannel(mqConfigMap, "task");
channelTask.basicQos(1);
channelTask.basicConsume((String) mqConfigMap.get("queue"), false,
(consumerTag, message) -> {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到消息: " + body);
JSONObject jsonData = null;
try {
jsonData = JSONObject.parseObject(body);
Map<String,Object> params = new HashMap<>();
params.put("data", jsonData.get("msg"));
String type = (String) jsonData.get("type");
for (Map map : configList) {
if (map.get("pythonType").equals(type)) {
params.put("pythonPath", map.get("pythonFile"));
break;
}
}
//其他系统回调
// Map<String, Object> param2 = new HashMap<>();
// param2.put("jobId",jsonData.get("uuid"));
// param2.put("status",1);
// param2.put("outputParam","");
// param2.put("message", "");
// ApiRequest.Send(otherApiPath,param2,restTemplate);
//解析python
Map<String,Object> result = AnalysisDataUtil.analysisData(params);
//正确消息回调
Map<String, Object> param = new HashMap<>();
param.put("id",jsonData.get("uuid"));
param.put("type", result.get("type"));
param.put("msg", result.get("msg"));
log.info("SendMessage");
ApiRequest.Send(apiPath,param,restTemplate);
//其他系统回调
// Map<String, Object> param3 = new HashMap<>();
// param3.put("jobId",jsonData.get("uuid"));
// param3.put("status",result.get("type"));
// param3.put("outputParam", JSONObject.parseObject((String) result.get("msg")));
// param3.put("message", "");
// ApiRequest.Send(otherApiPath,param3,restTemplate);
channelTask.basicAck(message.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
Map<String, Object> param = new HashMap<>();
param.put("id",jsonData.get("uuid"));
param.put("type", 2);
param.put("msg", e.getMessage());
ApiRequest.Send(apiPath,param,restTemplate);
//其他系统回调
// Map<String, Object> param2 = new HashMap<>();
// param2.put("jobId",jsonData.get("uuid"));
// param2.put("status",3);
// param2.put("outputParam", "");
// param2.put("message", e.getMessage());
// ApiRequest.Send(otherApiPath,param2,restTemplate);
// channelTask.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
},
consumerTag -> {
System.out.println("取消消费: " + consumerTag);
});
}
}
package com.jianju.hunanconsumer.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class AnalysisDataUtil {
public static Map<String, Object> analysisData(Map<String, Object> params) throws Exception {
Map<String, Object> result = new HashMap<>();
String pythonPath = (String) params.get("pythonPath");
String fileInput = (String) params.get("data");
System.out.println("pythonPath: " + pythonPath);
System.out.println("fileInput : " + fileInput);
// 构建 ProcessBuilder
ProcessBuilder pb = new ProcessBuilder(
"/opt/conda/envs/py310gdal/bin/python",
"-u",
pythonPath,
"-i",
fileInput
);
// 配置环境变量
Map<String, String> env = pb.environment();
env.put("GDAL_DATA", "/opt/conda/envs/py310gdal/share/gdal");
env.put("PROJ_LIB", "/opt/conda/envs/py310gdal/share/proj");
// 确保 gdal_translate 等命令可用
String oldPath = env.getOrDefault("PATH", "");
env.put("PATH", "/opt/conda/envs/py310gdal/bin:" + oldPath);
pb.redirectErrorStream(true);
Process process = pb.start();
System.out.println("startPython");
ObjectMapper mapper = new ObjectMapper();
String jsonResult = null; // 保存最后一条合法 JSON
// 读取并实时打印 Python 输出
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
System.out.println("[PYTHON] " + line);
String trimmed = line.trim();
// 如果以 { 开头以 } 结尾,尝试解析
if (trimmed.startsWith("{") && trimmed.endsWith("}")) {
try {
// 如果是 Python dict 风格,把单引号换成双引号再解析
String jsonCandidate = trimmed.replace('\'', '"');
mapper.readTree(jsonCandidate); // 验证是否合法
jsonResult = jsonCandidate; // 始终覆盖,确保取到最后一条
} catch (Exception ignore) {
// 不是合法 JSON 则忽略
}
}
}
}
int exitCode = process.waitFor();
System.out.println("endPython, exit code: " + exitCode);
if (exitCode == 0 && jsonResult != null) {
result.put("type", 2);
result.put("msg", jsonResult); // 返回最后一条 JSON
} else {
result.put("flag", 3);
result.put("msg", "python not Found Json or exitCode=" + exitCode);
}
return result;
}
}
package com.jianju.hunanconsumer.util;
import org.springframework.web.client.RestTemplate;
public class ApiRequest {
public static void Send(String url,Object object, RestTemplate restTemplate){
String response = restTemplate.postForObject(url, object,String.class);
}
}
package com.jianju.hunanconsumer.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
public class MqFactory {
private static final Map<String, Channel> channelCache = new ConcurrentHashMap<>();
public static synchronized Channel getOrCreateChannel(Map<String, Object> config, String taskName) throws IOException, TimeoutException {
if (channelCache.containsKey(taskName)) {
return channelCache.get(taskName);
}
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost((String) config.get("host"));
factory.setPort((Integer) config.get("port"));
factory.setUsername((String)config.get("username"));
factory.setPassword((String)config.get("password"));
// 2. 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明交换机和队列(可选)
if (config.containsKey("queueDead")) {
// --- 设置主队列绑定死信机制 ---
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", config.get("exchangeDead"));
arguments.put("x-dead-letter-routing-key", config.get("routingkeyDead"));
String exchange = (String) config.get("exchange");
String queueName = (String) config.get("queue");
String routingKey = (String) config.get("routingkey");
channel.exchangeDeclare(exchange, "direct", true);
channel.queueDeclare(queueName, true, false, false, arguments);
channel.queueBind(queueName, exchange, routingKey);
// --- 设置死信交换机 + 死信队列 ---
String exchangeDead = (String) config.get("exchangeDead");
String queueDeadName = (String) config.get("queueDead");
String routingDeadkey = (String) config.get("routingkeyDead");
channel.exchangeDeclare(exchangeDead, "direct", true);
channel.queueDeclare(queueDeadName, true, false, false, null);
channel.queueBind(queueDeadName, exchangeDead, routingDeadkey);
}
//绑定死信队列,避免消费者那边出现,消费没成功,信息不能放入死信队列
if (!config.containsKey("queueDead")) {
// 死信处理线程,只需要绑定死信队列
String exchange = (String) config.get("exchange");
String queueName = (String) config.get("queue");
String routingKey = (String) config.get("routingkey");
channel.exchangeDeclare(exchange, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchange, routingKey);
}
// 4. 缓存并返回
channelCache.put(taskName, channel);
return channel;
}
public static Channel getChannel(String taskName) {
return channelCache.get(taskName);
}
public static void closeAll() {
for (Channel channel : channelCache.values()) {
try {
if (channel.isOpen()) {
channel.close();
channel.getConnection().close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
channelCache.clear();
}
}
server:
port: 8081
config:
python: ./config/PythonConfig.json
mq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
queue: hunan_queue
exchange: hunan_exchang
routingkey: hunan
apiConfig: "http://localhost:8080/api/ConfirmMessage"
otherApiConfig: "http://localhost:8082/disaster/callback"
package com.jianju.hunanconsumer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class HunanConsumerApplicationTests {
@Test
void contextLoads() {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment