|  |  |  | 
|---|
|  |  |  | package com.genersoft.iot.vmp.service.redisMsg; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson.JSON; | 
|---|
|  |  |  | import com.alibaba.fastjson.JSONObject; | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSON; | 
|---|
|  |  |  | import com.alibaba.fastjson2.JSONObject; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.conf.DynamicTask; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.conf.UserSetting; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; | 
|---|
|  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private ZlmHttpHookSubscribe subscribe; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private boolean taskQueueHandlerRun = false; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Qualifier("taskExecutor") | 
|---|
|  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void onMessage(Message message, byte[] bytes) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | boolean isEmpty = taskQueue.isEmpty(); | 
|---|
|  |  |  | taskQueue.offer(message); | 
|---|
|  |  |  | if (!taskQueueHandlerRun) { | 
|---|
|  |  |  | taskQueueHandlerRun = true; | 
|---|
|  |  |  | if (isEmpty) { | 
|---|
|  |  |  | taskExecutor.execute(() -> { | 
|---|
|  |  |  | while (!taskQueue.isEmpty()) { | 
|---|
|  |  |  | Message msg = taskQueue.poll(); | 
|---|
|  |  |  | JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | 
|---|
|  |  |  | WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); | 
|---|
|  |  |  | if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | 
|---|
|  |  |  | logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | switch (wvpRedisMsg.getCmd()){ | 
|---|
|  |  |  | case WvpRedisMsgCmd.GET_SEND_ITEM: | 
|---|
|  |  |  | RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); | 
|---|
|  |  |  | requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
|---|
|  |  |  | RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; | 
|---|
|  |  |  | requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | default: | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); | 
|---|
|  |  |  | WvpRedisMsg wvpRedisMsg = JSON.to(WvpRedisMsg.class, msgJSON); | 
|---|
|  |  |  | if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if (WvpRedisMsg.isRequest(wvpRedisMsg)) { | 
|---|
|  |  |  | logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | 
|---|
|  |  |  | switch (wvpRedisMsg.getCmd()){ | 
|---|
|  |  |  | case WvpRedisMsgCmd.GET_SEND_ITEM: | 
|---|
|  |  |  | switch (wvpRedisMsg.getCmd()){ | 
|---|
|  |  |  | case WvpRedisMsgCmd.GET_SEND_ITEM: | 
|---|
|  |  |  | RequestSendItemMsg content = JSON.to(RequestSendItemMsg.class, wvpRedisMsg.getContent()); | 
|---|
|  |  |  | requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
|---|
|  |  |  | RequestPushStreamMsg param = JSON.to(RequestPushStreamMsg.class, wvpRedisMsg.getContent()); | 
|---|
|  |  |  | requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | default: | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | WVPResult content  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); | 
|---|
|  |  |  | switch (wvpRedisMsg.getCmd()){ | 
|---|
|  |  |  | case WvpRedisMsgCmd.GET_SEND_ITEM: | 
|---|
|  |  |  |  | 
|---|
|  |  |  | String key = wvpRedisMsg.getSerial(); | 
|---|
|  |  |  | switch (content.getCode()) { | 
|---|
|  |  |  | case 0: | 
|---|
|  |  |  | ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); | 
|---|
|  |  |  | PlayMsgCallback playMsgCallback = callbacks.get(key); | 
|---|
|  |  |  | if (playMsgCallback != null) { | 
|---|
|  |  |  | callbacksForError.remove(key); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | playMsgCallback.handler(responseSendItemMsg); | 
|---|
|  |  |  | } catch (ParseException e) { | 
|---|
|  |  |  | logger.error("[REDIS消息处理异常] ", e); | 
|---|
|  |  |  | WVPResult content  = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | String key = wvpRedisMsg.getSerial(); | 
|---|
|  |  |  | switch (content.getCode()) { | 
|---|
|  |  |  | case 0: | 
|---|
|  |  |  | ResponseSendItemMsg responseSendItemMsg =JSON.to(ResponseSendItemMsg.class, content.getData()); | 
|---|
|  |  |  | PlayMsgCallback playMsgCallback = callbacks.get(key); | 
|---|
|  |  |  | if (playMsgCallback != null) { | 
|---|
|  |  |  | callbacksForError.remove(key); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | playMsgCallback.handler(responseSendItemMsg); | 
|---|
|  |  |  | } catch (ParseException e) { | 
|---|
|  |  |  | logger.error("[REDIS消息处理异常] ", e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
|---|
|  |  |  | case ERROR_CODE_OFFLINE: | 
|---|
|  |  |  | case ERROR_CODE_TIMEOUT: | 
|---|
|  |  |  | PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | 
|---|
|  |  |  | if (errorCallback != null) { | 
|---|
|  |  |  | callbacks.remove(key); | 
|---|
|  |  |  | errorCallback.handler(content); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | default: | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
|---|
|  |  |  | WVPResult wvpResult  = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); | 
|---|
|  |  |  | String serial = wvpRedisMsg.getSerial(); | 
|---|
|  |  |  | switch (wvpResult.getCode()) { | 
|---|
|  |  |  | case 0: | 
|---|
|  |  |  | JSONObject jsonObject = (JSONObject)wvpResult.getData(); | 
|---|
|  |  |  | PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | 
|---|
|  |  |  | if (playMsgCallback != null) { | 
|---|
|  |  |  | callbacksForError.remove(serial); | 
|---|
|  |  |  | playMsgCallback.handler(jsonObject); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
|---|
|  |  |  | case ERROR_CODE_OFFLINE: | 
|---|
|  |  |  | case ERROR_CODE_TIMEOUT: | 
|---|
|  |  |  | PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | 
|---|
|  |  |  | if (errorCallback != null) { | 
|---|
|  |  |  | callbacks.remove(serial); | 
|---|
|  |  |  | errorCallback.handler(wvpResult); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | default: | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | default: | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
|---|
|  |  |  | case ERROR_CODE_OFFLINE: | 
|---|
|  |  |  | case ERROR_CODE_TIMEOUT: | 
|---|
|  |  |  | PlayMsgErrorCallback errorCallback = callbacksForError.get(key); | 
|---|
|  |  |  | if (errorCallback != null) { | 
|---|
|  |  |  | callbacks.remove(key); | 
|---|
|  |  |  | errorCallback.handler(content); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | default: | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: | 
|---|
|  |  |  | WVPResult wvpResult  = JSON.to(WVPResult.class, wvpRedisMsg.getContent()); | 
|---|
|  |  |  | String serial = wvpRedisMsg.getSerial(); | 
|---|
|  |  |  | switch (wvpResult.getCode()) { | 
|---|
|  |  |  | case 0: | 
|---|
|  |  |  | JSONObject jsonObject = (JSONObject)wvpResult.getData(); | 
|---|
|  |  |  | PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); | 
|---|
|  |  |  | if (playMsgCallback != null) { | 
|---|
|  |  |  | callbacksForError.remove(serial); | 
|---|
|  |  |  | playMsgCallback.handler(jsonObject); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: | 
|---|
|  |  |  | case ERROR_CODE_OFFLINE: | 
|---|
|  |  |  | case ERROR_CODE_TIMEOUT: | 
|---|
|  |  |  | PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); | 
|---|
|  |  |  | if (errorCallback != null) { | 
|---|
|  |  |  | callbacks.remove(serial); | 
|---|
|  |  |  | errorCallback.handler(wvpResult); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | default: | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | default: | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }catch (Exception e) { | 
|---|
|  |  |  | logger.warn("[RedisGbPlayMsg] 发现未处理的异常, {}",e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | taskQueueHandlerRun = false; | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | 
|---|
|  |  |  | SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), | 
|---|
|  |  |  | content.getPort(), content.getSsrc(), content.getPlatformId(), | 
|---|
|  |  |  | content.getApp(), content.getStream(), content.getChannelId(), | 
|---|
|  |  |  | content.getTcp()); | 
|---|
|  |  |  | content.getTcp(), content.getRtcp()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | WVPResult<ResponseSendItemMsg> result = new WVPResult<>(); | 
|---|
|  |  |  | result.setCode(0); | 
|---|
|  |  |  | 
|---|
|  |  |  | * @param callback 得到信息的回调 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, | 
|---|
|  |  |  | String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { | 
|---|
|  |  |  | String platformId, String channelId, boolean isTcp, boolean rtcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { | 
|---|
|  |  |  | RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( | 
|---|
|  |  |  | serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName); | 
|---|
|  |  |  | serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, rtcp, platformName); | 
|---|
|  |  |  | requestSendItemMsg.setServerId(serverId); | 
|---|
|  |  |  | String key = UUID.randomUUID().toString(); | 
|---|
|  |  |  | WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, | 
|---|