| | |
| | | package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; |
| | | |
| | | import com.genersoft.iot.vmp.gb28181.bean.Device; |
| | | import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
| | | import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; |
| | | import com.genersoft.iot.vmp.gb28181.bean.RecordItem; |
| | | import com.genersoft.iot.vmp.common.VideoManagerConstants; |
| | | import com.genersoft.iot.vmp.gb28181.bean.*; |
| | | import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; |
| | | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; |
| | | import com.genersoft.iot.vmp.gb28181.utils.DateUtil; |
| | | import com.genersoft.iot.vmp.utils.DateUtil; |
| | | import com.genersoft.iot.vmp.utils.UJson; |
| | | import com.genersoft.iot.vmp.utils.redis.RedisUtil; |
| | | import org.dom4j.DocumentException; |
| | | import gov.nist.javax.sip.message.SIPRequest; |
| | | import org.dom4j.Element; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.InitializingBean; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Qualifier; |
| | | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import javax.sip.InvalidArgumentException; |
| | | import javax.sip.RequestEvent; |
| | | import javax.sip.SipException; |
| | | import javax.sip.message.Response; |
| | | import java.text.ParseException; |
| | | import java.util.ArrayList; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.UUID; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.stream.Collectors; |
| | | |
| | | import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; |
| | | |
| | | /** |
| | | * @author lin |
| | | */ |
| | | @Component |
| | | public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { |
| | | |
| | | private Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class); |
| | | public static volatile List<String> threadNameList = new ArrayList(); |
| | | private final Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class); |
| | | private final String cmdType = "RecordInfo"; |
| | | private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; |
| | | |
| | | private ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>(); |
| | | |
| | | @Autowired |
| | | private ResponseMessageHandler responseMessageHandler; |
| | | |
| | | @Autowired |
| | | private RedisUtil redis; |
| | | |
| | | @Autowired |
| | | private DeferredResultHolder deferredResultHolder; |
| | | |
| | | @Autowired |
| | | private EventPublisher eventPublisher; |
| | | |
| | | @Qualifier("taskExecutor") |
| | | @Autowired |
| | | private ThreadPoolTaskExecutor taskExecutor; |
| | | |
| | | private Long recordInfoTtl = 1800L; |
| | | |
| | | @Override |
| | | public void afterPropertiesSet() throws Exception { |
| | |
| | | |
| | | @Override |
| | | public void handForDevice(RequestEvent evt, Device device, Element rootElement) { |
| | | |
| | | // 回复200 OK |
| | | try { |
| | | responseAck(evt, Response.OK); |
| | | |
| | | rootElement = getRootElement(evt, device.getCharset()); |
| | | String uuid = UUID.randomUUID().toString().replace("-", ""); |
| | | RecordInfo recordInfo = new RecordInfo(); |
| | | String sn = getText(rootElement, "SN"); |
| | | String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + device.getDeviceId() + sn; |
| | | recordInfo.setDeviceId(device.getDeviceId()); |
| | | recordInfo.setSn(sn); |
| | | recordInfo.setName(getText(rootElement, "Name")); |
| | | if (getText(rootElement, "SumNum") == null || getText(rootElement, "SumNum") == "") { |
| | | recordInfo.setSumNum(0); |
| | | } else { |
| | | recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum"))); |
| | | } |
| | | Element recordListElement = rootElement.element("RecordList"); |
| | | if (recordListElement == null || recordInfo.getSumNum() == 0) { |
| | | logger.info("无录像数据"); |
| | | eventPublisher.recordEndEventPush(recordInfo); |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setKey(key); |
| | | msg.setData(recordInfo); |
| | | deferredResultHolder.invokeAllResult(msg); |
| | | } else { |
| | | Iterator<Element> recordListIterator = recordListElement.elementIterator(); |
| | | List<RecordItem> recordList = new ArrayList<RecordItem>(); |
| | | if (recordListIterator != null) { |
| | | RecordItem record = new RecordItem(); |
| | | logger.info("处理录像列表数据..."); |
| | | // 遍历DeviceList |
| | | while (recordListIterator.hasNext()) { |
| | | Element itemRecord = recordListIterator.next(); |
| | | Element recordElement = itemRecord.element("DeviceID"); |
| | | if (recordElement == null) { |
| | | logger.info("记录为空,下一个..."); |
| | | continue; |
| | | } |
| | | record = new RecordItem(); |
| | | record.setDeviceId(getText(itemRecord, "DeviceID")); |
| | | record.setName(getText(itemRecord, "Name")); |
| | | record.setFilePath(getText(itemRecord, "FilePath")); |
| | | record.setFileSize(getText(itemRecord, "FileSize")); |
| | | record.setAddress(getText(itemRecord, "Address")); |
| | | record.setStartTime( |
| | | DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "StartTime"))); |
| | | record.setEndTime( |
| | | DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "EndTime"))); |
| | | record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 |
| | | : Integer.parseInt(getText(itemRecord, "Secrecy"))); |
| | | record.setType(getText(itemRecord, "Type")); |
| | | record.setRecorderId(getText(itemRecord, "RecorderID")); |
| | | recordList.add(record); |
| | | } |
| | | recordInfo.setRecordList(recordList); |
| | | } |
| | | eventPublisher.recordEndEventPush(recordInfo); |
| | | // 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题 |
| | | String cacheKey = CACHE_RECORDINFO_KEY + device.getDeviceId() + sn; |
| | | redis.set(cacheKey + "_" + uuid, recordList, 90); |
| | | if (!threadNameList.contains(cacheKey)) { |
| | | threadNameList.add(cacheKey); |
| | | CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo); |
| | | chk.setName(cacheKey); |
| | | chk.setDeferredResultHolder(deferredResultHolder); |
| | | chk.setRedis(redis); |
| | | chk.setLogger(logger); |
| | | chk.start(); |
| | | if (logger.isDebugEnabled()) { |
| | | logger.debug("Start Thread " + cacheKey + "."); |
| | | } |
| | | } else { |
| | | if (logger.isDebugEnabled()) { |
| | | logger.debug("Thread " + cacheKey + " already started."); |
| | | } |
| | | } |
| | | } |
| | | } catch (SipException e) { |
| | | e.printStackTrace(); |
| | | } catch (InvalidArgumentException e) { |
| | | e.printStackTrace(); |
| | | } catch (ParseException e) { |
| | | e.printStackTrace(); |
| | | } catch (DocumentException e) { |
| | | e.printStackTrace(); |
| | | // 回复200 OK |
| | | responseAck((SIPRequest) evt.getRequest(), Response.OK); |
| | | }catch (SipException | InvalidArgumentException | ParseException e) { |
| | | logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage()); |
| | | } |
| | | taskExecutor.execute(()->{ |
| | | try { |
| | | |
| | | String sn = getText(rootElement, "SN"); |
| | | String channelId = getText(rootElement, "DeviceID"); |
| | | RecordInfo recordInfo = new RecordInfo(); |
| | | recordInfo.setChannelId(channelId); |
| | | recordInfo.setDeviceId(device.getDeviceId()); |
| | | recordInfo.setSn(sn); |
| | | recordInfo.setName(getText(rootElement, "Name")); |
| | | String sumNumStr = getText(rootElement, "SumNum"); |
| | | int sumNum = 0; |
| | | if (!ObjectUtils.isEmpty(sumNumStr)) { |
| | | sumNum = Integer.parseInt(sumNumStr); |
| | | } |
| | | recordInfo.setSumNum(sumNum); |
| | | Element recordListElement = rootElement.element("RecordList"); |
| | | if (recordListElement == null || sumNum == 0) { |
| | | logger.info("无录像数据"); |
| | | recordInfo.setCount(sumNum); |
| | | eventPublisher.recordEndEventPush(recordInfo); |
| | | releaseRequest(device.getDeviceId(), sn,recordInfo); |
| | | } else { |
| | | Iterator<Element> recordListIterator = recordListElement.elementIterator(); |
| | | if (recordListIterator != null) { |
| | | List<RecordItem> recordList = new ArrayList<>(); |
| | | // 遍历DeviceList |
| | | while (recordListIterator.hasNext()) { |
| | | Element itemRecord = recordListIterator.next(); |
| | | Element recordElement = itemRecord.element("DeviceID"); |
| | | if (recordElement == null) { |
| | | logger.info("记录为空,下一个..."); |
| | | continue; |
| | | } |
| | | RecordItem record = new RecordItem(); |
| | | record.setDeviceId(getText(itemRecord, "DeviceID")); |
| | | record.setName(getText(itemRecord, "Name")); |
| | | record.setFilePath(getText(itemRecord, "FilePath")); |
| | | record.setFileSize(getText(itemRecord, "FileSize")); |
| | | record.setAddress(getText(itemRecord, "Address")); |
| | | |
| | | String startTimeStr = getText(itemRecord, "StartTime"); |
| | | record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); |
| | | |
| | | String endTimeStr = getText(itemRecord, "EndTime"); |
| | | record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); |
| | | |
| | | record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 |
| | | : Integer.parseInt(getText(itemRecord, "Secrecy"))); |
| | | record.setType(getText(itemRecord, "Type")); |
| | | record.setRecorderId(getText(itemRecord, "RecorderID")); |
| | | recordList.add(record); |
| | | } |
| | | Map<String, String> map = recordList.stream() |
| | | .filter(record -> record.getDeviceId() != null) |
| | | .collect(Collectors.toMap(record -> record.getStartTime()+ record.getEndTime(), UJson::writeJson)); |
| | | // 获取任务结果数据 |
| | | String resKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_PRE + channelId + sn; |
| | | RedisUtil.hmset(resKey, map, recordInfoTtl); |
| | | String resCountKey = VideoManagerConstants.REDIS_RECORD_INFO_RES_COUNT_PRE + channelId + sn; |
| | | long incr = RedisUtil.incr(resCountKey, map.size()); |
| | | RedisUtil.expire(resCountKey, recordInfoTtl); |
| | | recordInfo.setRecordList(recordList); |
| | | recordInfo.setCount(Math.toIntExact(incr)); |
| | | eventPublisher.recordEndEventPush(recordInfo); |
| | | if (incr < sumNum) { |
| | | return; |
| | | } |
| | | // 已接收完成 |
| | | List<RecordItem> resList = RedisUtil.hmget(resKey).values().stream().map(e -> UJson.readJson(e.toString(), RecordItem.class)).collect(Collectors.toList()); |
| | | if (resList.size() < sumNum) { |
| | | return; |
| | | } |
| | | recordInfo.setRecordList(resList); |
| | | releaseRequest(device.getDeviceId(), sn,recordInfo); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | logger.error("[国标录像] 发现未处理的异常, "+e.getMessage(), e); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | @Override |
| | | public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element element) { |
| | | |
| | | } |
| | | |
| | | public void releaseRequest(String deviceId, String sn,RecordInfo recordInfo){ |
| | | String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; |
| | | // 对数据进行排序 |
| | | Collections.sort(recordInfo.getRecordList()); |
| | | |
| | | RequestMessage msg = new RequestMessage(); |
| | | msg.setKey(key); |
| | | msg.setData(recordInfo); |
| | | deferredResultHolder.invokeAllResult(msg); |
| | | } |
| | | } |