package com.rax.vital.timer; import cn.hutool.core.collection.ConcurrentHashSet; import com.alibaba.fastjson.JSONObject; import com.rax.common.security.util.SecurityUtils; import com.rax.vital.datasource.MongoDBSource; import com.rax.vital.datasource.MySQLSource; import com.rax.vital.medicine.service.*; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; import java.sql.Connection; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * 生命体征和用药信息推送 * * @author zhaoyz * @date 2024/2/29 */ @RefreshScope @Component @RequiredArgsConstructor public class VitalSignTimer { private final SimpMessagingTemplate simpMessagingTemplate; private final VitalSignsService vitalSignsService; private final AIMedicineService aiMedicineService; private final DoctorMedicineService doctorMedicineService; private final FlagService flagService; private final RevulsionService revulsionService; // mongoDB定时任务容器 private static final Map timerMongoTaskMap = new ConcurrentHashMap<>() { }; // mongoDB链接工具类容器 private static final Map mongoDBSourceMap = new ConcurrentHashMap<>(); // mysql定时任务容器 private static final Map timerMysqlTaskMap = new ConcurrentHashMap<>(); // mysql链接容器 private static final Map mysqlConnectionMap = new ConcurrentHashMap(); // MongoDB的地址 @Value("${vital-sign.mongodb.host}") private String mongoDBHost; // MongoDB的用户名 @Value("${vital-sign.mongodb.username}") private String mongoUsername; // MongoDB的用户的密码 @Value("${vital-sign.mongodb.password}") private String mongoPassword; // mysql地址 @Value("${vital-sign.mysql.host}") private String mysqlHost; // mysql用户名 @Value("${vital-sign.mysql.username}") private String mysqlUsername; // mysql用户密码 @Value("${vital-sign.mysql.password}") private String mysqlPassword; private static final Map masterControlMap = new ConcurrentHashMap<>(); private static final Map userSessionMap = new ConcurrentHashMap<>(); private static final Map userDatabaseSessionMap = new ConcurrentHashMap<>(); private static final Map machineSessionMap = new ConcurrentHashMap<>(); private static final Map machineDatabaseSessionMap = new ConcurrentHashMap<>(); /** * 根据当前用户和患者数据库进行查询生命体征和用药信息并推送,数据库类型是MongoDB * * @author zhaoyz */ public void createAndSendMessageMongo(String database, String username, String simpSessionId) { synchronized (username) { if (!masterControlMap.containsKey(database)) { masterControlMap.put(database, username); } } TimerTask task = timerMongoTaskMap.get(simpSessionId); if (task != null) { return; } MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId); if (mongoDBSource == null) { mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); mongoDBSourceMap.put(simpSessionId, mongoDBSource); mongoDBSource.open(); } MongoDBSource finalMongoDBSource = mongoDBSource; TimerTask timerTask = new TimerTask() { @Override public void run() { MongoTemplate template = finalMongoDBSource.getTemplate(); HashMap result = new HashMap(); List vitalSignsList = vitalSignsService.getVitalSignsList(template); result.put("vitalSignsList", vitalSignsList); List aiMedicineList = aiMedicineService.getAIMedicine(template); result.put("aiMedicineList", aiMedicineList); List docMedicineList = doctorMedicineService.getDocMedicine(template); result.put("docMedicineList", docMedicineList); List revulsionList = revulsionService.getRevulsionServiceList(template); result.put("revulsionList", revulsionList); Map flags = flagService.getFlags(template); result.put("flags", flags); simpMessagingTemplate.convertAndSendToUser(username + ":" + database, "/surgeryData", result); } }; // 定时任务,设置1秒 Timer timer = new Timer(); timer.schedule(timerTask, 0, 1000); timerMongoTaskMap.put(simpSessionId, timerTask); } /** * 根据当前用户和患者数据库进行查询生命体征和用药信息并推送,数据库类型是MySQL * * @param database */ public void createAndSendMessageMySQL(String database) { String account = SecurityUtils.getUser().getUsername(); TimerTask task = timerMysqlTaskMap.get(account + "-" + database); if (task != null) { return; } MySQLSource mySQLSource = mysqlConnectionMap.get(database); if (mySQLSource == null) { mySQLSource = new MySQLSource(mysqlHost, mysqlPassword, mysqlUsername, database); mysqlConnectionMap.put(database, mySQLSource); mySQLSource.open(); mySQLSource.increaseCount(); } MySQLSource finalMySQLSource = mySQLSource; TimerTask timerTask = new TimerTask() { @Override public void run() { HashMap result = new HashMap(); Connection connection = finalMySQLSource.getConnection(); List vitalSignsList = vitalSignsService.getVitalSignsList(connection); result.put("vitalSignsList", vitalSignsList); List aiMedicineList = aiMedicineService.getAIMedicine(connection); result.put("aiMedicineList", aiMedicineList); List docMedicineList = doctorMedicineService.getDocMedicine(connection); result.put("docMedicineList", docMedicineList); Map flag = flagService.getFlag(connection); result.put("flag", flag); simpMessagingTemplate.convertAndSendToUser(account, "/doctorMedicine", result); } }; // 定时任务,设置1秒 Timer timer = new Timer(); timer.schedule(timerTask, 0, 1000); timerMysqlTaskMap.put(account + "-" + database, timerTask); } /** * 停止指定的某个用户查询的患者数据库定时器,数据库类型是MongoDB * * @author zhaoyz */ public synchronized void stopTimerTaskMongo(String simpSessionId) { TimerTask timerTask = timerMongoTaskMap.get(simpSessionId); if (timerTask != null) { timerTask.cancel(); MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId); mongoDBSource.close(); timerMongoTaskMap.remove(simpSessionId); mongoDBSourceMap.remove(simpSessionId); } } /** * 停止指定的某个用户查询的患者数据库定时器,数据库类型是MySQL * * @param database * @param user */ public synchronized void stopTimerTaskMySQL(String database, String user) { TimerTask timerTask = timerMysqlTaskMap.get(user + "-" + database); if (timerTask != null) { timerTask.cancel(); timerMysqlTaskMap.remove(user + "-" + database); MySQLSource mySQLSource = mysqlConnectionMap.get(database); mySQLSource.decreaseCount(); int count = mySQLSource.getCount(); if (count == 0) { mySQLSource.close(); mysqlConnectionMap.remove(database); } } } public void changeAIFlag(String database, String username, String simpSessionId, String flag, String medicine, String value) { synchronized (username) { if (masterControlMap.containsKey(database) && masterControlMap.get(database).equals(username)) { MongoDBSource mongoDBSource = mongoDBSourceMap.get(simpSessionId); if (mongoDBSource == null) { mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); mongoDBSourceMap.put(simpSessionId, mongoDBSource); mongoDBSource.open(); } MongoTemplate template = mongoDBSource.getTemplate(); aiMedicineService.changeAIFlagMedicine(template, flag, medicine, value); HashMap result = new HashMap(); result.put("status", 0); result.put("flag", flag); result.put("msg", ""); simpMessagingTemplate.convertAndSendToUser(username + ":" + database, "/medicineData", result); } else { HashMap result = new HashMap(); result.put("status", 1); result.put("msg", "不是主控人员"); simpMessagingTemplate.convertAndSendToUser(username + ":" + database, "/medicineData", result); } } } public void createAndSendWSMessageMongo(String database, String username, WebSocketSession session) { synchronized (username) { if (!masterControlMap.containsKey(database)) { masterControlMap.put(database, username); } } String sessionId = session.getId(); TimerTask task = timerMongoTaskMap.get(sessionId); if (task != null) { return; } MongoDBSource mongoDBSource = mongoDBSourceMap.get(sessionId); if (mongoDBSource == null) { mongoDBSource = new MongoDBSource(mongoDBHost, mongoPassword, mongoUsername, database); mongoDBSourceMap.put(sessionId, mongoDBSource); mongoDBSource.open(); } MongoDBSource finalMongoDBSource = mongoDBSource; TimerTask timerTask = new TimerTask() { @Override public void run() { MongoTemplate template = finalMongoDBSource.getTemplate(); JSONObject jsonObject = new JSONObject(); List vitalSignsList = vitalSignsService.getVitalSignsList(template); jsonObject.put("vitalSignsList", vitalSignsList); List aiMedicineList = aiMedicineService.getAIMedicine(template); jsonObject.put("aiMedicineList", aiMedicineList); List docMedicineList = doctorMedicineService.getDocMedicine(template); jsonObject.put("docMedicineList", docMedicineList); List revulsionList = revulsionService.getRevulsionServiceList(template); jsonObject.put("revulsionList", revulsionList); Map flags = flagService.getFlags(template); jsonObject.put("flags", flags); WebSocketMessage message = new TextMessage(jsonObject.toJSONString().getBytes()); try { session.sendMessage(message); } catch (IOException e) { throw new RuntimeException(e); } } }; // 定时任务,设置1秒 Timer timer = new Timer(); timer.schedule(timerTask, 0, 1000); timerMongoTaskMap.put(sessionId, timerTask); } public synchronized void setWSAIFlagSession(WebSocketSession session) { userSessionMap.put(session.getId(), session); } public void removeWSAIFlagSession(WebSocketSession session) { userSessionMap.remove(session.getId()); if (userDatabaseSessionMap.containsValue(session.getId())) { for (String database : userDatabaseSessionMap.values()) { if (userDatabaseSessionMap.get(database).equals(session.getId())) { userDatabaseSessionMap.remove(database); } } } } public void changeWSAIFlag(String database, String username, WebSocketSession session, String flag, String medicine, String value) throws IOException { synchronized (username) { JSONObject result = new JSONObject(); if (masterControlMap.containsKey(database) && masterControlMap.get(database).equals(username)) { if (!userDatabaseSessionMap.containsKey(database)) { userDatabaseSessionMap.put(database, session.getId()); } if (machineDatabaseSessionMap.containsKey(database)) { String sessionId = machineDatabaseSessionMap.get(database); WebSocketSession machineSession = machineSessionMap.get(sessionId); result.put(medicine, value); result.put("flag", flag); machineSession.sendMessage(new TextMessage(result.toJSONString().getBytes())); } else { result.put("flag", flag); result.put("medicine", medicine); result.put("msg", "设备端未连接"); result.put("status", 1); session.sendMessage(new TextMessage(result.toJSONString().getBytes())); } } else { result.put("flag", flag); result.put("medicine", medicine); result.put("status", 1); result.put("msg", "不是主控人员"); session.sendMessage(new TextMessage(result.toJSONString().getBytes())); } } } public synchronized void setMachineSessionMap(WebSocketSession session) throws IOException { machineSessionMap.put(session.getId(), session); } public void removeMachineSessionMap(WebSocketSession session) throws IOException { machineSessionMap.remove(session.getId()); if (machineDatabaseSessionMap.containsValue(session.getId())) { for (String database : machineDatabaseSessionMap.values()) { if (machineDatabaseSessionMap.get(database).equals(session.getId())) { machineDatabaseSessionMap.remove(database); } } } } public synchronized void sendMachineFlag(String database, WebSocketSession session) throws IOException { if (machineDatabaseSessionMap.containsKey(database)) { if (machineDatabaseSessionMap.get(database).equals(session.getId())) { sendUserMessage(database, session); } else { JSONObject msg = new JSONObject(); msg.put("status", 1); msg.put("msg", "手术已有设备远程中"); session.sendMessage(new TextMessage(msg.toJSONString().getBytes())); } } else { machineDatabaseSessionMap.put(database, session.getId()); sendUserMessage(database, session); } } private synchronized void sendUserMessage(String database, WebSocketSession session) throws IOException { JSONObject result = new JSONObject(); if (userDatabaseSessionMap.containsKey(database)) { String userSessionId = userDatabaseSessionMap.get(database); WebSocketSession webSocketSession = userSessionMap.get(userSessionId); result.put("status", 0); result.put("msg", ""); webSocketSession.sendMessage(new TextMessage(result.toJSONString().getBytes())); } else { result.put("status", 1); result.put("msg", "网站未进行远程"); session.sendMessage(new TextMessage(result.toJSONString().getBytes())); } } }