From deae22578543791f58cdb12f904bc87f2c009fac Mon Sep 17 00:00:00 2001 From: zhaoyz <11@11.com> Date: Tue, 16 Apr 2024 17:22:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/rax/vital/config/WebSocketConfig.java | 14 +- .../rax/vital/handler/AddMedicineHandler.java | 66 +++++++ .../com/rax/vital/handler/ChatHandler.java | 1 - .../vital/handler/MachineFeedbackHandler.java | 46 +++++ .../rax/vital/handler/MedicineHandler.java | 24 ++- .../interceptor/WebSocketInterceptors.java | 9 +- .../com/rax/vital/timer/VitalSignTimer.java | 172 +++++++++++++++++- .../com/rax/vital/util/GetHttpParamUtil.java | 19 ++ 8 files changed, 337 insertions(+), 14 deletions(-) create mode 100644 vital-signs/src/main/java/com/rax/vital/handler/AddMedicineHandler.java create mode 100644 vital-signs/src/main/java/com/rax/vital/handler/MachineFeedbackHandler.java create mode 100644 vital-signs/src/main/java/com/rax/vital/util/GetHttpParamUtil.java diff --git a/vital-signs/src/main/java/com/rax/vital/config/WebSocketConfig.java b/vital-signs/src/main/java/com/rax/vital/config/WebSocketConfig.java index fd48280..862aea3 100644 --- a/vital-signs/src/main/java/com/rax/vital/config/WebSocketConfig.java +++ b/vital-signs/src/main/java/com/rax/vital/config/WebSocketConfig.java @@ -1,9 +1,10 @@ package com.rax.vital.config; +import com.rax.vital.handler.AddMedicineHandler; import com.rax.vital.handler.ChatHandler; +import com.rax.vital.handler.MachineFeedbackHandler; import com.rax.vital.handler.MedicineHandler; import com.rax.vital.interceptor.WebSocketInterceptors; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.WebSocketHandler; @@ -20,6 +21,8 @@ public class WebSocketConfig implements WebSocketConfigurer { public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(medicineHandler(), "/rax/vitalSignsMedicine") .addHandler(chatHandler(), "/rax/chatRoom") + .addHandler(addMedicineHandler(), "/rax/addMedicine") + .addHandler(machineHandler(),"/rax/getMedicine") // .addInterceptors(new HttpSessionHandshakeInterceptor()) .addInterceptors(webSocketHandshakeInterceptor()) .setAllowedOrigins("*"); @@ -35,6 +38,15 @@ public class WebSocketConfig implements WebSocketConfigurer { return new ChatHandler(); } + @Bean + public WebSocketHandler addMedicineHandler() { + return new AddMedicineHandler(); + } + + public WebSocketHandler machineHandler() { + return new MachineFeedbackHandler(); + } + @Bean public HandshakeInterceptor webSocketHandshakeInterceptor() { return new WebSocketInterceptors(); diff --git a/vital-signs/src/main/java/com/rax/vital/handler/AddMedicineHandler.java b/vital-signs/src/main/java/com/rax/vital/handler/AddMedicineHandler.java new file mode 100644 index 0000000..4e9f004 --- /dev/null +++ b/vital-signs/src/main/java/com/rax/vital/handler/AddMedicineHandler.java @@ -0,0 +1,66 @@ +package com.rax.vital.handler; + +import com.alibaba.fastjson.JSONObject; +import com.rax.vital.timer.VitalSignTimer; +import com.rax.vital.util.DatabaseNameUtil; +import com.rax.vital.util.GetHttpParamUtil; +import jakarta.annotation.Resource; +import org.springframework.security.oauth2.server.authorization.OAuth2Authorization; +import org.springframework.security.oauth2.server.authorization.OAuth2AuthorizationService; +import org.springframework.security.oauth2.server.authorization.OAuth2TokenType; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.net.URLDecoder; +import java.util.Map; + +public class AddMedicineHandler implements WebSocketHandler { + @Resource + private VitalSignTimer vitalSignTimer; + + @Resource + private OAuth2AuthorizationService authorizationService; + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + vitalSignTimer.setWSAIFlagSession(session); + } + + @Override + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + String decode = URLDecoder.decode(session.getUri().getQuery()); + Map params = GetHttpParamUtil.getParams(decode); + String token = (String) params.get("token"); + OAuth2Authorization authorization = authorizationService.findByToken(token, OAuth2TokenType.ACCESS_TOKEN); + String username = authorization.getPrincipalName(); + + JSONObject jsonObject = JSONObject.parseObject((String) message.getPayload()); + // 病人名 + String patientName = jsonObject.getString("patientName"); + // 病人身份证 + String idNum = jsonObject.getString("idNum"); + // yyyyMMdd + String date = jsonObject.getString("date"); + String databaseName = DatabaseNameUtil.encrypt(patientName) + "_" + DatabaseNameUtil.encrypt(idNum) + "_" + date; + vitalSignTimer.changeWSAIFlag(databaseName, username, session, jsonObject.getString("flag"), + jsonObject.getString("medicine"), jsonObject.getString("value")); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + vitalSignTimer.removeWSAIFlagSession(session); + System.out.println(this.getClass().getName() + " Connection closed:" + closeStatus.getReason()); + } + + @Override + public boolean supportsPartialMessages() { + return false; + } +} diff --git a/vital-signs/src/main/java/com/rax/vital/handler/ChatHandler.java b/vital-signs/src/main/java/com/rax/vital/handler/ChatHandler.java index d65ed5e..e237168 100644 --- a/vital-signs/src/main/java/com/rax/vital/handler/ChatHandler.java +++ b/vital-signs/src/main/java/com/rax/vital/handler/ChatHandler.java @@ -1,6 +1,5 @@ package com.rax.vital.handler; -import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketMessage; diff --git a/vital-signs/src/main/java/com/rax/vital/handler/MachineFeedbackHandler.java b/vital-signs/src/main/java/com/rax/vital/handler/MachineFeedbackHandler.java new file mode 100644 index 0000000..3bc2b64 --- /dev/null +++ b/vital-signs/src/main/java/com/rax/vital/handler/MachineFeedbackHandler.java @@ -0,0 +1,46 @@ +package com.rax.vital.handler; + +import com.alibaba.fastjson.JSONObject; +import com.rax.vital.timer.VitalSignTimer; +import jakarta.annotation.Resource; +import org.springframework.security.oauth2.server.authorization.OAuth2AuthorizationService; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +public class MachineFeedbackHandler implements WebSocketHandler { + @Resource + private OAuth2AuthorizationService authorizationService; + + @Resource + private VitalSignTimer vitalSignTimer; + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + vitalSignTimer.setMachineSessionMap(session); + } + + @Override + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + JSONObject jsonObject = JSONObject.parseObject((String) message.getPayload()); + String database = jsonObject.getString("database"); + jsonObject.getBoolean("status"); + vitalSignTimer.sendMachineFlag(database, session); + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { + vitalSignTimer.removeMachineSessionMap(session); + System.out.println(this.getClass().getName() + " Connection closed:" + closeStatus.getReason()); + } + + @Override + public boolean supportsPartialMessages() { + return false; + } +} diff --git a/vital-signs/src/main/java/com/rax/vital/handler/MedicineHandler.java b/vital-signs/src/main/java/com/rax/vital/handler/MedicineHandler.java index e837c29..4a4489a 100644 --- a/vital-signs/src/main/java/com/rax/vital/handler/MedicineHandler.java +++ b/vital-signs/src/main/java/com/rax/vital/handler/MedicineHandler.java @@ -3,18 +3,27 @@ package com.rax.vital.handler; import com.alibaba.fastjson.JSONObject; import com.rax.vital.timer.VitalSignTimer; import com.rax.vital.util.DatabaseNameUtil; +import com.rax.vital.util.GetHttpParamUtil; import jakarta.annotation.Resource; -import org.springframework.stereotype.Component; +import org.springframework.security.oauth2.server.authorization.OAuth2Authorization; +import org.springframework.security.oauth2.server.authorization.OAuth2AuthorizationService; +import org.springframework.security.oauth2.server.authorization.OAuth2TokenType; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; +import java.net.URLDecoder; +import java.util.Map; + public class MedicineHandler implements WebSocketHandler { @Resource private VitalSignTimer vitalSignTimer; + @Resource + private OAuth2AuthorizationService authorizationService; + @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { session.getPrincipal(); @@ -22,23 +31,30 @@ public class MedicineHandler implements WebSocketHandler { @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + String decode = URLDecoder.decode(session.getUri().getQuery()); + Map params = GetHttpParamUtil.getParams(decode); + String token = (String) params.get("token"); + OAuth2Authorization authorization = authorizationService.findByToken(token, OAuth2TokenType.ACCESS_TOKEN); + String username = authorization.getPrincipalName(); + String payload = (String) message.getPayload(); JSONObject jsonObject = JSONObject.parseObject(payload); String patientName = jsonObject.getString("patientName"); String idNum = jsonObject.getString("idNum"); String date = jsonObject.getString("date"); String databaseName = DatabaseNameUtil.encrypt(patientName) + "_" + DatabaseNameUtil.encrypt(idNum) + "_" + date; -// vitalSignTimer.createAndSendMessageMongo(databaseName, username, simpSessionId); + vitalSignTimer.createAndSendWSMessageMongo(databaseName, username, session); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - + System.out.println("Error: " + exception.getMessage()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { - + System.out.println("MedicineHandler Connection closed:" + closeStatus.getReason()); + vitalSignTimer.stopTimerTaskMongo(session.getId()); } @Override diff --git a/vital-signs/src/main/java/com/rax/vital/interceptor/WebSocketInterceptors.java b/vital-signs/src/main/java/com/rax/vital/interceptor/WebSocketInterceptors.java index cc73618..9b4e480 100644 --- a/vital-signs/src/main/java/com/rax/vital/interceptor/WebSocketInterceptors.java +++ b/vital-signs/src/main/java/com/rax/vital/interceptor/WebSocketInterceptors.java @@ -1,5 +1,6 @@ package com.rax.vital.interceptor; +import com.rax.vital.util.GetHttpParamUtil; import jakarta.annotation.Resource; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; @@ -9,6 +10,7 @@ import org.springframework.security.oauth2.server.authorization.OAuth2TokenType; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; +import java.net.URLDecoder; import java.util.Map; public class WebSocketInterceptors implements HandshakeInterceptor { @@ -18,8 +20,9 @@ public class WebSocketInterceptors implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { - - String token = (String) attributes.get("token"); + String decode = URLDecoder.decode(request.getURI().getQuery()); + Map params = GetHttpParamUtil.getParams(decode); + String token = (String) params.get("token"); OAuth2Authorization authorization = authorizationService.findByToken(token, OAuth2TokenType.ACCESS_TOKEN); if (authorization == null ) { return false; @@ -29,6 +32,6 @@ public class WebSocketInterceptors implements HandshakeInterceptor { @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { - + System.out.println("request = " + request + ", response = " + response + ", wsHandler = " + wsHandler + ", exception = " + exception); } } diff --git a/vital-signs/src/main/java/com/rax/vital/timer/VitalSignTimer.java b/vital-signs/src/main/java/com/rax/vital/timer/VitalSignTimer.java index edb470b..8f38ee4 100644 --- a/vital-signs/src/main/java/com/rax/vital/timer/VitalSignTimer.java +++ b/vital-signs/src/main/java/com/rax/vital/timer/VitalSignTimer.java @@ -1,5 +1,7 @@ package com.rax.vital.timer; +import cn.hutool.core.collection.ConcurrentHashSet; +import com.alibaba.fastjson.JSONObject; import com.rax.common.security.util.SecurityUtils; import com.rax.vital.datasource.MongoDBSource; import com.rax.vital.datasource.MySQLSource; @@ -10,9 +12,16 @@ import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; +import org.springframework.web.socket.BinaryMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; +import java.io.IOException; import java.sql.Connection; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * 生命体征和用药信息推送 @@ -38,16 +47,17 @@ public class VitalSignTimer { private final RevulsionService revulsionService; // mongoDB定时任务容器 - private static volatile Map timerMongoTaskMap = new HashMap<>(300); + private static final Map timerMongoTaskMap = new ConcurrentHashMap<>() { + }; // mongoDB链接工具类容器 - private static volatile Map mongoDBSourceMap = new HashMap<>(300); + private static final Map mongoDBSourceMap = new ConcurrentHashMap<>(); // mysql定时任务容器 - private static volatile Map timerMysqlTaskMap = new HashMap<>(300); + private static final Map timerMysqlTaskMap = new ConcurrentHashMap<>(); // mysql链接容器 - private static volatile Map mysqlConnectionMap = new HashMap(300); + private static final Map mysqlConnectionMap = new ConcurrentHashMap(); // MongoDB的地址 @Value("${vital-sign.mongodb.host}") @@ -73,7 +83,15 @@ public class VitalSignTimer { @Value("${vital-sign.mysql.password}") private String mysqlPassword; - private static volatile Map masterControlMap = new HashMap(300); + private static final Map masterControlMap = new ConcurrentHashMap<>(); + + private static final Map userSessionMap = new ConcurrentHashMap<>(); + + private static final Map userDatabaseSessionMap = new ConcurrentHashMap<>(); + + private static final Map machineSessionMap = new ConcurrentHashMap<>(); + + private static final Map machineDatabaseSessionMap = new ConcurrentHashMap<>(); /** * 根据当前用户和患者数据库进行查询生命体征和用药信息并推送,数据库类型是MongoDB @@ -127,6 +145,7 @@ public class VitalSignTimer { timerMongoTaskMap.put(simpSessionId, timerTask); } + /** * 根据当前用户和患者数据库进行查询生命体征和用药信息并推送,数据库类型是MySQL * @@ -235,4 +254,147 @@ public class VitalSignTimer { } } } + + public void createAndSendWSMessageMongo(String database, String username, WebSocketSession session) { + + synchronized (username) { + if (!masterControlMap.containsKey(database)) { + masterControlMap.put(database, username); + } + } + + String sessionId = session.getId(); + + TimerTask task = timerMongoTaskMap.get(sessionId); + if (task != null) { + return; + } + + MongoDBSource mongoDBSource = mongoDBSourceMap.get(sessionId); + if (mongoDBSource == null) { + mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); + mongoDBSourceMap.put(sessionId, mongoDBSource); + mongoDBSource.open(); + } + + MongoDBSource finalMongoDBSource = mongoDBSource; + TimerTask timerTask = new TimerTask() { + @Override + public void run() { + + MongoTemplate template = finalMongoDBSource.getTemplate(); + JSONObject jsonObject = new JSONObject(); + List vitalSignsList = vitalSignsService.getVitalSignsList(template); + jsonObject.put("vitalSignsList", vitalSignsList); + List aiMedicineList = aiMedicineService.getAIMedicine(template); + jsonObject.put("aiMedicineList", aiMedicineList); + List docMedicineList = doctorMedicineService.getDocMedicine(template); + jsonObject.put("docMedicineList", docMedicineList); + List revulsionList = revulsionService.getRevulsionServiceList(template); + jsonObject.put("revulsionList", revulsionList); + List flags = flagService.getFlags(template); + jsonObject.put("flags", flags); + + WebSocketMessage message = new TextMessage(jsonObject.toJSONString().getBytes()); + try { + session.sendMessage(message); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + // 定时任务,设置1秒 + Timer timer = new Timer(); + timer.schedule(timerTask, 0, 1000); + timerMongoTaskMap.put(sessionId, timerTask); + } + + public synchronized void setWSAIFlagSession(WebSocketSession session) { + userSessionMap.put(session.getId(), session); + } + + public void removeWSAIFlagSession(WebSocketSession session) { + userSessionMap.remove(session.getId()); + if (userDatabaseSessionMap.containsValue(session.getId())) { + for (String database : userDatabaseSessionMap.values()) { + if (userDatabaseSessionMap.get(database).equals(session.getId())) { + userDatabaseSessionMap.remove(database); + } + } + } + } + + public void changeWSAIFlag(String database, String username, WebSocketSession session, String flag, String medicine, String value) throws IOException { + synchronized (username) { + JSONObject result = new JSONObject(); + if (masterControlMap.containsKey(database) && masterControlMap.get(database).equals(username)) { + if (!userDatabaseSessionMap.containsKey(database)) { + userDatabaseSessionMap.put(database, session.getId()); + } + if (machineDatabaseSessionMap.containsKey(database)) { + String sessionId = machineDatabaseSessionMap.get(database); + WebSocketSession machineSession = machineSessionMap.get(sessionId); + JSONObject medicineJson = new JSONObject(); + medicineJson.put(medicine, value); + medicineJson.put("flag", flag); + machineSession.sendMessage(new TextMessage(medicineJson.toJSONString().getBytes())); + } else { + result.put("msg", "设备端未连接"); + result.put("status", 1); + session.sendMessage(new TextMessage(result.toJSONString().getBytes())); + } + } else { + result.put("status", 1); + result.put("msg", "不是主控人员"); + session.sendMessage(new TextMessage(result.toJSONString().getBytes())); + } + } + } + + public synchronized void setMachineSessionMap(WebSocketSession session) throws IOException { + machineSessionMap.put(session.getId(), session); + } + + public void removeMachineSessionMap(WebSocketSession session) throws IOException { + machineSessionMap.remove(session.getId()); + if (machineDatabaseSessionMap.containsValue(session.getId())) { + for (String database : machineDatabaseSessionMap.values()) { + if (machineDatabaseSessionMap.get(database).equals(session.getId())) { + machineDatabaseSessionMap.remove(database); + } + } + } + } + + public synchronized void sendMachineFlag(String database, WebSocketSession session) throws IOException { + if (machineDatabaseSessionMap.containsKey(database)) { + if (machineDatabaseSessionMap.get(database).equals(session.getId())) { + sendUserMessage(database, session); + } else { + JSONObject msg = new JSONObject(); + msg.put("status", 1); + msg.put("msg", "手术已有设备远程中"); + session.sendMessage(new TextMessage(msg.toJSONString().getBytes())); + } + } else { + machineDatabaseSessionMap.put(database, session.getId()); + sendUserMessage(database, session); + } + } + + private synchronized void sendUserMessage(String database, WebSocketSession session) throws IOException { + JSONObject result = new JSONObject(); + if (userDatabaseSessionMap.containsKey(database)) { + String userSessionId = userDatabaseSessionMap.get(database); + WebSocketSession webSocketSession = userSessionMap.get(userSessionId); + result.put("status", 0); + result.put("msg", ""); + webSocketSession.sendMessage(new TextMessage(result.toJSONString().getBytes())); + } else { + result.put("status", 1); + result.put("msg", "网站未进行远程"); + session.sendMessage(new TextMessage(result.toJSONString().getBytes())); + } + } + } diff --git a/vital-signs/src/main/java/com/rax/vital/util/GetHttpParamUtil.java b/vital-signs/src/main/java/com/rax/vital/util/GetHttpParamUtil.java new file mode 100644 index 0000000..b9d5910 --- /dev/null +++ b/vital-signs/src/main/java/com/rax/vital/util/GetHttpParamUtil.java @@ -0,0 +1,19 @@ +package com.rax.vital.util; + +import java.util.HashMap; +import java.util.Map; + +public class GetHttpParamUtil { + + public static Map getParams(String url) { + String[] paramArr = url.split("&"); + Map params = new HashMap(); + for (String s : paramArr) { + if (s.contains("=")) { + String[] keyValue = s.split("="); + params.put(keyValue[0], keyValue[1]); + } + } + return params; + } +}