package com.ycl.jxkg.server; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ycl.jxkg.domain.query.WebSocketQuery; import com.ycl.jxkg.domain.vo.WebsocketDataVO; import com.ycl.jxkg.enums.WebsocketCommendEnum; import com.ycl.jxkg.service.EducationResourceService; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CopyOnWriteArraySet; /** * @author:xp * @date:2024/6/26 15:51 */ @Component @Slf4j @ServerEndpoint("/websocket/{userId}") public class WebsocketServer { private static ApplicationContext applicationContext; public static void setApplicationContext(ApplicationContext context) { applicationContext = context; } /** * 线程安全的无序的集合 */ private static final CopyOnWriteArraySet SESSIONS = new CopyOnWriteArraySet<>(); /** * 存储在线连接数 */ private static final Map SESSION_POOL = new HashMap<>(); @OnOpen public void onOpen(Session session, @PathParam(value = "userId") Integer userId) { try { SESSIONS.add(session); SESSION_POOL.put(userId, session); log.info("【WebSocket消息】有新的连接,总数为:" + SESSIONS.size()); } catch (Exception e) { e.printStackTrace(); } } @OnClose public void onClose(Session session) { try { SESSIONS.remove(session); log.info("【WebSocket消息】连接断开,总数为:" + SESSIONS.size()); } catch (Exception e) { e.printStackTrace(); } } @OnMessage public void onMessage(String message) { WebSocketQuery webSocketQuery = JSONObject.parseObject(message, WebSocketQuery.class); String command = webSocketQuery.getCommand(); Integer userId = webSocketQuery.getId(); if(WebsocketCommendEnum.RECORD_STUDY_TIME.getCommand().equals(command)){ log.info("存消息"); EducationResourceService educationResourceService = applicationContext.getBean(EducationResourceService.class); educationResourceService.recordTime(userId); } // 会议操作 if (WebsocketCommendEnum.MUTE.getCommand().equals(command) || WebsocketCommendEnum.OPEN_CAMERA.getCommand().equals(command) || WebsocketCommendEnum.KICK_OUT.getCommand().equals(command)) { WebsocketDataVO websocketDataVO = new WebsocketDataVO(); websocketDataVO.setCommend(command); sendOneMessage(userId, JSON.toJSONString(websocketDataVO)); } log.info("【WebSocket消息】收到客户端消息:" + message); } /** * 此为广播消息 * * @param message 消息 */ public void sendAllMessage(String message) { log.info("【WebSocket消息】广播消息:" + message); for (Session session : SESSIONS) { try { if (session.isOpen()) { session.getAsyncRemote().sendText(message); } } catch (Exception e) { e.printStackTrace(); } } } /** * 此为单点消息 * * @param userId 用户编号 * @param message 消息 */ public void sendOneMessage(Integer userId, String message) { Session session = SESSION_POOL.get(userId); if (session != null && session.isOpen()) { try { synchronized (session) { log.info("【WebSocket消息】单点消息:" + message); session.getAsyncRemote().sendText(message); } } catch (Exception e) { e.printStackTrace(); } } } /** * 此为单点消息(多人) * * @param userIds 用户编号列表 * @param message 消息 */ public void sendMoreMessage(List userIds, String message) { for (Integer userId : userIds) { Session session = SESSION_POOL.get(userId); if (session != null && session.isOpen()) { try { log.info("【WebSocket消息】单点消息:" + message); session.getAsyncRemote().sendText(message); } catch (Exception e) { e.printStackTrace(); } } } } /** * 校验用户是否在线 * * @param userId * @return */ public Boolean checkUserOnline(Integer userId) { Session session = SESSION_POOL.get(userId); return Objects.nonNull(session); } }