From cde7f6460a563a4e9b2624b395d9bdfe6f90e14c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期五, 01 九月 2023 09:21:05 +0800 Subject: [PATCH] Merge pull request #999 from a24211317/wvp-28181-2.0 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java | 236 +++++++++++++++++++++++++++++++++------------------------- 1 files changed, 133 insertions(+), 103 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java old mode 100644 new mode 100755 index f1919da..36e5df2 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -1,53 +1,67 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; -import com.genersoft.iot.vmp.gb28181.bean.RecordItem; -import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.DateUtil; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; -import org.dom4j.DocumentException; +import com.genersoft.iot.vmp.utils.DateUtil; +import com.genersoft.iot.vmp.utils.UJson; +import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; +/** + * @author lin + */ @Component public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { - private Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class); - public static volatile List<String> threadNameList = new ArrayList(); + private final Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class); private final String cmdType = "RecordInfo"; - private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; + + private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private ResponseMessageHandler responseMessageHandler; @Autowired - private RedisUtil redis; + private DeferredResultHolder deferredResultHolder; @Autowired - private DeferredResultHolder deferredResultHolder; + private EventPublisher eventPublisher; + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Autowired + private RedisTemplate<Object, Object> redisTemplate; + + private Long recordInfoTtl = 1800L; @Override public void afterPropertiesSet() throws Exception { @@ -56,98 +70,114 @@ @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - - // 鍥炲200 OK try { - responseAck(evt, Response.OK); - - rootElement = getRootElement(evt, device.getCharset()); - String uuid = UUID.randomUUID().toString().replace("-", ""); - RecordInfo recordInfo = new RecordInfo(); - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText(); - String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + device.getDeviceId() + channelId; - recordInfo.setDeviceId(device.getDeviceId()); - recordInfo.setChannelId(channelId); - recordInfo.setName(getText(rootElement, "Name")); - if (getText(rootElement, "SumNum") == null || getText(rootElement, "SumNum") == "") { - recordInfo.setSumNum(0); - } else { - recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum"))); - } - String sn = getText(rootElement, "SN"); - Element recordListElement = rootElement.element("RecordList"); - if (recordListElement == null || recordInfo.getSumNum() == 0) { - logger.info("鏃犲綍鍍忔暟鎹�"); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(recordInfo); - deferredResultHolder.invokeAllResult(msg); - } else { - Iterator<Element> recordListIterator = recordListElement.elementIterator(); - List<RecordItem> recordList = new ArrayList<RecordItem>(); - if (recordListIterator != null) { - RecordItem record = new RecordItem(); - logger.info("澶勭悊褰曞儚鍒楄〃鏁版嵁..."); - // 閬嶅巻DeviceList - while (recordListIterator.hasNext()) { - Element itemRecord = recordListIterator.next(); - Element recordElement = itemRecord.element("DeviceID"); - if (recordElement == null) { - logger.info("璁板綍涓虹┖锛屼笅涓�涓�..."); - continue; - } - record = new RecordItem(); - record.setDeviceId(getText(itemRecord, "DeviceID")); - record.setName(getText(itemRecord, "Name")); - record.setFilePath(getText(itemRecord, "FilePath")); - record.setAddress(getText(itemRecord, "Address")); - record.setStartTime( - DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "StartTime"))); - record.setEndTime( - DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "EndTime"))); - record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 - : Integer.parseInt(getText(itemRecord, "Secrecy"))); - record.setType(getText(itemRecord, "Type")); - record.setRecorderId(getText(itemRecord, "RecorderID")); - recordList.add(record); - } - recordInfo.setRecordList(recordList); - } - - // 鏀圭敤鍗曠嫭绾跨▼缁熻宸茶幏鍙栧綍鍍忔枃浠舵暟閲忥紝閬垮厤澶氬寘骞惰鍒嗗埆缁熻涓嶅畬鏁寸殑闂 - String cacheKey = CACHE_RECORDINFO_KEY + device.getDeviceId() + sn; - redis.set(cacheKey + "_" + uuid, recordList, 90); - if (!threadNameList.contains(cacheKey)) { - threadNameList.add(cacheKey); - CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo); - chk.setName(cacheKey); - chk.setDeferredResultHolder(deferredResultHolder); - chk.setRedis(redis); - chk.setLogger(logger); - chk.start(); - if (logger.isDebugEnabled()) { - logger.debug("Start Thread " + cacheKey + "."); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("Thread " + cacheKey + " already started."); - } - } - } - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } catch (DocumentException e) { - e.printStackTrace(); + // 鍥炲200 OK + responseAck((SIPRequest) evt.getRequest(), Response.OK); + }catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[鍛戒护鍙戦�佸け璐 鍥芥爣绾ц仈 鍥芥爣褰曞儚: {}", e.getMessage()); } + taskExecutor.execute(()->{ + try { + + String sn = getText(rootElement, "SN"); + String channelId = getText(rootElement, "DeviceID"); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setChannelId(channelId); + recordInfo.setDeviceId(device.getDeviceId()); + recordInfo.setSn(sn); + recordInfo.setName(getText(rootElement, "Name")); + String sumNumStr = getText(rootElement, "SumNum"); + int sumNum = 0; + if (!ObjectUtils.isEmpty(sumNumStr)) { + sumNum = Integer.parseInt(sumNumStr); + } + recordInfo.setSumNum(sumNum); + Element recordListElement = rootElement.element("RecordList"); + if (recordListElement == null || sumNum == 0) { + logger.info("鏃犲綍鍍忔暟鎹�"); + recordInfo.setCount(sumNum); + eventPublisher.recordEndEventPush(recordInfo); + releaseRequest(device.getDeviceId(), sn,recordInfo); + } else { + Iterator<Element> recordListIterator = recordListElement.elementIterator(); + if (recordListIterator != null) { + List<RecordItem> recordList = new ArrayList<>(); + // 閬嶅巻DeviceList + while (recordListIterator.hasNext()) { + Element itemRecord = recordListIterator.next(); + Element recordElement = itemRecord.element("DeviceID"); + if (recordElement == null) { + logger.info("璁板綍涓虹┖锛屼笅涓�涓�..."); + continue; + } + RecordItem record = new RecordItem(); + record.setDeviceId(getText(itemRecord, "DeviceID")); + record.setName(getText(itemRecord, "Name")); + record.setFilePath(getText(itemRecord, "FilePath")); + record.setFileSize(getText(itemRecord, "FileSize")); + record.setAddress(getText(itemRecord, "Address")); + + String startTimeStr = getText(itemRecord, "StartTime"); + record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); + + String endTimeStr = getText(itemRecord, "EndTime"); + record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); + + record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 + : Integer.parseInt(getText(itemRecord, "Secrecy"))); + record.setType(getText(itemRecord, "Type")); + record.setRecorderId(getText(itemRecord, "RecorderID")); + recordList.add(record); + } + Map<String, String> map = recordList.stream() + .filter(record -> record.getDeviceId() != null) + .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); + // 鑾峰彇浠诲姟缁撴灉鏁版嵁 + String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn; + redisTemplate.opsForHash().putAll(resKey, map); + redisTemplate.expire(resKey, recordInfoTtl, TimeUnit.SECONDS); + String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn; + long incr = redisTemplate.opsForValue().increment(resCountKey, map.size()); + redisTemplate.expire(resCountKey, recordInfoTtl, TimeUnit.SECONDS); + recordInfo.setRecordList(recordList); + recordInfo.setCount(Math.toIntExact(incr)); + eventPublisher.recordEndEventPush(recordInfo); + if (incr < sumNum) { + return; + } + // 宸叉帴鏀跺畬鎴� + List<RecordItem> resList = redisTemplate.opsForHash().entries(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); + if (resList.size() < sumNum) { + return; + } + recordInfo.setRecordList(resList); + releaseRequest(device.getDeviceId(), sn,recordInfo); + } + } + } catch (Exception e) { + logger.error("[鍥芥爣褰曞儚] 鍙戠幇鏈鐞嗙殑寮傚父, \r\n{}", evt.getRequest()); + logger.error("[鍥芥爣褰曞儚] 寮傚父鍐呭锛� ", e); + } + }); } @Override public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element element) { } + + public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){ + String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; + // 瀵规暟鎹繘琛屾帓搴� + if(recordInfo!=null && recordInfo.getRecordList()!=null) { + Collections.sort(recordInfo.getRecordList()); + }else{ + recordInfo.setRecordList(new ArrayList<>()); + } + + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(recordInfo); + deferredResultHolder.invokeAllResult(msg); + } } -- Gitblit v1.8.0