From bfae9780f75db7495f53511f3116bb6c0470a0b0 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 28 六月 2023 20:32:48 +0800 Subject: [PATCH] 增加流关闭时的处理 --- src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java | 98 +++++++++++++++++++++++++++++++++++------------- 1 files changed, 71 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java index 4e73578..f78b692 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java @@ -1,20 +1,34 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; /** * 鎺ユ敹redis鍙戦�佺殑缁撴潫鎺ㄦ祦璇锋眰 @@ -25,11 +39,26 @@ private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class); - private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>(); - - @Qualifier("taskExecutor") @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private IStreamPushService streamPushService; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IVideoManagerStorage storager; + + @Autowired + private ISIPCommanderForPlatform commanderFroPlatform; + + @Autowired + private UserSetting userSetting; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>(); @@ -40,30 +69,45 @@ @Override public void onMessage(Message message, byte[] bytes) { - logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛� {}", new String(message.getBody())); - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(message); - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - Message msg = taskQueue.poll(); - try { - MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ - logger.info("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉]锛氬弬鏁颁笉鍏�"); - continue; + logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫]锛� {}", new String(message.getBody())); + MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class); + StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream()); + if (push != null) { + if (redisCatchStorage.isChannelSendingRTP(push.getGbId())) { + List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChnnelId( + push.getGbId()); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + // 鍋滄鍚戜笂绾ф帹娴� + String streamId = sendRtpItem.getStreamId(); + Map<String, Object> param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",streamId); + param.put("ssrc",sendRtpItem.getSsrc()); + logger.info("[REDIS娑堟伅-鎺ㄦ祦缁撴潫] 鍋滄鍚戜笂绾ф帹娴侊細{}", streamId); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStreamId()); + zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + + try { + commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍙戦�丅YE: {}", e.getMessage()); } - // 鏌ョ湅姝e湪绛夊緟鐨刬nvite娑堟伅 - if (responseEvents.get(response.getApp() + response.getStream()) != null) { - responseEvents.get(response.getApp() + response.getStream()).run(response); + if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) { + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), parentPlatform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId()); + messageForPushChannel.setPlatFormIndex(parentPlatform.getId()); + redisCatchStorage.sendPlatformStopPlayMsg(messageForPushChannel); } - }catch (Exception e) { - logger.warn("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", JSON.toJSONString(message)); - logger.error("[REDIS娑堟伅-璇锋眰鎺ㄦ祦缁撴灉] 寮傚父鍐呭锛� ", e); } } - }); + } } + } public void addEvent(String app, String stream, PushStreamResponseEvent callback) { -- Gitblit v1.8.0