package cn.lili.controller.im; import cn.hutool.json.JSONUtil; import cn.lili.cache.Cache; import cn.lili.common.security.AuthUser; import cn.lili.common.security.context.UserContext; import cn.lili.common.security.enums.UserEnums; import cn.lili.modules.im.config.CustomSpringConfigurator; import cn.lili.modules.im.entity.dos.ImMessage; import cn.lili.modules.im.entity.dos.ImTalk; import cn.lili.modules.im.entity.enums.MessageResultType; import cn.lili.modules.im.entity.vo.MessageOperation; import cn.lili.modules.im.entity.vo.MessageVO; import cn.lili.modules.im.service.ImMessageService; import cn.lili.modules.im.service.ImTalkService; import com.alibaba.druid.util.StringUtils; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; /** * @author liushuai */ @Component @ServerEndpoint(value = "/lili/webSocket/{accessToken}", configurator = CustomSpringConfigurator.class) @Scope("prototype") @Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class WebSocketServer { /** * 在线人数 PS 注意,只能单节点,如果多节点部署需要自行寻找方案 */ private static ConcurrentHashMap sessionPools = new ConcurrentHashMap<>(); /** * 消息服务 */ private final ImMessageService imMessageService; private final ImTalkService imTalkService; private final Cache cache; /** * 建立连接 * * @param session */ @OnOpen public void onOpen(@PathParam("accessToken") String accessToken, Session session) { AuthUser authUser = UserContext.getAuthUser(cache, accessToken); String sessionId = UserEnums.STORE.equals(authUser.getRole()) ? authUser.getStoreId() : authUser.getId(); //如果已有会话,则进行下线提醒。 if (sessionPools.containsKey(sessionId)) { log.info("用户重复登陆,旧用户下线"); Session oldSession = sessionPools.get(sessionId); sendMessage(oldSession, MessageVO.builder().messageResultType(MessageResultType.OFFLINE).result("用户异地登陆").build()); try { oldSession.close(); } catch (Exception e) { e.printStackTrace(); } } sessionPools.put(sessionId, session); } /** * 关闭连接 */ @OnClose public void onClose(@PathParam("accessToken") String accessToken) { AuthUser authUser = UserContext.getAuthUser(accessToken); log.info("用户断开断开连接:{}", JSONUtil.toJsonStr(authUser)); sessionPools.remove(authUser); } /** * 发送消息 * * @param msg * @throws IOException */ @OnMessage public void onMessage(@PathParam("accessToken") String accessToken, String msg) { log.info("发送消息:{}", msg); MessageOperation messageOperation = JSON.parseObject(msg, MessageOperation.class); operation(accessToken, messageOperation); } /** * IM操作 * * @param accessToken * @param messageOperation */ private void operation(String accessToken, MessageOperation messageOperation) { AuthUser authUser = UserContext.getAuthUser(accessToken); switch (messageOperation.getOperationType()) { case PING: break; case MESSAGE: //保存消息 ImMessage imMessage = new ImMessage(messageOperation); imMessageService.save(imMessage); //修改最后消息信息 imTalkService.update(new LambdaUpdateWrapper().eq(ImTalk::getId, messageOperation.getTalkId()) .set(ImTalk::getLastTalkMessage, messageOperation.getContext()) .set(ImTalk::getLastTalkTime, imMessage.getCreateTime()) .set(ImTalk::getLastMessageType, imMessage.getMessageType())); //发送消息 sendMessage(messageOperation.getTo(), new MessageVO(MessageResultType.MESSAGE, imMessage)); break; case READ: if (!StringUtils.isEmpty(messageOperation.getContext())) { imMessageService.read(messageOperation.getTalkId(), accessToken); } break; case UNREAD: sendMessage(authUser.getId(), new MessageVO(MessageResultType.UN_READ, imMessageService.unReadMessages(accessToken))); break; case HISTORY: sendMessage(authUser.getId(), new MessageVO(MessageResultType.HISTORY, imMessageService.historyMessage(accessToken, messageOperation.getTo()))); break; default: break; } } /** * 发送消息 * * @param sessionId sessionId * @param message 消息对象 */ private void sendMessage(String sessionId, MessageVO message) { Session session = sessionPools.get(sessionId); sendMessage(session, message); } /** * 发送消息 * * @param session 会话 * @param message 消息对象 */ private void sendMessage(Session session, MessageVO message) { if (session != null) { try { session.getBasicRemote().sendText(JSON.toJSONString(message, true)); } catch (Exception e) { e.printStackTrace(); } } } /** * socket exception * * @param session * @param throwable */ @OnError public void onError(Session session, Throwable throwable) { log.error("socket异常: {}", session.getId(), throwable); } }