package com.monkeylessey.websocket.service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.monkeylessey.websocket.Message; import com.monkeylessey.websocket.NettyConnect; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import java.util.Collection; import java.util.List; import java.util.Objects; /** * @author:xp * @date:2024/4/21 9:39 */ @Slf4j @Service public class DefaultSendWebsocketMsg implements SendWebsocketMsg { private final static ObjectMapper json = new ObjectMapper(); @Override public void sendByUserId(Integer userId, Message msg) { Channel channel = NettyConnect.getUserChannelMap().get(userId); if (Objects.nonNull(channel) && channel.isActive()) { try { channel.writeAndFlush(new TextWebSocketFrame(json.writeValueAsString(msg))); } catch (JsonProcessingException e) { log.error("消息发送失败,请检查消息格式"); } } } @Override public void sendManyUser(List userIds, Message msg) { if (CollectionUtils.isEmpty(userIds)) { return; } for (Integer userId : userIds) { this.sendByUserId(userId, msg); } } @Override public void broadcast(Message msg) { Collection connects = NettyConnect.getUserChannelMap().values(); try { String data = json.writeValueAsString(msg); connects.stream().forEach(connect -> { if (connect.isActive()) { connect.writeAndFlush(new TextWebSocketFrame(data)); } }); } catch (JsonProcessingException e) { log.error("消息发送失败,请检查消息格式"); } } }