commit: 2期心跳任务全局封装,语音聊天

This commit is contained in:
republicline 2024-11-15 09:38:00 +08:00
parent 17be499d6d
commit 5e30add660
17 changed files with 260 additions and 258 deletions

View File

@ -1,6 +1,8 @@
package com.rax.vital.common.util; package com.rax;
import com.rax.vital.common.util.DatabaseNameUtil;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
@ -12,13 +14,19 @@ import java.util.Date;
*/ */
public class DBNameTest { public class DBNameTest {
public static void main(String[] args) { public static void main(String[] args) {
String patientName = DatabaseNameUtil.encrypt("eee"); //String patientName = DatabaseNameUtil.encrypt("eee");
String idNum = DatabaseNameUtil.encrypt("10"); //String idNum = DatabaseNameUtil.encrypt("10");
System.out.println("DBName = " + patientName + "_" + idNum); //System.out.println("DBName = " + patientName + "_" + idNum);
//
//
//String date = getDate(new Date(), 10);
//System.out.println("date = " + date);
// AAEb_Qw==_20240812
// decrypt = rtr
// decrypt = 1
String decrypt = DatabaseNameUtil.decrypt("Qw==");
String date = getDate(new Date(), 10); System.out.println("decrypt = " + decrypt);
System.out.println("date = " + date);
} }
public static String getDate(Date now, int days){ public static String getDate(Date now, int days){

View File

@ -4,6 +4,7 @@ import com.rax.common.security.annotation.EnableRaxResourceServer;
import com.rax.common.swagger.annotation.EnableRaxDoc; import com.rax.common.swagger.annotation.EnableRaxDoc;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;

View File

@ -17,15 +17,15 @@ import org.springframework.stereotype.Component;
@Slf4j @Slf4j
@Component @Component
public class MongoTimer { public class MongoTimer {
String connectionString = "mongodb://localhost:27017"; //String connectionString = "mongodb://localhost:27017";
MongoClient mongoClient = MongoClients.create(connectionString); //MongoClient mongoClient = MongoClients.create(connectionString);
//
//
//
public void backup() { //public void backup() {
MongoIterable<String> DbNames = mongoClient.listDatabaseNames(); // MongoIterable<String> DbNames = mongoClient.listDatabaseNames();
for (String dbName : DbNames) { // for (String dbName : DbNames) {
//
} // }
} //}
} }

View File

@ -22,10 +22,14 @@ public class MongoDBSource extends CustomDataSource {
private SimpleMongoClientDatabaseFactory simpleMongoClientDatabaseFactory; private SimpleMongoClientDatabaseFactory simpleMongoClientDatabaseFactory;
public MongoDBSource(String host, String password, String username, String database) { private static final String MONGO_CONNECTION_URL =
this.host = host; "mongodb://useradmin:Xg137839mg@110.41.142.124:27017/?directConnection=true";
this.password = password;
this.username = username;
public MongoDBSource(String database) {
this.host = "110.41.142.124:27017";
this.password = "Xg137839mg";
this.username = "useradmin";
this.database = database; this.database = database;
} }

View File

@ -220,7 +220,7 @@ public class ChatHandler implements WebSocketHandler {
String createTableSQL = """ String createTableSQL = """
CREATE TABLE %s ( CREATE TABLE %s (
`id` int NOT NULL AUTO_INCREMENT, `id` int NOT NULL AUTO_INCREMENT,
`content` blob, `content` longblob,
`create_time` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `create_time` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`create_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `create_user` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`msg_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `msg_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
@ -240,7 +240,6 @@ public class ChatHandler implements WebSocketHandler {
jsonObject.put("msgType", "msg"); jsonObject.put("msgType", "msg");
session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes()));
} }
} catch (SQLException | IOException e) { } catch (SQLException | IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -74,26 +74,6 @@ public class MedicineHandler implements WebSocketHandler {
@Scheduled(fixedRate = 30000) @Scheduled(fixedRate = 30000)
private void sendHeartbeat() { private void sendHeartbeat() {
// if (!timerTaskMap.containsKey(session.getId())) {
// ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1);
// heartbeatExecutor.scheduleAtFixedRate(() -> {
// try {
// if (session.isOpen()) {
// JSONObject jsonObject = new JSONObject();
// jsonObject.put("msgType", "heartbeat");
// session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes()));
// } else {
// session.close();
// stopHeartbeat(session);
// vitalSignTimerWS.stopTimerTask(session.getId());
// }
// } catch (Exception e) {
// e.printStackTrace();
// stopHeartbeat(session);
// }
// }, 0, 10, TimeUnit.SECONDS);
// timerTaskMap.put(session.getId(), heartbeatExecutor);
// }
for (WebSocketSession session : sessionMap.values()) { for (WebSocketSession session : sessionMap.values()) {
if (session.isOpen()) { if (session.isOpen()) {
try { try {

View File

@ -40,20 +40,6 @@ import java.util.concurrent.TimeUnit;
@RefreshScope @RefreshScope
public class VitalSignServiceImpl implements VitalSignsService { public class VitalSignServiceImpl implements VitalSignsService {
@Value("${vital-sign.mongodb.host}")
private String mongoDBHost;
// MongoDB的用户名
@Value("${vital-sign.mongodb.username}")
private String mongoUsername;
// MongoDB的用户的密码
@Value("${vital-sign.mongodb.password}")
private String mongoPassword;
@Value("${vital-sign.except-database}")
private String exceptDatabase;
@Value("${vital-sign.information-database}") @Value("${vital-sign.information-database}")
private String informationDatabase; private String informationDatabase;
@ -193,7 +179,7 @@ public class VitalSignServiceImpl implements VitalSignsService {
public R getPatientInfo(String databaseName) { public R getPatientInfo(String databaseName) {
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, databaseName); MongoDBSource mongoDBSource = new MongoDBSource(databaseName);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
Query query = new Query(); Query query = new Query();
@ -241,19 +227,19 @@ public class VitalSignServiceImpl implements VitalSignsService {
@Override @Override
public R getDatabaseList() { public R getDatabaseList() {
MongoClientSettings.Builder mongoBuilder = MongoClientSettings.builder(); //MongoClientSettings.Builder mongoBuilder = MongoClientSettings.builder();
// mongodb://账户:密码@ip:端口/?authSource=admin //// mongodb://账户:密码@ip:端口/?authSource=admin
String connectionUrl = "mongodb://" + mongoUsername + ":" + mongoPassword + "@" + mongoDBHost + "/" + "?authSource=admin"; //String connectionUrl = "mongodb://" + mongoUsername + ":" + mongoPassword + "@" + mongoDBHost + "/" + "?authSource=admin";
mongoBuilder.applyConnectionString(new ConnectionString(connectionUrl)); //mongoBuilder.applyConnectionString(new ConnectionString(connectionUrl));
MongoClient mongoClient = MongoClients.create(mongoBuilder.build(), SpringDataMongoDB.driverInformation()); //MongoClient mongoClient = MongoClients.create(mongoBuilder.build(), SpringDataMongoDB.driverInformation());
MongoIterable<String> databaseNames = mongoClient.listDatabaseNames(); //MongoIterable<String> databaseNames = mongoClient.listDatabaseNames();
for (String database : databaseNames) { //for (String database : databaseNames) {
} //}
return null; return null;
} }
public Page getPatientPage(String name, String dept, long offset, int limit) { public Page getPatientPage(String name, String dept, long offset, int limit) {
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
Query query = new Query(); Query query = new Query();
@ -283,7 +269,7 @@ public class VitalSignServiceImpl implements VitalSignsService {
@Override @Override
public List getSurgeryCount(String start, String end) { public List getSurgeryCount(String start, String end) {
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
List<AggregationOperation> operations = new ArrayList<>(); List<AggregationOperation> operations = new ArrayList<>();
@ -322,7 +308,7 @@ public class VitalSignServiceImpl implements VitalSignsService {
} }
public List getSurgeryDuration(String start, String end) { public List getSurgeryDuration(String start, String end) {
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
List<AggregationOperation> operations = new ArrayList<>(); List<AggregationOperation> operations = new ArrayList<>();
@ -371,7 +357,7 @@ public class VitalSignServiceImpl implements VitalSignsService {
} }
public List getSurgeryTypeProportion(String start, String end) { public List getSurgeryTypeProportion(String start, String end) {
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
List<AggregationOperation> operations = new ArrayList<>(); List<AggregationOperation> operations = new ArrayList<>();
@ -391,7 +377,7 @@ public class VitalSignServiceImpl implements VitalSignsService {
} }
public List getSurgeryOtherDuration(String start, String end) { public List getSurgeryOtherDuration(String start, String end) {
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
List<AggregationOperation> operations = new ArrayList<>(); List<AggregationOperation> operations = new ArrayList<>();
@ -439,7 +425,7 @@ public class VitalSignServiceImpl implements VitalSignsService {
@Override @Override
public List getPatientSurgeryList(String name, String code, String surgery, String type) { public List getPatientSurgeryList(String name, String code, String surgery, String type) {
if (StringUtils.hasText(code)) { if (StringUtils.hasText(code)) {
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
@ -489,7 +475,7 @@ public class VitalSignServiceImpl implements VitalSignsService {
public List getSurgeryTableData(String name, String code, String date, String table) { public List getSurgeryTableData(String name, String code, String date, String table) {
if (StringUtils.hasText(code)) { if (StringUtils.hasText(code)) {
if (!StringUtils.hasText(name)) { if (!StringUtils.hasText(name)) {
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, informationDatabase); MongoDBSource mongoDBSource = new MongoDBSource(informationDatabase);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
Query query = new Query(); Query query = new Query();
@ -504,7 +490,7 @@ public class VitalSignServiceImpl implements VitalSignsService {
mongoDBSource.close(); mongoDBSource.close();
} }
String database = DatabaseNameUtil.encrypt(name) + "_" + DatabaseNameUtil.encrypt(code) + "_" + date; String database = DatabaseNameUtil.encrypt(name) + "_" + DatabaseNameUtil.encrypt(code) + "_" + date;
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); MongoDBSource mongoDBSource = new MongoDBSource(database);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
Query query = new Query(); Query query = new Query();

View File

@ -96,7 +96,7 @@ public class VitalSignTimer {
MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId); MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId);
if (mongoDBSource == null) { if (mongoDBSource == null) {
mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); mongoDBSource = new MongoDBSource(database);
mongoDBSourceMap.put(simpSessionId, mongoDBSource); mongoDBSourceMap.put(simpSessionId, mongoDBSource);
mongoDBSource.open(); mongoDBSource.open();
} }
@ -216,7 +216,7 @@ public class VitalSignTimer {
if (masterControlMap.containsKey(database) && masterControlMap.get(database).equals(username)) { if (masterControlMap.containsKey(database) && masterControlMap.get(database).equals(username)) {
MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId); MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId);
if (mongoDBSource == null) { if (mongoDBSource == null) {
mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); mongoDBSource = new MongoDBSource(database);
mongoDBSourceMap.put(simpSessionId, mongoDBSource); mongoDBSourceMap.put(simpSessionId, mongoDBSource);
mongoDBSource.open(); mongoDBSource.open();
} }

View File

@ -99,7 +99,7 @@ public class VitalSignTimerWS {
CustomDataSource dataSource = dataSourceMap.get(sessionId); CustomDataSource dataSource = dataSourceMap.get(sessionId);
if (dataSource == null) { if (dataSource == null) {
dataSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); dataSource = new MongoDBSource(database);
dataSourceMap.put(sessionId, dataSource); dataSourceMap.put(sessionId, dataSource);
dataSource.open(); dataSource.open();
} }

View File

@ -8,6 +8,7 @@ import com.rax.vital.common.util.GetHttpParamUtil;
import com.rax.vital.common.util.SysLoggerBuilder; import com.rax.vital.common.util.SysLoggerBuilder;
import com.rax.vital.common.util.TokenUtil; import com.rax.vital.common.util.TokenUtil;
import com.rax.vital.v2.timer.AIMedicineTimer; import com.rax.vital.v2.timer.AIMedicineTimer;
import com.rax.vital.v2.timer.HeartBeatTimer;
import com.rax.vital.v2.timer.VitalSignTimerV2; import com.rax.vital.v2.timer.VitalSignTimerV2;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -40,16 +41,16 @@ public class AddMedicineHandler implements WebSocketHandler {
@Autowired @Autowired
private AIMedicineTimer aiMedicineTimer; private AIMedicineTimer aiMedicineTimer;
// 发送心跳任务的定时任务容器 @Autowired
private Map<String, ScheduledExecutorService> timerTaskMap = new ConcurrentHashMap<>(); private HeartBeatTimer heartBeatTimer;
private String SERVICE_NAME = HeartBeatTimer.addMedicineHandler;
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) { public void afterConnectionEstablished(WebSocketSession session) {
startHeartbeat(session); heartBeatTimer.putSession(SERVICE_NAME,session);
} }
@Override @Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
@ -124,7 +125,7 @@ public class AddMedicineHandler implements WebSocketHandler {
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) { public void handleTransportError(WebSocketSession session, Throwable exception) {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
aiMedicineTimer.closeConnection(session); aiMedicineTimer.closeConnection(session);
SysLog sysLog = sysLoggerBuilder.buildSysLog("给药接口异常中断:"+exception.getMessage(), "4", session); SysLog sysLog = sysLoggerBuilder.buildSysLog("给药接口异常中断:"+exception.getMessage(), "4", session);
sysLogService.saveLog(sysLog); sysLogService.saveLog(sysLog);
@ -132,7 +133,8 @@ public class AddMedicineHandler implements WebSocketHandler {
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
stopHeartbeat(session); //stopHeartbeat(session);
heartBeatTimer.removeSession(session);
aiMedicineTimer.closeConnection(session); aiMedicineTimer.closeConnection(session);
SysLog sysLog = sysLoggerBuilder.buildSysLog("给药接口客户端,断开连接", "0", session); SysLog sysLog = sysLoggerBuilder.buildSysLog("给药接口客户端,断开连接", "0", session);
sysLogService.saveLog(sysLog); sysLogService.saveLog(sysLog);
@ -143,33 +145,6 @@ public class AddMedicineHandler implements WebSocketHandler {
return false; return false;
} }
private void startHeartbeat(WebSocketSession session) {
if (!timerTaskMap.containsKey(session.getId())) {
ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1);
heartbeatExecutor.scheduleAtFixedRate(() -> {
try {
if (session.isOpen()) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("msgType", "heartbeat");
session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes()));
} else {
stopHeartbeat(session);
aiMedicineTimer.closeConnection(session);
session.close();
}
} catch (Exception e) {
e.printStackTrace();
stopHeartbeat(session);
}
}, 0, 30, TimeUnit.SECONDS);
timerTaskMap.put(session.getId(), heartbeatExecutor);
}
}
private void stopHeartbeat(WebSocketSession session) {
ScheduledExecutorService heartbeatExecutor = timerTaskMap.get(session.getId());
heartbeatExecutor.shutdownNow();
}
// 异步发送消息 // 异步发送消息
private void sendMsgAsync(WebSocketSession session, JSONObject jsonObject) { private void sendMsgAsync(WebSocketSession session, JSONObject jsonObject) {
@ -178,12 +153,12 @@ public class AddMedicineHandler implements WebSocketHandler {
if (session.isOpen()) { if (session.isOpen()) {
session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes()));
}else { }else {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
aiMedicineTimer.closeConnection(session); aiMedicineTimer.closeConnection(session);
session.close(); session.close();
} }
}catch (Exception e) { }catch (Exception e) {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
aiMedicineTimer.closeConnection(session); aiMedicineTimer.closeConnection(session);
log.error("addMedicineHandler-sendMsgAsync error: {}", e.getMessage()); log.error("addMedicineHandler-sendMsgAsync error: {}", e.getMessage());
} }

View File

@ -1,23 +1,28 @@
package com.rax.vital.v2.handler; package com.rax.vital.v2.handler;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.rax.admin.api.entity.SysLog; import com.rax.admin.api.entity.SysLog;
import com.rax.admin.service.SysLogService; import com.rax.admin.service.SysLogService;
import com.rax.vital.common.datasource.MongoDBSource;
import com.rax.vital.common.util.DatabaseNameUtil; import com.rax.vital.common.util.DatabaseNameUtil;
import com.rax.vital.common.util.GetHttpParamUtil; import com.rax.vital.common.util.GetHttpParamUtil;
import com.rax.vital.common.util.SysLoggerBuilder; import com.rax.vital.common.util.SysLoggerBuilder;
import com.rax.vital.common.util.TokenUtil; import com.rax.vital.common.util.TokenUtil;
import com.rax.vital.v2.medicine.service.ChatService; import com.rax.vital.v2.medicine.service.ChatService;
import com.rax.vital.v2.timer.HeartBeatTimer;
import org.bson.Document;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.socket.*; import org.springframework.web.socket.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component @Component
public class ChatHandler implements WebSocketHandler { public class ChatHandler implements WebSocketHandler {
@ -34,11 +39,24 @@ public class ChatHandler implements WebSocketHandler {
@Autowired @Autowired
private TokenUtil tokenUtil; private TokenUtil tokenUtil;
private Map<String, ScheduledExecutorService> timerTaskMap = new ConcurrentHashMap(); @Autowired
private HeartBeatTimer heartBeatTimer;
private String SERVICE_NAME = HeartBeatTimer.chatHandler;
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(5);
// dbName -> sessionList
private Map<String, List<WebSocketSession>> dbNameSessionList = new ConcurrentHashMap();
// sessionId -> dbName
private Map<String, String> sessionDbMap = new ConcurrentHashMap();
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) { public void afterConnectionEstablished(WebSocketSession session) {
startHeartbeat(session); heartBeatTimer.putSession(SERVICE_NAME, session);
} }
@Override @Override
@ -47,34 +65,66 @@ public class ChatHandler implements WebSocketHandler {
String payload = (String) message.getPayload(); String payload = (String) message.getPayload();
JSONObject jsonObject = JSONObject.parseObject(payload); JSONObject jsonObject = JSONObject.parseObject(payload);
if (!"heartbeat".equals(jsonObject.getString("msgType"))) {
String msgType = jsonObject.getString("msgType");
String patientName = jsonObject.getString("patientName");
String idNum = jsonObject.getString("idNum");
String date = jsonObject.getString("date");
String databaseName = DatabaseNameUtil.encrypt(patientName) + "_" + DatabaseNameUtil.encrypt(idNum) + "_" + date;
if ("init".equals(msgType)) {
if (!dbNameSessionList.containsKey(databaseName)) {
ArrayList<WebSocketSession> sessionArrayList = new ArrayList<>();
dbNameSessionList.put(databaseName, sessionArrayList);
sessionDbMap.put(session.getId(), databaseName);
}
List<WebSocketSession> webSocketSessions = dbNameSessionList.get(databaseName);
webSocketSessions.add(session);
}
// 处理音频或文本
if ("msg".equals(msgType) || "audio".equals(msgType)) {
String query = Objects.requireNonNull(session.getUri()).getQuery(); String query = Objects.requireNonNull(session.getUri()).getQuery();
String token = GetHttpParamUtil.getParam(query, "token"); String token = GetHttpParamUtil.getParam(query, "token");
Map<String, Object> map = tokenUtil.parseToken(token); Map<String, Object> map = tokenUtil.parseToken(token);
String username = (String) map.get("username"); String username = (String) map.get("username");
String patientName = jsonObject.getString("patientName");
String idNum = jsonObject.getString("idNum");
String date = jsonObject.getString("date");
// 消息内容 // 消息内容
String msg = jsonObject.getString("msg"); String content = jsonObject.getString("content");
String databaseName = DatabaseNameUtil.encrypt(patientName) + "_" + DatabaseNameUtil.encrypt(idNum) + "_" + date; List<WebSocketSession> webSocketSessions = dbNameSessionList.get(databaseName);
chatService.sendMessage(databaseName, username, session, msg); // 转发消息
JSONObject param = new JSONObject();
param.put("msgType", msgType);
param.put("createUser", username);
param.put("createTime", DateUtil.now());
param.put("content", content);
for (WebSocketSession webSocketSession : webSocketSessions) {
if (webSocketSession.isOpen()) {
System.out.println("发消息啦 = " + param.toJSONString());
webSocketSession.sendMessage(new TextMessage(param.toJSONString().getBytes()));
}
} }
//异步入库mongodb
EXECUTOR_SERVICE.execute(() -> insertCollection(databaseName, param));
// 之前的
// chatService.sendMessage(databaseName, username, session, content);
}
} }
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) { public void handleTransportError(WebSocketSession session, Throwable exception) {
SysLog sysLog = sysLoggerBuilder.buildSysLog("聊天室接口异常中断:"+exception.getMessage(), "4", session); SysLog sysLog = sysLoggerBuilder.buildSysLog("聊天室接口异常中断:" + exception.getMessage(), "4", session);
sysLogService.saveLog(sysLog); sysLogService.saveLog(sysLog);
} }
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
chatService.stopTask(session.getId()); chatService.stopTask(session.getId());
SysLog sysLog = sysLoggerBuilder.buildSysLog("聊天室接口关闭", "0", session); SysLog sysLog = sysLoggerBuilder.buildSysLog("聊天室接口关闭", "0", session);
sysLogService.saveLog(sysLog); sysLogService.saveLog(sysLog);
@ -85,31 +135,18 @@ public class ChatHandler implements WebSocketHandler {
return false; return false;
} }
private void startHeartbeat(WebSocketSession session) { // 入库mongodb
if (!timerTaskMap.containsKey(session.getId())) { private void insertCollection(String dbName, JSONObject jsonObject) {
ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1); MongoDBSource mongoDBSource = new MongoDBSource(dbName);
heartbeatExecutor.scheduleAtFixedRate(() -> { mongoDBSource.open();
try { MongoTemplate mongoTemplate = mongoDBSource.getConnection();
if (session.isOpen()) { Document document = new Document();
JSONObject jsonObject = new JSONObject(); document.put("msgType", jsonObject.getString("msgType"));
jsonObject.put("msgType", "heartbeat"); document.put("createUser", jsonObject.getString("createUser"));
session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); document.put("createTime", jsonObject.getString("createTime"));
} else { document.put("content", jsonObject.getString("content"));
stopHeartbeat(session); mongoTemplate.insert(jsonObject, "t_chat");
chatService.stopTask(session.getId()); mongoDBSource.close();
session.close();
}
} catch (Exception e) {
e.printStackTrace();
stopHeartbeat(session);
}
}, 0, 10, TimeUnit.SECONDS);
timerTaskMap.put(session.getId(), heartbeatExecutor);
}
} }
private void stopHeartbeat(WebSocketSession session) {
ScheduledExecutorService heartbeatExecutor = timerTaskMap.get(session.getId());
heartbeatExecutor.shutdownNow();
}
} }

View File

@ -6,6 +6,7 @@ import com.rax.admin.service.SysLogService;
import com.rax.vital.common.util.DatabaseNameUtil; import com.rax.vital.common.util.DatabaseNameUtil;
import com.rax.vital.common.util.SysLoggerBuilder; import com.rax.vital.common.util.SysLoggerBuilder;
import com.rax.vital.v2.timer.AIMedicineTimer; import com.rax.vital.v2.timer.AIMedicineTimer;
import com.rax.vital.v2.timer.HeartBeatTimer;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -35,13 +36,19 @@ public class MachineFeedbackHandler implements WebSocketHandler {
@Autowired @Autowired
private AIMedicineTimer aiMedicineTimer; private AIMedicineTimer aiMedicineTimer;
@Autowired
private HeartBeatTimer heartBeatTimer;
private String SERVICE_NAME = HeartBeatTimer.machineFeedbackHandler;
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws IOException { public void afterConnectionEstablished(WebSocketSession session) throws IOException {
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put("msgType", "msg"); msg.put("msgType", "msg");
msg.put("msg", "已成功连接服务器!"); msg.put("msg", "已成功连接服务器!");
session.sendMessage(new TextMessage(msg.toJSONString().getBytes())); session.sendMessage(new TextMessage(msg.toJSONString().getBytes()));
startHeartbeat(session); heartBeatTimer.putSession(SERVICE_NAME,session);
} }
@Override @Override
@ -141,7 +148,7 @@ public class MachineFeedbackHandler implements WebSocketHandler {
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) { public void handleTransportError(WebSocketSession session, Throwable exception) {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
aiMedicineTimer.closeConnection(session); aiMedicineTimer.closeConnection(session);
SysLog sysLog = sysLoggerBuilder.buildSysLog("仪器端获取收药口异常中断", "4", session); SysLog sysLog = sysLoggerBuilder.buildSysLog("仪器端获取收药口异常中断", "4", session);
sysLogService.saveLog(sysLog); sysLogService.saveLog(sysLog);
@ -149,7 +156,7 @@ public class MachineFeedbackHandler implements WebSocketHandler {
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
aiMedicineTimer.closeConnection(session); aiMedicineTimer.closeConnection(session);
SysLog sysLog = sysLoggerBuilder.buildSysLog("仪器端收药接口关闭", "0", session); SysLog sysLog = sysLoggerBuilder.buildSysLog("仪器端收药接口关闭", "0", session);
sysLogService.saveLog(sysLog); sysLogService.saveLog(sysLog);
@ -160,33 +167,6 @@ public class MachineFeedbackHandler implements WebSocketHandler {
return false; return false;
} }
private void startHeartbeat(WebSocketSession session) {
if (!timerTaskMap.containsKey(session.getId())) {
ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1);
heartbeatExecutor.scheduleAtFixedRate(() -> {
try {
if (session.isOpen()) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("msgType", "heartbeat");
session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes()));
} else {
stopHeartbeat(session);
aiMedicineTimer.closeConnection(session);
session.close();
}
} catch (Exception e) {
e.printStackTrace();
stopHeartbeat(session);
}
}, 0, 30, TimeUnit.SECONDS);
timerTaskMap.put(session.getId(), heartbeatExecutor);
}
}
private void stopHeartbeat(WebSocketSession session) {
ScheduledExecutorService heartbeatExecutor = timerTaskMap.get(session.getId());
heartbeatExecutor.shutdownNow();
}
// 异步发送消息 // 异步发送消息
private void sendMsgAsync(WebSocketSession session, JSONObject jsonObject) { private void sendMsgAsync(WebSocketSession session, JSONObject jsonObject) {
@ -195,12 +175,12 @@ public class MachineFeedbackHandler implements WebSocketHandler {
if (session.isOpen()) { if (session.isOpen()) {
session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes())); session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes()));
} else { } else {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
aiMedicineTimer.closeConnection(session); aiMedicineTimer.closeConnection(session);
session.close(); session.close();
} }
} catch (Exception e) { } catch (Exception e) {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
aiMedicineTimer.closeConnection(session); aiMedicineTimer.closeConnection(session);
log.error("addMedicineHandler-sendMsgAsync error: {}", e.getMessage()); log.error("addMedicineHandler-sendMsgAsync error: {}", e.getMessage());
} }

View File

@ -7,6 +7,7 @@ import com.rax.vital.common.util.DatabaseNameUtil;
import com.rax.vital.common.util.GetHttpParamUtil; import com.rax.vital.common.util.GetHttpParamUtil;
import com.rax.vital.common.util.SysLoggerBuilder; import com.rax.vital.common.util.SysLoggerBuilder;
import com.rax.vital.common.util.TokenUtil; import com.rax.vital.common.util.TokenUtil;
import com.rax.vital.v2.timer.HeartBeatTimer;
import com.rax.vital.v2.timer.VitalSignTimerV2; import com.rax.vital.v2.timer.VitalSignTimerV2;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -39,11 +40,17 @@ public class MedicineHandler implements WebSocketHandler {
@Autowired @Autowired
private TokenUtil tokenUtil; private TokenUtil tokenUtil;
private Map<String, ScheduledExecutorService> timerTaskMap = new ConcurrentHashMap<>();
// 全局心跳任务容器
@Autowired
private HeartBeatTimer heartBeatTimer;
private String SERVICE_NAME = HeartBeatTimer.medicineHandler;
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) { public void afterConnectionEstablished(WebSocketSession session) {
startHeartbeat(session); heartBeatTimer.putSession(SERVICE_NAME,session);
} }
@Override @Override
@ -70,7 +77,7 @@ public class MedicineHandler implements WebSocketHandler {
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
vitalSignTimerV2.stopTimerTask(session.getId()); vitalSignTimerV2.stopTimerTask(session.getId());
SysLog sysLog = sysLoggerBuilder.buildSysLog("生命体征接口异常中断", "4", session); SysLog sysLog = sysLoggerBuilder.buildSysLog("生命体征接口异常中断", "4", session);
sysLogService.saveLog(sysLog); sysLogService.saveLog(sysLog);
@ -78,7 +85,7 @@ public class MedicineHandler implements WebSocketHandler {
@Override @Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
stopHeartbeat(session); heartBeatTimer.removeSession(session);
vitalSignTimerV2.stopTimerTask(session.getId()); vitalSignTimerV2.stopTimerTask(session.getId());
SysLog sysLog = sysLoggerBuilder.buildSysLog("生命体征接口连接关闭", "0", session); SysLog sysLog = sysLoggerBuilder.buildSysLog("生命体征接口连接关闭", "0", session);
sysLogService.saveLog(sysLog); sysLogService.saveLog(sysLog);
@ -89,31 +96,4 @@ public class MedicineHandler implements WebSocketHandler {
return false; return false;
} }
private void startHeartbeat(WebSocketSession session) {
if (!timerTaskMap.containsKey(session.getId())) {
ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1);
heartbeatExecutor.scheduleAtFixedRate(() -> {
try {
if (session.isOpen()) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("msgType", "heartbeat");
session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes()));
} else {
session.close();
stopHeartbeat(session);
vitalSignTimerV2.stopTimerTask(session.getId());
}
} catch (Exception e) {
e.printStackTrace();
stopHeartbeat(session);
}
}, 0, 10, TimeUnit.SECONDS);
timerTaskMap.put(session.getId(), heartbeatExecutor);
}
}
private void stopHeartbeat(WebSocketSession session) {
ScheduledExecutorService heartbeatExecutor = timerTaskMap.get(session.getId());
heartbeatExecutor.shutdownNow();
}
} }

View File

@ -37,36 +37,11 @@ public class ChatServiceImpl implements ChatService {
private static final Map<String, String> sessionDatabaseMap = new ConcurrentHashMap<>(); private static final Map<String, String> sessionDatabaseMap = new ConcurrentHashMap<>();
// MongoDB的地址
@Value("${vital-sign.mongodb.host}")
private String mongoDBHost;
// MongoDB的用户名
@Value("${vital-sign.mongodb.username}")
private String mongoUsername;
// MongoDB的用户的密码
@Value("${vital-sign.mongodb.password}")
private String mongoPassword;
// mysql地址
@Value("${vital-sign.mysql.host}")
private String mysqlHost;
// mysql用户名
@Value("${vital-sign.mysql.username}")
private String mysqlUsername;
// mysql用户密码
@Value("${vital-sign.mysql.password}")
private String mysqlPassword;
@Override @Override
public void sendMessage(String databaseName, String username, WebSocketSession session, String msg) throws IOException { public void sendMessage(String databaseName, String username, WebSocketSession session, String msg) throws IOException {
CustomDataSource mongoDBSource = datasourceMap.get(session.getId()); CustomDataSource mongoDBSource = datasourceMap.get(session.getId());
if (mongoDBSource == null) { if (mongoDBSource == null) {
mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, databaseName); mongoDBSource = new MongoDBSource(databaseName);
mongoDBSource.open(); mongoDBSource.open();
datasourceMap.put(session.getId(), mongoDBSource); datasourceMap.put(session.getId(), mongoDBSource);
sessionDatabaseMap.put(session.getId(), databaseName); sessionDatabaseMap.put(session.getId(), databaseName);

View File

@ -290,7 +290,7 @@ public class MedicineService implements IMedicineService {
} }
} }
String database = DatabaseNameUtil.encrypt(name) + "_" + DatabaseNameUtil.encrypt(code) + "_" + date; String database = DatabaseNameUtil.encrypt(name) + "_" + DatabaseNameUtil.encrypt(code) + "_" + date;
MongoDBSource mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); MongoDBSource mongoDBSource = new MongoDBSource(database);
mongoDBSource.open(); mongoDBSource.open();
MongoTemplate template = mongoDBSource.getConnection(); MongoTemplate template = mongoDBSource.getConnection();
Query query = new Query(); Query query = new Query();

View File

@ -0,0 +1,88 @@
package com.rax.vital.v2.timer;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @project_name: rax-remote-v2
* @time: 2024/11/13 17:01
* @author: republicline
* @description: 全局心跳任务实现
*/
@Component
@Data
public class HeartBeatTimer {
public static final String chatHandler = "chatHandler";
public static final String addMedicineHandler = "addMedicineHandler";
public static final String machineFeedbackHandler = "machineFeedbackHandler";
public static final String medicineHandler = "medicineHandler";
private static final Map<String, Map<String, WebSocketSession>> serviceSessionMap = new ConcurrentHashMap<>();
static {
// 初始化4个Map分别对应4个不同业务模块的WebSocketSession
Map<String, WebSocketSession> chatHeartBeat = new ConcurrentHashMap<>();
Map<String, WebSocketSession> addMedicineHeartBeat = new ConcurrentHashMap<>();
Map<String, WebSocketSession> machineFeedbackHeartBeat = new ConcurrentHashMap<>();
Map<String, WebSocketSession> medicineHeartBeat = new ConcurrentHashMap<>();
serviceSessionMap.put(chatHandler, chatHeartBeat);
serviceSessionMap.put(machineFeedbackHandler, addMedicineHeartBeat);
serviceSessionMap.put(addMedicineHandler, machineFeedbackHeartBeat);
serviceSessionMap.put(medicineHandler, medicineHeartBeat);
}
@Scheduled(fixedRate = 30000)
private void sendHeartbeat() {
// 遍历urlSessionMap的session数量 调试使用
serviceSessionMap.keySet().forEach(url -> {
Map<String, WebSocketSession> handlerMap = serviceSessionMap.get(url);
for (WebSocketSession value : handlerMap.values()) {
System.out.println("url: " + url + " session: " + value.getId());
}
});
for (Map<String, WebSocketSession> handlerMap : serviceSessionMap.values()) {
for (WebSocketSession session : handlerMap.values()) {
try {
if (session.isOpen()) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("msgType", "heartbeat");
session.sendMessage(new TextMessage(jsonObject.toJSONString().getBytes()));
} else {
handlerMap.remove(session.getId());
session.close();
}
} catch (Exception e) {
handlerMap.remove(session.getId());
e.printStackTrace();
}
}
}
}
// 向指定业务模块的sessionMap中添加到urlSessionMap
public void putSession(String url, WebSocketSession session) {
Map<String, WebSocketSession> handlerMap = serviceSessionMap.get(url);
if (handlerMap == null) {
return;
}
handlerMap.put(session.getId(), session);
}
// 从指定业务模块的urlSessionMap移除session
public void removeSession(WebSocketSession session) {
serviceSessionMap.values().forEach(handlerMap -> handlerMap.remove(session.getId()));
}
}

View File

@ -31,17 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
public class VitalSignTimerV2 { public class VitalSignTimerV2 {
// MongoDB的地址
@Value("${vital-sign.mongodb.host}")
private String mongoDBHost;
// MongoDB的用户名
@Value("${vital-sign.mongodb.username}")
private String mongoUsername;
// MongoDB的用户的密码
@Value("${vital-sign.mongodb.password}")
private String mongoPassword;
private final SurgeryServiceV2 surgeryServiceV2; private final SurgeryServiceV2 surgeryServiceV2;
@ -77,7 +66,7 @@ public class VitalSignTimerV2 {
CustomDataSource dataSource = dataSourceMap.get(sessionId); CustomDataSource dataSource = dataSourceMap.get(sessionId);
if (dataSource == null) { if (dataSource == null) {
dataSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); dataSource = new MongoDBSource(database);
dataSourceMap.put(sessionId, dataSource); dataSourceMap.put(sessionId, dataSource);
dataSource.open(); dataSource.open();
} }
@ -88,8 +77,8 @@ public class VitalSignTimerV2 {
MongoTemplate template = finalMongoDBSource.getConnection(); MongoTemplate template = finalMongoDBSource.getConnection();
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
// 生命体征信息 // 生命体征信息 这里要改改
List vitalSignsList = surgeryServiceV2.getVitalSignsList(template); List<Map> vitalSignsList = surgeryServiceV2.getVitalSignsList(template);
jsonObject.put("vitalSignsList", vitalSignsList); jsonObject.put("vitalSignsList", vitalSignsList);
// 标记信息 // 标记信息
@ -161,7 +150,7 @@ public class VitalSignTimerV2 {
// 定时任务设置1秒 // 定时任务设置1秒
Timer timer = new Timer(); Timer timer = new Timer();
timer.schedule(timerTask, 0, 2000); timer.schedule(timerTask, 0, 1000);
timerTaskMap.put(sessionId, timerTask); timerTaskMap.put(sessionId, timerTask);
} }