fix: 聊天室无法推送bug, fix

This commit is contained in:
republicline 2024-10-15 18:05:48 +08:00
parent c12d0b24cf
commit 2c90f84175
3 changed files with 47 additions and 16 deletions

View File

@ -10,6 +10,8 @@ import org.springframework.security.oauth2.server.authorization.OAuth2TokenType;
import org.springframework.web.socket.*; import org.springframework.web.socket.*;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -25,6 +27,12 @@ public class ChatHandler implements WebSocketHandler {
private Map<String, ScheduledExecutorService> timerTaskMap = new ConcurrentHashMap(); private Map<String, ScheduledExecutorService> timerTaskMap = new ConcurrentHashMap();
// dbName -> sessionList
private Map<String, List<WebSocketSession>> dbNameSessionList = new ConcurrentHashMap();
// sessionId -> dbName
private Map<String, String> sessionDbMap = new ConcurrentHashMap();
@Override @Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception { public void afterConnectionEstablished(WebSocketSession session) throws Exception {
startHeartbeat(session); startHeartbeat(session);
@ -34,26 +42,39 @@ public class ChatHandler implements WebSocketHandler {
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
String query = session.getUri().getQuery(); String query = session.getUri().getQuery();
String decode = URLDecoder.decode(query); String decode = URLDecoder.decode(query);
// System.out.println("query = " + query);
Map params = GetHttpParamUtil.getParams(decode); Map params = GetHttpParamUtil.getParams(decode);
String token = (String) params.get("token"); String token = (String) params.get("token");
// String token = "azIc_An5dRViceuMN5B5_S-k-1YH8gTCkzc9A6d8mJYguTOEOXWHLaojcpqNNrhYG_9QSUV_y9LpYCN01SqY-GHcXJoMEqCxU-3Qudvkizll3T6mOWK-MJDODMZHETGj";
OAuth2Authorization authorization = authorizationService.findByToken(token, OAuth2TokenType.ACCESS_TOKEN); OAuth2Authorization authorization = authorizationService.findByToken(token, OAuth2TokenType.ACCESS_TOKEN);
if (authorization != null) { if (authorization != null) {
String username = authorization.getPrincipalName(); String username = authorization.getPrincipalName();
// System.out.println("username = " + username);
String payload = (String) message.getPayload(); String payload = (String) message.getPayload();
JSONObject jsonObject = JSONObject.parseObject(payload); JSONObject jsonObject = JSONObject.parseObject(payload);
if ("heartbeat".equals(jsonObject.getString("msgType"))) { if ("heartbeat".equals(jsonObject.getString("msgType"))) {
// session.sendMessage(new TextMessage("")); // session.sendMessage(new TextMessage(""));
}else if ("init".equals(jsonObject.getString("msgType"))) {
// 初始化, 将session信息保存起来
String dbName = jsonObject.getString("idNum");
if (!dbNameSessionList.containsKey(dbName)) {
ArrayList<WebSocketSession> sessionArrayList = new ArrayList<>();
dbNameSessionList.put(dbName, sessionArrayList);
sessionDbMap.put(session.getId(), dbName);
}
dbNameSessionList.get(dbName).add(session);
} else { } else {
System.out.println("收到消息-start = " + jsonObject.toJSONString());
String patientName = jsonObject.getString("patientName"); String patientName = jsonObject.getString("patientName");
String idNum = jsonObject.getString("idNum"); String idNum = jsonObject.getString("idNum");
String date = jsonObject.getString("date"); String date = jsonObject.getString("date");
// 消息内容 // 消息内容
String msg = jsonObject.getString("msg"); String msg = jsonObject.getString("msg");
chatService.sendMessageMysql(username, patientName, idNum, date, session, msg); //
List<WebSocketSession> webSocketSessions = dbNameSessionList.get(idNum);
for (WebSocketSession webSocketSession : webSocketSessions) {
System.out.println("webSocketSession.size() = " + webSocketSessions.size());
System.out.println("webSocketSession = " + webSocketSession);
}
chatService.sendMessageMysql(username, patientName, idNum, date, session, msg, webSocketSessions);
} }
} }
} }
@ -61,6 +82,7 @@ public class ChatHandler implements WebSocketHandler {
@Override @Override
public void handleTransportError(WebSocketSession session, Throwable exception) { public void handleTransportError(WebSocketSession session, Throwable exception) {
System.out.println("Error: " + exception.getMessage()); System.out.println("Error: " + exception.getMessage());
stopMap(session);
} }
@Override @Override
@ -68,6 +90,7 @@ public class ChatHandler implements WebSocketHandler {
System.out.println("CloseStatus: " + closeStatus.getReason() + closeStatus.getCode()); System.out.println("CloseStatus: " + closeStatus.getReason() + closeStatus.getCode());
stopHeartbeat(session); stopHeartbeat(session);
chatService.stopTask(session.getId()); chatService.stopTask(session.getId());
stopMap(session);
} }
@Override @Override
@ -103,4 +126,10 @@ public class ChatHandler implements WebSocketHandler {
heartbeatExecutor.shutdownNow(); heartbeatExecutor.shutdownNow();
} }
private void stopMap(WebSocketSession session) {
String dbName = sessionDbMap.get(session.getId());
dbNameSessionList.remove(dbName);
sessionDbMap.remove(session.getId());
}
} }

View File

@ -3,13 +3,14 @@ package com.rax.vital.v1.medicine.service;
import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.WebSocketSession;
import java.io.IOException; import java.io.IOException;
import java.util.List;
public interface ChatService { public interface ChatService {
void sendMessage(String username, String patientName, String idNum, String date, String simpSessionId, String msg); void sendMessage(String username, String patientName, String idNum, String date, String simpSessionId, String msg);
void sendMessage(String username, String patientName, String idNum, String date, WebSocketSession session, String msg) throws IOException; void sendMessage(String username, String patientName, String idNum, String date, WebSocketSession session, String msg) throws IOException;
void sendMessageMysql(String username, String patientName, String idNum, String date, WebSocketSession session, String msg) throws Exception; void sendMessageMysql(String username, String patientName, String idNum, String date, WebSocketSession session, String msg, List<WebSocketSession> webSocketSessions) throws Exception;
void stopTimerTask(String simpSessionId); void stopTimerTask(String simpSessionId);

View File

@ -157,9 +157,8 @@ public class ChatServiceImpl implements ChatService {
} }
@Override @Override
public void sendMessageMysql(String username, String patientName, String idNum, String date, WebSocketSession session, String msg) throws SQLException { public void sendMessageMysql(String username, String patientName, String idNum, String date, WebSocketSession session, String msg, List<WebSocketSession> webSocketSessionList) throws SQLException, IOException {
CustomDataSource dataSource = datasourceMap.get(session.getId()); CustomDataSource dataSource = datasourceMap.get(session.getId());
// String databaseName = patientName + idNum;
String databaseName = idNum; String databaseName = idNum;
ArrayList<Map> history = new ArrayList<>(); ArrayList<Map> history = new ArrayList<>();
@ -230,7 +229,6 @@ public class ChatServiceImpl implements ChatService {
DatabaseMetaData metaData = connection.getMetaData(); DatabaseMetaData metaData = connection.getMetaData();
String tableName = "t_chat"; String tableName = "t_chat";
ResultSet tablesx = metaData.getTables(null, null, tableName, new String[]{"TABLE"}); ResultSet tablesx = metaData.getTables(null, null, tableName, new String[]{"TABLE"});
System.out.println("tablesx = " + tablesx.next());
if (!tablesx.next()) { if (!tablesx.next()) {
try { try {
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
@ -272,14 +270,17 @@ public class ChatServiceImpl implements ChatService {
e.printStackTrace(); e.printStackTrace();
} }
Map<String, WebSocketSession> sessionMap1 = databaseSessionMap.get(databaseName); // Map<String, WebSocketSession> sessionMap1 = databaseSessionMap.get(databaseName);
for (Map.Entry<String, WebSocketSession> entry : sessionMap1.entrySet()) { // for (Map.Entry<String, WebSocketSession> entry : sessionMap1.entrySet()) {
WebSocketSession value = entry.getValue(); // WebSocketSession value = entry.getValue();
try { // try {
value.sendMessage(new TextMessage(param.toJSONString().getBytes())); // value.sendMessage(new TextMessage(param.toJSONString().getBytes()));
} catch (IOException e) { // } catch (IOException e) {
e.printStackTrace(); // e.printStackTrace();
} // }
// }
for (WebSocketSession webSocketSession : webSocketSessionList) {
webSocketSession.sendMessage(new TextMessage(param.toJSONString().getBytes()));
} }
} }
} }