xiangpei
2024-09-30 b9ea75683423b46cb4d2aea2961cbb3c6c8837f7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.ycl.websocket.service;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ycl.websocket.msg.Message;
import com.ycl.websocket.NettyConnect;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
 
import java.util.Collection;
import java.util.List;
import java.util.Objects;
 
/**
 * @author:xp
 * @date:2024/4/21 9:39
 */
@Slf4j
@Service
public class DefaultSendWebsocketMsg implements SendWebsocketMsg {
 
    private final static ObjectMapper json = new ObjectMapper();
 
    @Override
    public void sendByUserId(Long userId, Message msg) {
        Channel channel = NettyConnect.getUserChannelMap().get(userId);
        if (Objects.nonNull(channel) && channel.isActive()) {
            try {
                channel.writeAndFlush(new TextWebSocketFrame(json.writeValueAsString(msg)));
            } catch (JsonProcessingException e) {
                log.error("消息发送失败,请检查消息格式");
            }
        }
    }
 
    @Override
    public void sendManyUser(List<Long> userIds, Message msg) {
        if (CollectionUtils.isEmpty(userIds)) {
            return;
        }
        for (Long userId : userIds) {
            this.sendByUserId(userId, msg);
        }
    }
 
    @Override
    public void broadcast(Message msg) {
        Collection<Channel> connects = NettyConnect.getUserChannelMap().values();
        try {
            String data = json.writeValueAsString(msg);
            connects.stream().forEach(connect -> {
                if (connect.isActive()) {
                    connect.writeAndFlush(new TextWebSocketFrame(data));
                }
            });
        } catch (JsonProcessingException e) {
            log.error("消息发送失败,请检查消息格式");
        }
    }
}