| package com.genersoft.iot.vmp.jt1078.session; | 
|   | 
| import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd; | 
| import io.netty.channel.Channel; | 
| import org.slf4j.Logger; | 
| import org.slf4j.LoggerFactory; | 
|   | 
| import java.util.Map; | 
| import java.util.concurrent.ConcurrentHashMap; | 
| import java.util.concurrent.SynchronousQueue; | 
| import java.util.concurrent.TimeUnit; | 
|   | 
|   | 
| /** | 
|  * @author QingtaiJiang | 
|  * @date 2023/4/27 19:54 | 
|  * @email qingtaij@163.com | 
|  */ | 
| public enum SessionManager { | 
|     INSTANCE; | 
|     private final static Logger log = LoggerFactory.getLogger(SessionManager.class); | 
|   | 
|     // 用与消息的缓存 | 
|     private final Map<String, SynchronousQueue<String>> topicSubscribers = new ConcurrentHashMap<>(); | 
|   | 
|     // session的缓存 | 
|     private final Map<Object, Session> sessionMap; | 
|   | 
|     SessionManager() { | 
|         this.sessionMap = new ConcurrentHashMap<>(); | 
|     } | 
|   | 
|     /** | 
|      * 创建新的Session | 
|      * | 
|      * @param channel netty通道 | 
|      * @return 创建的session对象 | 
|      */ | 
|     public Session newSession(Channel channel) { | 
|         return new Session(channel); | 
|     } | 
|   | 
|   | 
|     /** | 
|      * 获取指定设备的Session | 
|      * | 
|      * @param clientId 设备Id | 
|      * @return Session | 
|      */ | 
|     public Session get(Object clientId) { | 
|         return sessionMap.get(clientId); | 
|     } | 
|   | 
|     /** | 
|      * 放入新设备连接的session | 
|      * | 
|      * @param clientId   设备ID | 
|      * @param newSession session | 
|      */ | 
|     protected void put(Object clientId, Session newSession) { | 
|         sessionMap.put(clientId, newSession); | 
|     } | 
|   | 
|   | 
|     /** | 
|      * 发送同步消息,接收响应 | 
|      * 默认超时时间6秒 | 
|      */ | 
|     public String request(Cmd cmd) { | 
|         // 默认6秒 | 
|         int timeOut = 6000; | 
|         return request(cmd, timeOut); | 
|     } | 
|   | 
|     public String request(Cmd cmd, Integer timeOut) { | 
|         Session session = this.get(cmd.getDevId()); | 
|         if (session == null) { | 
|             log.error("DevId: {} not online!", cmd.getDevId()); | 
|             return null; | 
|         } | 
|         String requestKey = requestKey(cmd.getDevId(), cmd.getRespId(), cmd.getPackageNo()); | 
|         SynchronousQueue<String> subscribe = subscribe(requestKey); | 
|         if (subscribe == null) { | 
|             log.error("DevId: {} key:{} send repaid", cmd.getDevId(), requestKey); | 
|             return null; | 
|         } | 
|         session.writeObject(cmd); | 
|         try { | 
|             return subscribe.poll(timeOut, TimeUnit.SECONDS); | 
|         } catch (InterruptedException e) { | 
|             log.warn("<<<<<<<<<< timeout" + session, e); | 
|         } finally { | 
|             this.unsubscribe(requestKey); | 
|         } | 
|         return null; | 
|     } | 
|   | 
|     public Boolean response(String devId, String respId, Long responseNo, String data) { | 
|         String requestKey = requestKey(devId, respId, responseNo); | 
|         SynchronousQueue<String> queue = topicSubscribers.get(requestKey); | 
|         if (queue != null) { | 
|             try { | 
|                 return queue.offer(data, 2, TimeUnit.SECONDS); | 
|             } catch (InterruptedException e) { | 
|                 log.error("{}", e.getMessage(), e); | 
|             } | 
|         } | 
|         log.warn("Not find response,key:{} data:{} ", requestKey, data); | 
|         return false; | 
|     } | 
|   | 
|     private void unsubscribe(String key) { | 
|         topicSubscribers.remove(key); | 
|     } | 
|   | 
|     private SynchronousQueue<String> subscribe(String key) { | 
|         SynchronousQueue<String> queue = null; | 
|         if (!topicSubscribers.containsKey(key)) | 
|             topicSubscribers.put(key, queue = new SynchronousQueue<String>()); | 
|         return queue; | 
|     } | 
|   | 
|     private String requestKey(String devId, String respId, Long requestNo) { | 
|         return String.join("_", devId.replaceFirst("^0*", ""), respId, requestNo.toString()); | 
|     } | 
|   | 
| } |