From 5e30add660d3dce8cfe7102341465a236ad4afc4 Mon Sep 17 00:00:00 2001 From: republicline <1464474399@qq.com> Date: Fri, 15 Nov 2024 09:38:00 +0800 Subject: [PATCH] =?UTF-8?q?commit:=202=E6=9C=9F=E5=BF=83=E8=B7=B3=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=85=A8=E5=B1=80=E5=B0=81=E8=A3=85,=E8=AF=AD?= =?UTF-8?q?=E9=9F=B3=E8=81=8A=E5=A4=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{vital/common/util => }/DBNameTest.java | 22 ++-- .../java/com/rax/RaxAdminApplication.java | 1 + .../java/com/rax/admin/timmer/MongoTimer.java | 22 ++-- .../common/datasource/MongoDBSource.java | 12 +- .../com/rax/vital/v1/handler/ChatHandler.java | 3 +- .../rax/vital/v1/handler/MedicineHandler.java | 20 --- .../service/impl/VitalSignServiceImpl.java | 48 +++---- .../rax/vital/v1/timer/VitalSignTimer.java | 4 +- .../rax/vital/v1/timer/VitalSignTimerWS.java | 2 +- .../vital/v2/handler/AddMedicineHandler.java | 45 ++----- .../com/rax/vital/v2/handler/ChatHandler.java | 117 ++++++++++++------ .../v2/handler/MachineFeedbackHandler.java | 44 ++----- .../rax/vital/v2/handler/MedicineHandler.java | 42 ++----- .../service/impl/ChatServiceImpl.java | 27 +--- .../service/impl/MedicineService.java | 2 +- .../rax/vital/v2/timer/HeartBeatTimer.java | 88 +++++++++++++ .../rax/vital/v2/timer/VitalSignTimerV2.java | 19 +-- 17 files changed, 260 insertions(+), 258 deletions(-) rename upms/upms-biz/src/main/java/com/rax/{vital/common/util => }/DBNameTest.java (59%) create mode 100644 upms/upms-biz/src/main/java/com/rax/vital/v2/timer/HeartBeatTimer.java diff --git a/upms/upms-biz/src/main/java/com/rax/vital/common/util/DBNameTest.java b/upms/upms-biz/src/main/java/com/rax/DBNameTest.java similarity index 59% rename from upms/upms-biz/src/main/java/com/rax/vital/common/util/DBNameTest.java rename to upms/upms-biz/src/main/java/com/rax/DBNameTest.java index 687e16f..7ee47f2 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/common/util/DBNameTest.java +++ b/upms/upms-biz/src/main/java/com/rax/DBNameTest.java @@ -1,6 +1,8 @@ -package com.rax.vital.common.util; +package com.rax; +import com.rax.vital.common.util.DatabaseNameUtil; + import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -12,13 +14,19 @@ import java.util.Date; */ public class DBNameTest { public static void main(String[] args) { - String patientName = DatabaseNameUtil.encrypt("eee"); - String idNum = DatabaseNameUtil.encrypt("10"); - System.out.println("DBName = " + patientName + "_" + idNum); + //String patientName = DatabaseNameUtil.encrypt("eee"); + //String idNum = DatabaseNameUtil.encrypt("10"); + //System.out.println("DBName = " + patientName + "_" + idNum); + // + // + //String date = getDate(new Date(), 10); + //System.out.println("date = " + date); + // AAEb_Qw==_20240812 + // decrypt = rtr + // decrypt = 1 - - String date = getDate(new Date(), 10); - System.out.println("date = " + date); + String decrypt = DatabaseNameUtil.decrypt("Qw=="); + System.out.println("decrypt = " + decrypt); } public static String getDate(Date now, int days){ diff --git a/upms/upms-biz/src/main/java/com/rax/RaxAdminApplication.java b/upms/upms-biz/src/main/java/com/rax/RaxAdminApplication.java index 4add956..f48fcc8 100644 --- a/upms/upms-biz/src/main/java/com/rax/RaxAdminApplication.java +++ b/upms/upms-biz/src/main/java/com/rax/RaxAdminApplication.java @@ -4,6 +4,7 @@ import com.rax.common.security.annotation.EnableRaxResourceServer; import com.rax.common.swagger.annotation.EnableRaxDoc; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; import org.springframework.context.annotation.ComponentScan; diff --git a/upms/upms-biz/src/main/java/com/rax/admin/timmer/MongoTimer.java b/upms/upms-biz/src/main/java/com/rax/admin/timmer/MongoTimer.java index 811881f..2521c22 100644 --- a/upms/upms-biz/src/main/java/com/rax/admin/timmer/MongoTimer.java +++ b/upms/upms-biz/src/main/java/com/rax/admin/timmer/MongoTimer.java @@ -17,15 +17,15 @@ import org.springframework.stereotype.Component; @Slf4j @Component public class MongoTimer { - String connectionString = "mongodb://localhost:27017"; - MongoClient mongoClient = MongoClients.create(connectionString); - - - - public void backup() { - MongoIterable DbNames = mongoClient.listDatabaseNames(); - for (String dbName : DbNames) { - - } - } + //String connectionString = "mongodb://localhost:27017"; + //MongoClient mongoClient = MongoClients.create(connectionString); + // + // + // + //public void backup() { + // MongoIterable DbNames = mongoClient.listDatabaseNames(); + // for (String dbName : DbNames) { + // + // } + //} } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/common/datasource/MongoDBSource.java b/upms/upms-biz/src/main/java/com/rax/vital/common/datasource/MongoDBSource.java index 08fdc13..62b1d66 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/common/datasource/MongoDBSource.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/common/datasource/MongoDBSource.java @@ -22,10 +22,14 @@ public class MongoDBSource extends CustomDataSource { private SimpleMongoClientDatabaseFactory simpleMongoClientDatabaseFactory; - public MongoDBSource(String host, String password, String username, String database) { - this.host = host; - this.password = password; - this.username = username; + private static final String MONGO_CONNECTION_URL = + "mongodb://useradmin:Xg137839mg@110.41.142.124:27017/?directConnection=true"; + + + public MongoDBSource(String database) { + this.host = "110.41.142.124:27017"; + this.password = "Xg137839mg"; + this.username = "useradmin"; this.database = database; } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v1/handler/ChatHandler.java b/upms/upms-biz/src/main/java/com/rax/vital/v1/handler/ChatHandler.java index d8f611b..c1c9379 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v1/handler/ChatHandler.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v1/handler/ChatHandler.java @@ -220,7 +220,7 @@ public class ChatHandler implements WebSocketHandler { String createTableSQL = """ CREATE TABLE %s ( `id` int NOT NULL AUTO_INCREMENT, - `content` blob, + `content` longblob, `create_time` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `create_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `msg_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, @@ -240,7 +240,6 @@ public class ChatHandler implements WebSocketHandler { jsonObject.put("msgType", "msg"); session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); } - } catch (SQLException | IOException e) { throw new RuntimeException(e); } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v1/handler/MedicineHandler.java b/upms/upms-biz/src/main/java/com/rax/vital/v1/handler/MedicineHandler.java index 0669d47..e4515f3 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v1/handler/MedicineHandler.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v1/handler/MedicineHandler.java @@ -74,26 +74,6 @@ public class MedicineHandler implements WebSocketHandler { @Scheduled(fixedRate = 30000) private void sendHeartbeat() { -// if (!timerTaskMap.containsKey(session.getId())) { -// ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1); -// heartbeatExecutor.scheduleAtFixedRate(() -> { -// try { -// if (session.isOpen()) { -// JSONObject jsonObject = new JSONObject(); -// jsonObject.put("msgType", "heartbeat"); -// session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); -// } else { -// session.close(); -// stopHeartbeat(session); -// vitalSignTimerWS.stopTimerTask(session.getId()); -// } -// } catch (Exception e) { -// e.printStackTrace(); -// stopHeartbeat(session); -// } -// }, 0, 10, TimeUnit.SECONDS); -// timerTaskMap.put(session.getId(), heartbeatExecutor); -// } for (WebSocketSession session : sessionMap.values()) { if (session.isOpen()) { try { diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v1/medicine/service/impl/VitalSignServiceImpl.java b/upms/upms-biz/src/main/java/com/rax/vital/v1/medicine/service/impl/VitalSignServiceImpl.java index 4cfd631..f2efaf2 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v1/medicine/service/impl/VitalSignServiceImpl.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v1/medicine/service/impl/VitalSignServiceImpl.java @@ -40,20 +40,6 @@ import java.util.concurrent.TimeUnit; @RefreshScope public class VitalSignServiceImpl implements VitalSignsService { - @Value("${vital-sign.mongodb.host}") - private String mongoDBHost; - - // MongoDB的用户名 - @Value("${vital-sign.mongodb.username}") - private String mongoUsername; - - // MongoDB的用户的密码 - @Value("${vital-sign.mongodb.password}") - private String mongoPassword; - - @Value("${vital-sign.except-database}") - private String exceptDatabase; - @Value("${vital-sign.information-database}") private String informationDatabase; @@ -193,7 +179,7 @@ public class VitalSignServiceImpl implements VitalSignsService { public R getPatientInfo(String databaseName) { - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, databaseName); + MongoDBSource mongoDBSource = new MongoDBSource(databaseName); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); Query query = new Query(); @@ -241,19 +227,19 @@ public class VitalSignServiceImpl implements VitalSignsService { @Override public R getDatabaseList() { - MongoClientSettings.Builder mongoBuilder = MongoClientSettings.builder(); - // mongodb://账户:密码@ip:端口/?authSource=admin - String connectionUrl = "mongodb://" + mongoUsername + ":" + mongoPassword + "@" + mongoDBHost + "/" + "?authSource=admin"; - mongoBuilder.applyConnectionString(new ConnectionString(connectionUrl)); - MongoClient mongoClient = MongoClients.create(mongoBuilder.build(), SpringDataMongoDB.driverInformation()); - MongoIterable databaseNames = mongoClient.listDatabaseNames(); - for (String database : databaseNames) { - } + //MongoClientSettings.Builder mongoBuilder = MongoClientSettings.builder(); + //// mongodb://账户:密码@ip:端口/?authSource=admin + //String connectionUrl = "mongodb://" + mongoUsername + ":" + mongoPassword + "@" + mongoDBHost + "/" + "?authSource=admin"; + //mongoBuilder.applyConnectionString(new ConnectionString(connectionUrl)); + //MongoClient mongoClient = MongoClients.create(mongoBuilder.build(), SpringDataMongoDB.driverInformation()); + //MongoIterable databaseNames = mongoClient.listDatabaseNames(); + //for (String database : databaseNames) { + //} return null; } public Page getPatientPage(String name, String dept, long offset, int limit) { - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); + MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); Query query = new Query(); @@ -283,7 +269,7 @@ public class VitalSignServiceImpl implements VitalSignsService { @Override public List getSurgeryCount(String start, String end) { - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); + MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); List operations = new ArrayList<>(); @@ -322,7 +308,7 @@ public class VitalSignServiceImpl implements VitalSignsService { } public List getSurgeryDuration(String start, String end) { - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); + MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); List operations = new ArrayList<>(); @@ -371,7 +357,7 @@ public class VitalSignServiceImpl implements VitalSignsService { } public List getSurgeryTypeProportion(String start, String end) { - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); + MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); List operations = new ArrayList<>(); @@ -391,7 +377,7 @@ public class VitalSignServiceImpl implements VitalSignsService { } public List getSurgeryOtherDuration(String start, String end) { - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); + MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); List operations = new ArrayList<>(); @@ -439,7 +425,7 @@ public class VitalSignServiceImpl implements VitalSignsService { @Override public List getPatientSurgeryList(String name, String code, String surgery, String type) { if (StringUtils.hasText(code)) { - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); + MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); @@ -489,7 +475,7 @@ public class VitalSignServiceImpl implements VitalSignsService { public List getSurgeryTableData(String name, String code, String date, String table) { if (StringUtils.hasText(code)) { if (!StringUtils.hasText(name)) { - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); + MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); Query query = new Query(); @@ -504,7 +490,7 @@ public class VitalSignServiceImpl implements VitalSignsService { mongoDBSource.close(); } String database = DatabaseNameUtil.encrypt(name) + "_" + DatabaseNameUtil.encrypt(code) + "_" + date; - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); + MongoDBSource mongoDBSource = new MongoDBSource(database); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); Query query = new Query(); diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v1/timer/VitalSignTimer.java b/upms/upms-biz/src/main/java/com/rax/vital/v1/timer/VitalSignTimer.java index 7626109..a85613c 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v1/timer/VitalSignTimer.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v1/timer/VitalSignTimer.java @@ -96,7 +96,7 @@ public class VitalSignTimer { MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId); if (mongoDBSource == null) { - mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); + mongoDBSource = new MongoDBSource(database); mongoDBSourceMap.put(simpSessionId, mongoDBSource); mongoDBSource.open(); } @@ -216,7 +216,7 @@ public class VitalSignTimer { if (masterControlMap.containsKey(database) && masterControlMap.get(database).equals(username)) { MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId); if (mongoDBSource == null) { - mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); + mongoDBSource = new MongoDBSource(database); mongoDBSourceMap.put(simpSessionId, mongoDBSource); mongoDBSource.open(); } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v1/timer/VitalSignTimerWS.java b/upms/upms-biz/src/main/java/com/rax/vital/v1/timer/VitalSignTimerWS.java index d962583..3692c7e 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v1/timer/VitalSignTimerWS.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v1/timer/VitalSignTimerWS.java @@ -99,7 +99,7 @@ public class VitalSignTimerWS { CustomDataSource dataSource = dataSourceMap.get(sessionId); if (dataSource == null) { - dataSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); + dataSource = new MongoDBSource(database); dataSourceMap.put(sessionId, dataSource); dataSource.open(); } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/AddMedicineHandler.java b/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/AddMedicineHandler.java index 84bce89..1d6231e 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/AddMedicineHandler.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/AddMedicineHandler.java @@ -8,6 +8,7 @@ import com.rax.vital.common.util.GetHttpParamUtil; import com.rax.vital.common.util.SysLoggerBuilder; import com.rax.vital.common.util.TokenUtil; import com.rax.vital.v2.timer.AIMedicineTimer; +import com.rax.vital.v2.timer.HeartBeatTimer; import com.rax.vital.v2.timer.VitalSignTimerV2; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -40,16 +41,16 @@ public class AddMedicineHandler implements WebSocketHandler { @Autowired private AIMedicineTimer aiMedicineTimer; - // 发送心跳任务的定时任务容器 - private Map timerTaskMap = new ConcurrentHashMap<>(); + @Autowired + private HeartBeatTimer heartBeatTimer; + private String SERVICE_NAME = HeartBeatTimer.addMedicineHandler; @Override public void afterConnectionEstablished(WebSocketSession session) { - startHeartbeat(session); + heartBeatTimer.putSession(SERVICE_NAME,session); } - @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { @@ -124,7 +125,7 @@ public class AddMedicineHandler implements WebSocketHandler { @Override public void handleTransportError(WebSocketSession session, Throwable exception) { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); aiMedicineTimer.closeConnection(session); SysLog sysLog = sysLoggerBuilder.buildSysLog("给药接口异常中断:"+exception.getMessage(), "4", session); sysLogService.saveLog(sysLog); @@ -132,7 +133,8 @@ public class AddMedicineHandler implements WebSocketHandler { @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { - stopHeartbeat(session); + //stopHeartbeat(session); + heartBeatTimer.removeSession(session); aiMedicineTimer.closeConnection(session); SysLog sysLog = sysLoggerBuilder.buildSysLog("给药接口客户端,断开连接", "0", session); sysLogService.saveLog(sysLog); @@ -143,33 +145,6 @@ public class AddMedicineHandler implements WebSocketHandler { return false; } - private void startHeartbeat(WebSocketSession session) { - if (!timerTaskMap.containsKey(session.getId())) { - ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1); - heartbeatExecutor.scheduleAtFixedRate(() -> { - try { - if (session.isOpen()) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("msgType", "heartbeat"); - session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); - } else { - stopHeartbeat(session); - aiMedicineTimer.closeConnection(session); - session.close(); - } - } catch (Exception e) { - e.printStackTrace(); - stopHeartbeat(session); - } - }, 0, 30, TimeUnit.SECONDS); - timerTaskMap.put(session.getId(), heartbeatExecutor); - } - } - - private void stopHeartbeat(WebSocketSession session) { - ScheduledExecutorService heartbeatExecutor = timerTaskMap.get(session.getId()); - heartbeatExecutor.shutdownNow(); - } // 异步发送消息 private void sendMsgAsync(WebSocketSession session, JSONObject jsonObject) { @@ -178,12 +153,12 @@ public class AddMedicineHandler implements WebSocketHandler { if (session.isOpen()) { session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); }else { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); aiMedicineTimer.closeConnection(session); session.close(); } }catch (Exception e) { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); aiMedicineTimer.closeConnection(session); log.error("addMedicineHandler-sendMsgAsync error: {}", e.getMessage()); } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/ChatHandler.java b/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/ChatHandler.java index 2748d84..6883f10 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/ChatHandler.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/ChatHandler.java @@ -1,23 +1,28 @@ package com.rax.vital.v2.handler; +import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSONObject; import com.rax.admin.api.entity.SysLog; import com.rax.admin.service.SysLogService; +import com.rax.vital.common.datasource.MongoDBSource; import com.rax.vital.common.util.DatabaseNameUtil; import com.rax.vital.common.util.GetHttpParamUtil; import com.rax.vital.common.util.SysLoggerBuilder; import com.rax.vital.common.util.TokenUtil; import com.rax.vital.v2.medicine.service.ChatService; +import com.rax.vital.v2.timer.HeartBeatTimer; +import org.bson.Document; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.stereotype.Component; import org.springframework.web.socket.*; + +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; @Component public class ChatHandler implements WebSocketHandler { @@ -34,11 +39,24 @@ public class ChatHandler implements WebSocketHandler { @Autowired private TokenUtil tokenUtil; - private Map timerTaskMap = new ConcurrentHashMap(); + @Autowired + private HeartBeatTimer heartBeatTimer; + + private String SERVICE_NAME = HeartBeatTimer.chatHandler; + + + private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(5); + + // dbName -> sessionList + private Map> dbNameSessionList = new ConcurrentHashMap(); + + // sessionId -> dbName + private Map sessionDbMap = new ConcurrentHashMap(); + @Override public void afterConnectionEstablished(WebSocketSession session) { - startHeartbeat(session); + heartBeatTimer.putSession(SERVICE_NAME, session); } @Override @@ -47,34 +65,66 @@ public class ChatHandler implements WebSocketHandler { String payload = (String) message.getPayload(); JSONObject jsonObject = JSONObject.parseObject(payload); - if (!"heartbeat".equals(jsonObject.getString("msgType"))) { + String msgType = jsonObject.getString("msgType"); + + String patientName = jsonObject.getString("patientName"); + String idNum = jsonObject.getString("idNum"); + String date = jsonObject.getString("date"); + String databaseName = DatabaseNameUtil.encrypt(patientName) + "_" + DatabaseNameUtil.encrypt(idNum) + "_" + date; + + + if ("init".equals(msgType)) { + if (!dbNameSessionList.containsKey(databaseName)) { + ArrayList sessionArrayList = new ArrayList<>(); + dbNameSessionList.put(databaseName, sessionArrayList); + sessionDbMap.put(session.getId(), databaseName); + } + List webSocketSessions = dbNameSessionList.get(databaseName); + webSocketSessions.add(session); + } + + // 处理音频或文本 + if ("msg".equals(msgType) || "audio".equals(msgType)) { String query = Objects.requireNonNull(session.getUri()).getQuery(); String token = GetHttpParamUtil.getParam(query, "token"); Map map = tokenUtil.parseToken(token); String username = (String) map.get("username"); - String patientName = jsonObject.getString("patientName"); - String idNum = jsonObject.getString("idNum"); - String date = jsonObject.getString("date"); // 消息内容 - String msg = jsonObject.getString("msg"); - String databaseName = DatabaseNameUtil.encrypt(patientName) + "_" + DatabaseNameUtil.encrypt(idNum) + "_" + date; - chatService.sendMessage(databaseName, username, session, msg); + String content = jsonObject.getString("content"); + List webSocketSessions = dbNameSessionList.get(databaseName); + // 转发消息 + JSONObject param = new JSONObject(); + param.put("msgType", msgType); + param.put("createUser", username); + param.put("createTime", DateUtil.now()); + param.put("content", content); + for (WebSocketSession webSocketSession : webSocketSessions) { + if (webSocketSession.isOpen()) { + System.out.println("发消息啦 = " + param.toJSONString()); + webSocketSession.sendMessage(new TextMessage(param.toJSONString().getBytes())); + } + } + + //异步入库mongodb + EXECUTOR_SERVICE.execute(() -> insertCollection(databaseName, param)); + // 之前的 + // chatService.sendMessage(databaseName, username, session, content); } } @Override public void handleTransportError(WebSocketSession session, Throwable exception) { - SysLog sysLog = sysLoggerBuilder.buildSysLog("聊天室接口异常中断:"+exception.getMessage(), "4", session); + SysLog sysLog = sysLoggerBuilder.buildSysLog("聊天室接口异常中断:" + exception.getMessage(), "4", session); sysLogService.saveLog(sysLog); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); chatService.stopTask(session.getId()); SysLog sysLog = sysLoggerBuilder.buildSysLog("聊天室接口关闭", "0", session); sysLogService.saveLog(sysLog); @@ -85,31 +135,18 @@ public class ChatHandler implements WebSocketHandler { return false; } - private void startHeartbeat(WebSocketSession session) { - if (!timerTaskMap.containsKey(session.getId())) { - ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1); - heartbeatExecutor.scheduleAtFixedRate(() -> { - try { - if (session.isOpen()) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("msgType", "heartbeat"); - session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); - } else { - stopHeartbeat(session); - chatService.stopTask(session.getId()); - session.close(); - } - } catch (Exception e) { - e.printStackTrace(); - stopHeartbeat(session); - } - }, 0, 10, TimeUnit.SECONDS); - timerTaskMap.put(session.getId(), heartbeatExecutor); - } + // 入库mongodb + private void insertCollection(String dbName, JSONObject jsonObject) { + MongoDBSource mongoDBSource = new MongoDBSource(dbName); + mongoDBSource.open(); + MongoTemplate mongoTemplate = mongoDBSource.getConnection(); + Document document = new Document(); + document.put("msgType", jsonObject.getString("msgType")); + document.put("createUser", jsonObject.getString("createUser")); + document.put("createTime", jsonObject.getString("createTime")); + document.put("content", jsonObject.getString("content")); + mongoTemplate.insert(jsonObject, "t_chat"); + mongoDBSource.close(); } - private void stopHeartbeat(WebSocketSession session) { - ScheduledExecutorService heartbeatExecutor = timerTaskMap.get(session.getId()); - heartbeatExecutor.shutdownNow(); - } } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/MachineFeedbackHandler.java b/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/MachineFeedbackHandler.java index c440292..c2a64a3 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/MachineFeedbackHandler.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/MachineFeedbackHandler.java @@ -6,6 +6,7 @@ import com.rax.admin.service.SysLogService; import com.rax.vital.common.util.DatabaseNameUtil; import com.rax.vital.common.util.SysLoggerBuilder; import com.rax.vital.v2.timer.AIMedicineTimer; +import com.rax.vital.v2.timer.HeartBeatTimer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -35,13 +36,19 @@ public class MachineFeedbackHandler implements WebSocketHandler { @Autowired private AIMedicineTimer aiMedicineTimer; + @Autowired + private HeartBeatTimer heartBeatTimer; + + private String SERVICE_NAME = HeartBeatTimer.machineFeedbackHandler; + + @Override public void afterConnectionEstablished(WebSocketSession session) throws IOException { JSONObject msg = new JSONObject(); msg.put("msgType", "msg"); msg.put("msg", "已成功连接服务器!"); session.sendMessage(new TextMessage(msg.toJSONString().getBytes())); - startHeartbeat(session); + heartBeatTimer.putSession(SERVICE_NAME,session); } @Override @@ -141,7 +148,7 @@ public class MachineFeedbackHandler implements WebSocketHandler { @Override public void handleTransportError(WebSocketSession session, Throwable exception) { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); aiMedicineTimer.closeConnection(session); SysLog sysLog = sysLoggerBuilder.buildSysLog("仪器端获取收药口异常中断", "4", session); sysLogService.saveLog(sysLog); @@ -149,7 +156,7 @@ public class MachineFeedbackHandler implements WebSocketHandler { @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); aiMedicineTimer.closeConnection(session); SysLog sysLog = sysLoggerBuilder.buildSysLog("仪器端收药接口关闭", "0", session); sysLogService.saveLog(sysLog); @@ -160,33 +167,6 @@ public class MachineFeedbackHandler implements WebSocketHandler { return false; } - private void startHeartbeat(WebSocketSession session) { - if (!timerTaskMap.containsKey(session.getId())) { - ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1); - heartbeatExecutor.scheduleAtFixedRate(() -> { - try { - if (session.isOpen()) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("msgType", "heartbeat"); - session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); - } else { - stopHeartbeat(session); - aiMedicineTimer.closeConnection(session); - session.close(); - } - } catch (Exception e) { - e.printStackTrace(); - stopHeartbeat(session); - } - }, 0, 30, TimeUnit.SECONDS); - timerTaskMap.put(session.getId(), heartbeatExecutor); - } - } - - private void stopHeartbeat(WebSocketSession session) { - ScheduledExecutorService heartbeatExecutor = timerTaskMap.get(session.getId()); - heartbeatExecutor.shutdownNow(); - } // 异步发送消息 private void sendMsgAsync(WebSocketSession session, JSONObject jsonObject) { @@ -195,12 +175,12 @@ public class MachineFeedbackHandler implements WebSocketHandler { if (session.isOpen()) { session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); } else { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); aiMedicineTimer.closeConnection(session); session.close(); } } catch (Exception e) { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); aiMedicineTimer.closeConnection(session); log.error("addMedicineHandler-sendMsgAsync error: {}", e.getMessage()); } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/MedicineHandler.java b/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/MedicineHandler.java index 62ccf18..52ac590 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/MedicineHandler.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v2/handler/MedicineHandler.java @@ -7,6 +7,7 @@ import com.rax.vital.common.util.DatabaseNameUtil; import com.rax.vital.common.util.GetHttpParamUtil; import com.rax.vital.common.util.SysLoggerBuilder; import com.rax.vital.common.util.TokenUtil; +import com.rax.vital.v2.timer.HeartBeatTimer; import com.rax.vital.v2.timer.VitalSignTimerV2; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -39,11 +40,17 @@ public class MedicineHandler implements WebSocketHandler { @Autowired private TokenUtil tokenUtil; - private Map timerTaskMap = new ConcurrentHashMap<>(); + + // 全局心跳任务容器 + @Autowired + private HeartBeatTimer heartBeatTimer; + + private String SERVICE_NAME = HeartBeatTimer.medicineHandler; + @Override public void afterConnectionEstablished(WebSocketSession session) { - startHeartbeat(session); + heartBeatTimer.putSession(SERVICE_NAME,session); } @Override @@ -70,7 +77,7 @@ public class MedicineHandler implements WebSocketHandler { @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); vitalSignTimerV2.stopTimerTask(session.getId()); SysLog sysLog = sysLoggerBuilder.buildSysLog("生命体征接口异常中断", "4", session); sysLogService.saveLog(sysLog); @@ -78,7 +85,7 @@ public class MedicineHandler implements WebSocketHandler { @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { - stopHeartbeat(session); + heartBeatTimer.removeSession(session); vitalSignTimerV2.stopTimerTask(session.getId()); SysLog sysLog = sysLoggerBuilder.buildSysLog("生命体征接口连接关闭", "0", session); sysLogService.saveLog(sysLog); @@ -89,31 +96,4 @@ public class MedicineHandler implements WebSocketHandler { return false; } - private void startHeartbeat(WebSocketSession session) { - if (!timerTaskMap.containsKey(session.getId())) { - ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1); - heartbeatExecutor.scheduleAtFixedRate(() -> { - try { - if (session.isOpen()) { - JSONObject jsonObject = new JSONObject(); - jsonObject.put("msgType", "heartbeat"); - session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); - } else { - session.close(); - stopHeartbeat(session); - vitalSignTimerV2.stopTimerTask(session.getId()); - } - } catch (Exception e) { - e.printStackTrace(); - stopHeartbeat(session); - } - }, 0, 10, TimeUnit.SECONDS); - timerTaskMap.put(session.getId(), heartbeatExecutor); - } - } - - private void stopHeartbeat(WebSocketSession session) { - ScheduledExecutorService heartbeatExecutor = timerTaskMap.get(session.getId()); - heartbeatExecutor.shutdownNow(); - } } diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v2/medicine/service/impl/ChatServiceImpl.java b/upms/upms-biz/src/main/java/com/rax/vital/v2/medicine/service/impl/ChatServiceImpl.java index 14e0b8b..e54f28e 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v2/medicine/service/impl/ChatServiceImpl.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v2/medicine/service/impl/ChatServiceImpl.java @@ -37,36 +37,11 @@ public class ChatServiceImpl implements ChatService { private static final Map sessionDatabaseMap = new ConcurrentHashMap<>(); - // MongoDB的地址 - @Value("${vital-sign.mongodb.host}") - private String mongoDBHost; - - // MongoDB的用户名 - @Value("${vital-sign.mongodb.username}") - private String mongoUsername; - - // MongoDB的用户的密码 - @Value("${vital-sign.mongodb.password}") - private String mongoPassword; - - // mysql地址 - @Value("${vital-sign.mysql.host}") - private String mysqlHost; - - // mysql用户名 - @Value("${vital-sign.mysql.username}") - private String mysqlUsername; - - // mysql用户密码 - @Value("${vital-sign.mysql.password}") - private String mysqlPassword; - - @Override public void sendMessage(String databaseName, String username, WebSocketSession session, String msg) throws IOException { CustomDataSource mongoDBSource = datasourceMap.get(session.getId()); if (mongoDBSource == null) { - mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, databaseName); + mongoDBSource = new MongoDBSource(databaseName); mongoDBSource.open(); datasourceMap.put(session.getId(), mongoDBSource); sessionDatabaseMap.put(session.getId(), databaseName); diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v2/medicine/service/impl/MedicineService.java b/upms/upms-biz/src/main/java/com/rax/vital/v2/medicine/service/impl/MedicineService.java index adf0017..31e2875 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v2/medicine/service/impl/MedicineService.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v2/medicine/service/impl/MedicineService.java @@ -290,7 +290,7 @@ public class MedicineService implements IMedicineService { } } String database = DatabaseNameUtil.encrypt(name) + "_" + DatabaseNameUtil.encrypt(code) + "_" + date; - MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); + MongoDBSource mongoDBSource = new MongoDBSource(database); mongoDBSource.open(); MongoTemplate template = mongoDBSource.getConnection(); Query query = new Query(); diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v2/timer/HeartBeatTimer.java b/upms/upms-biz/src/main/java/com/rax/vital/v2/timer/HeartBeatTimer.java new file mode 100644 index 0000000..181a4d4 --- /dev/null +++ b/upms/upms-biz/src/main/java/com/rax/vital/v2/timer/HeartBeatTimer.java @@ -0,0 +1,88 @@ +package com.rax.vital.v2.timer; + +import com.alibaba.fastjson.JSONObject; +import lombok.Data; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @project_name: rax-remote-v2 + * @time: 2024/11/13 17:01 + * @author: republicline + * @description: 全局心跳任务实现 + */ +@Component +@Data +public class HeartBeatTimer { + + + public static final String chatHandler = "chatHandler"; + public static final String addMedicineHandler = "addMedicineHandler"; + public static final String machineFeedbackHandler = "machineFeedbackHandler"; + public static final String medicineHandler = "medicineHandler"; + + + private static final Map> serviceSessionMap = new ConcurrentHashMap<>(); + + static { + // 初始化4个Map,分别对应4个不同业务模块的WebSocketSession + Map chatHeartBeat = new ConcurrentHashMap<>(); + Map addMedicineHeartBeat = new ConcurrentHashMap<>(); + Map machineFeedbackHeartBeat = new ConcurrentHashMap<>(); + Map medicineHeartBeat = new ConcurrentHashMap<>(); + serviceSessionMap.put(chatHandler, chatHeartBeat); + serviceSessionMap.put(machineFeedbackHandler, addMedicineHeartBeat); + serviceSessionMap.put(addMedicineHandler, machineFeedbackHeartBeat); + serviceSessionMap.put(medicineHandler, medicineHeartBeat); + } + + + @Scheduled(fixedRate = 30000) + private void sendHeartbeat() { + // 遍历urlSessionMap的session数量 调试使用 + serviceSessionMap.keySet().forEach(url -> { + Map handlerMap = serviceSessionMap.get(url); + for (WebSocketSession value : handlerMap.values()) { + System.out.println("url: " + url + " session: " + value.getId()); + } + }); + for (Map handlerMap : serviceSessionMap.values()) { + for (WebSocketSession session : handlerMap.values()) { + try { + if (session.isOpen()) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("msgType", "heartbeat"); + session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); + } else { + handlerMap.remove(session.getId()); + session.close(); + } + } catch (Exception e) { + handlerMap.remove(session.getId()); + e.printStackTrace(); + } + } + } + } + + + // 向指定业务模块的sessionMap中添加到urlSessionMap + public void putSession(String url, WebSocketSession session) { + Map handlerMap = serviceSessionMap.get(url); + if (handlerMap == null) { + return; + } + handlerMap.put(session.getId(), session); + } + + // 从指定业务模块的urlSessionMap移除session + public void removeSession(WebSocketSession session) { + serviceSessionMap.values().forEach(handlerMap -> handlerMap.remove(session.getId())); + } + +} diff --git a/upms/upms-biz/src/main/java/com/rax/vital/v2/timer/VitalSignTimerV2.java b/upms/upms-biz/src/main/java/com/rax/vital/v2/timer/VitalSignTimerV2.java index d855304..6be72da 100644 --- a/upms/upms-biz/src/main/java/com/rax/vital/v2/timer/VitalSignTimerV2.java +++ b/upms/upms-biz/src/main/java/com/rax/vital/v2/timer/VitalSignTimerV2.java @@ -31,17 +31,6 @@ import java.util.concurrent.ConcurrentHashMap; @Component @RequiredArgsConstructor public class VitalSignTimerV2 { - // MongoDB的地址 - @Value("${vital-sign.mongodb.host}") - private String mongoDBHost; - - // MongoDB的用户名 - @Value("${vital-sign.mongodb.username}") - private String mongoUsername; - - // MongoDB的用户的密码 - @Value("${vital-sign.mongodb.password}") - private String mongoPassword; private final SurgeryServiceV2 surgeryServiceV2; @@ -77,7 +66,7 @@ public class VitalSignTimerV2 { CustomDataSource dataSource = dataSourceMap.get(sessionId); if (dataSource == null) { - dataSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); + dataSource = new MongoDBSource(database); dataSourceMap.put(sessionId, dataSource); dataSource.open(); } @@ -88,8 +77,8 @@ public class VitalSignTimerV2 { MongoTemplate template = finalMongoDBSource.getConnection(); JSONObject jsonObject = new JSONObject(); - // 生命体征信息 - List vitalSignsList = surgeryServiceV2.getVitalSignsList(template); + // 生命体征信息 这里要改改 + List vitalSignsList = surgeryServiceV2.getVitalSignsList(template); jsonObject.put("vitalSignsList", vitalSignsList); // 标记信息 @@ -161,7 +150,7 @@ public class VitalSignTimerV2 { // 定时任务,设置1秒 Timer timer = new Timer(); - timer.schedule(timerTask, 0, 2000); + timer.schedule(timerTask, 0, 1000); timerTaskMap.put(sessionId, timerTask); }