648540858
2023-10-08 205f1f1f6025bc812ccb7cf2408336d956c65d1a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.genersoft.iot.vmp.jt1078.session;
 
import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
 
 
/**
 * @author QingtaiJiang
 * @date 2023/4/27 19:54
 * @email qingtaij@163.com
 */
public enum SessionManager {
    INSTANCE;
    private final static Logger log = LoggerFactory.getLogger(SessionManager.class);
 
    // 用与消息的缓存
    private final Map<String, SynchronousQueue<String>> topicSubscribers = new ConcurrentHashMap<>();
 
    // session的缓存
    private final Map<Object, Session> sessionMap;
 
    SessionManager() {
        this.sessionMap = new ConcurrentHashMap<>();
    }
 
    /**
     * 创建新的Session
     *
     * @param channel netty通道
     * @return 创建的session对象
     */
    public Session newSession(Channel channel) {
        return new Session(channel);
    }
 
 
    /**
     * 获取指定设备的Session
     *
     * @param clientId 设备Id
     * @return Session
     */
    public Session get(Object clientId) {
        return sessionMap.get(clientId);
    }
 
    /**
     * 放入新设备连接的session
     *
     * @param clientId   设备ID
     * @param newSession session
     */
    protected void put(Object clientId, Session newSession) {
        sessionMap.put(clientId, newSession);
    }
 
 
    /**
     * 发送同步消息,接收响应
     * 默认超时时间6秒
     */
    public String request(Cmd cmd) {
        // 默认6秒
        int timeOut = 6000;
        return request(cmd, timeOut);
    }
 
    public String request(Cmd cmd, Integer timeOut) {
        Session session = this.get(cmd.getDevId());
        if (session == null) {
            log.error("DevId: {} not online!", cmd.getDevId());
            return null;
        }
        String requestKey = requestKey(cmd.getDevId(), cmd.getRespId(), cmd.getPackageNo());
        SynchronousQueue<String> subscribe = subscribe(requestKey);
        if (subscribe == null) {
            log.error("DevId: {} key:{} send repaid", cmd.getDevId(), requestKey);
            return null;
        }
        session.writeObject(cmd);
        try {
            return subscribe.poll(timeOut, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("<<<<<<<<<< timeout" + session, e);
        } finally {
            this.unsubscribe(requestKey);
        }
        return null;
    }
 
    public Boolean response(String devId, String respId, Long responseNo, String data) {
        String requestKey = requestKey(devId, respId, responseNo);
        SynchronousQueue<String> queue = topicSubscribers.get(requestKey);
        if (queue != null) {
            try {
                return queue.offer(data, 2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.error("{}", e.getMessage(), e);
            }
        }
        log.warn("Not find response,key:{} data:{} ", requestKey, data);
        return false;
    }
 
    private void unsubscribe(String key) {
        topicSubscribers.remove(key);
    }
 
    private SynchronousQueue<String> subscribe(String key) {
        SynchronousQueue<String> queue = null;
        if (!topicSubscribers.containsKey(key))
            topicSubscribers.put(key, queue = new SynchronousQueue<String>());
        return queue;
    }
 
    private String requestKey(String devId, String respId, Long requestNo) {
        return String.join("_", devId.replaceFirst("^0*", ""), respId, requestNo.toString());
    }
 
}