From 6868577e329dea7b48b33a2e5d662ad855edb341 Mon Sep 17 00:00:00 2001 From: lawrencehj <1934378145@qq.com> Date: 星期五, 05 二月 2021 15:00:47 +0800 Subject: [PATCH] 将录像文件统计改为独立线程进行,实现超时返回已接收结果 --- src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java | 115 ++++++++++++++++++++++++++++++++++----------------------- 1 files changed, 69 insertions(+), 46 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java index c865ad3..e7fbfe0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java @@ -23,6 +23,7 @@ import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -43,13 +44,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; + /** * @Description:MESSAGE璇锋眰澶勭悊鍣� * @author: swwheihei * @date: 2020骞�5鏈�3鏃� 涓嬪崍5:32:41 */ - +@SuppressWarnings(value={"unchecked", "rawtypes"}) public class MessageRequestProcessor extends SIPRequestAbstractProcessor { + + public static volatile List<String> threadNameList = new ArrayList(); private UserSetup userSetup = (UserSetup) SpringBeanFactory.getBean("userSetup"); @@ -240,10 +244,10 @@ try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - String result = XmlUtil.getText(rootElement, "Result"); + //String result = XmlUtil.getText(rootElement, "Result"); // 鍥炲200 OK responseAck(evt); - if (!XmlUtil.isEmpty(result)) { + if (rootElement.getName().equals("Response")) {//} !XmlUtil.isEmpty(result)) { // 姝ゅ鏄鏈钩鍙板彂鍑篋eviceControl鎸囦护鐨勫簲绛� JSONObject json = new JSONObject(); XmlUtil.node2Json(rootElement, json); @@ -272,11 +276,10 @@ try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - String result = XmlUtil.getText(rootElement, "Result"); // 鍥炲200 OK responseAck(evt); - //if (!XmlUtil.isEmpty(result)) { - // 姝ゅ鏄鏈钩鍙板彂鍑篋eviceControl鎸囦护鐨勫簲绛� + if (rootElement.getName().equals("Response")) { + // 姝ゅ鏄鏈钩鍙板彂鍑篋eviceControl鎸囦护鐨勫簲绛� JSONObject json = new JSONObject(); XmlUtil.node2Json(rootElement, json); if (logger.isDebugEnabled()) { @@ -287,9 +290,9 @@ msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG); msg.setData(json); deferredResultHolder.invokeResult(msg); - // } else { - // // 姝ゅ鏄笂绾у彂鍑虹殑DeviceConfig鎸囦护 - //} + } else { + // 姝ゅ鏄笂绾у彂鍑虹殑DeviceConfig鎸囦护 + } } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); } @@ -304,11 +307,10 @@ try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - String result = XmlUtil.getText(rootElement, "Result"); // 鍥炲200 OK responseAck(evt); - //if (!XmlUtil.isEmpty(result)) { - // 姝ゅ鏄鏈钩鍙板彂鍑篋eviceControl鎸囦护鐨勫簲绛� + if (rootElement.getName().equals("Response")) { + // 姝ゅ鏄鏈钩鍙板彂鍑篋eviceControl鎸囦护鐨勫簲绛� JSONObject json = new JSONObject(); XmlUtil.node2Json(rootElement, json); if (logger.isDebugEnabled()) { @@ -319,9 +321,9 @@ msg.setType(DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD); msg.setData(json); deferredResultHolder.invokeResult(msg); - // } else { - // // 姝ゅ鏄笂绾у彂鍑虹殑DeviceConfig鎸囦护 - //} + } else { + // 姝ゅ鏄笂绾у彂鍑虹殑DeviceConfig鎸囦护 + } } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); } @@ -336,7 +338,6 @@ try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - String result = XmlUtil.getText(rootElement, "Result"); // 鍥炲200 OK responseAck(evt); if (rootElement.getName().equals("Response")) {// !XmlUtil.isEmpty(result)) { @@ -648,8 +649,11 @@ Element recordListElement = rootElement.element("RecordList"); if (recordListElement == null || recordInfo.getSumNum() == 0) { logger.info("鏃犲綍鍍忔暟鎹�"); - // responseAck(evt); - // return; + RequestMessage msg = new RequestMessage(); + msg.setDeviceId(deviceId); + msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); + msg.setData(recordInfo); + deferredResultHolder.invokeResult(msg); } else { Iterator<Element> recordListIterator = recordListElement.elementIterator(); List<RecordItem> recordList = new ArrayList<RecordItem>(); @@ -679,44 +683,63 @@ record.setRecorderId(XmlUtil.getText(itemRecord, "RecorderID")); recordList.add(record); } - // recordList.sort(Comparator.naturalOrder()); recordInfo.setRecordList(recordList); } - // 瀛樺湪褰曞儚涓斿鏋滃綋鍓嶅綍鍍忔槑缁嗕釜鏁板皬浜庢�绘潯鏁帮紝璇存槑鎷嗗寘杩斿洖锛岄渶瑕佺粍瑁咃紝鏆備笉杩斿洖 - if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) { - // 涓洪槻姝㈣繛缁姹傝璁惧鐨勫綍鍍忔暟鎹紝杩斿洖鏁版嵁閿欎贡锛岀壒澧炲姞sn杩涜鍖哄垎 - String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; - - redis.set(cacheKey + "_" + uuid, recordList, 90); - List<Object> cacheKeys = redis.scan(cacheKey + "_*"); - List<RecordItem> totalRecordList = new ArrayList<RecordItem>(); - for (int i = 0; i < cacheKeys.size(); i++) { - totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString())); + // 鏀圭敤鍗曠嫭绾跨▼缁熻宸茶幏鍙栧綍鍍忔枃浠舵暟閲忥紝閬垮厤澶氬寘骞惰鍒嗗埆缁熻涓嶅畬鏁寸殑闂 + String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; + redis.set(cacheKey + "_" + uuid, recordList, 90); + if (!threadNameList.contains(cacheKey)) { + threadNameList.add(cacheKey); + CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo); + chk.setName(cacheKey); + chk.setDeferredResultHolder(deferredResultHolder); + chk.setRedis(redis); + chk.setLogger(logger); + chk.start(); + if (logger.isDebugEnabled()) { + logger.debug("Start Thread " + cacheKey + "."); } - if (totalRecordList.size() < recordInfo.getSumNum()) { - logger.info("宸茶幏鍙�" + totalRecordList.size() + "椤瑰綍鍍忔暟鎹紝鍏�" + recordInfo.getSumNum() + "椤�"); - return; - } - logger.info("褰曞儚鏁版嵁宸插叏閮ㄨ幏鍙栵紝鍏�" + recordInfo.getSumNum() + "椤�"); - recordInfo.setRecordList(totalRecordList); - for (int i = 0; i < cacheKeys.size(); i++) { - redis.del(cacheKeys.get(i).toString()); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Thread " + cacheKey + " already started."); } } - // 鑷劧椤哄簭鎺掑簭, 鍏冪礌杩涜鍗囧簭鎺掑垪 - recordInfo.getRecordList().sort(Comparator.naturalOrder()); + + // 瀛樺湪褰曞儚涓斿鏋滃綋鍓嶅綍鍍忔槑缁嗕釜鏁板皬浜庢�绘潯鏁帮紝璇存槑鎷嗗寘杩斿洖锛岄渶瑕佺粍瑁咃紝鏆備笉杩斿洖 + // if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) { + // // 涓洪槻姝㈣繛缁姹傝璁惧鐨勫綍鍍忔暟鎹紝杩斿洖鏁版嵁閿欎贡锛岀壒澧炲姞sn杩涜鍖哄垎 + // String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; + + // redis.set(cacheKey + "_" + uuid, recordList, 90); + // List<Object> cacheKeys = redis.scan(cacheKey + "_*"); + // List<RecordItem> totalRecordList = new ArrayList<RecordItem>(); + // for (int i = 0; i < cacheKeys.size(); i++) { + // totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString())); + // } + // if (totalRecordList.size() < recordInfo.getSumNum()) { + // logger.info("宸茶幏鍙�" + totalRecordList.size() + "椤瑰綍鍍忔暟鎹紝鍏�" + recordInfo.getSumNum() + "椤�"); + // return; + // } + // logger.info("褰曞儚鏁版嵁宸插叏閮ㄨ幏鍙栵紝鍏�" + recordInfo.getSumNum() + "椤�"); + // recordInfo.setRecordList(totalRecordList); + // for (int i = 0; i < cacheKeys.size(); i++) { + // redis.del(cacheKeys.get(i).toString()); + // } + // } + // // 鑷劧椤哄簭鎺掑簭, 鍏冪礌杩涜鍗囧簭鎺掑垪 + // recordInfo.getRecordList().sort(Comparator.naturalOrder()); } // 璧板埌杩欓噷锛屾湁浠ヤ笅鍙兘锛�1銆佹病鏈夊綍鍍忎俊鎭�,绗竴娆℃敹鍒皉ecordinfo鐨勬秷鎭嵆杩斿洖鍝嶅簲鏁版嵁锛屾棤redis鎿嶄綔 // 2銆佹湁褰曞儚鏁版嵁锛屼笖绗竴娆″嵆鏀跺埌瀹屾暣鏁版嵁锛岃繑鍥炲搷搴旀暟鎹紝鏃爎edis鎿嶄綔 // 3銆佹湁褰曞儚鏁版嵁锛屽湪瓒呮椂鏃堕棿鍐呮敹鍒板娆″寘缁勮鍚庢暟閲忚冻澶燂紝杩斿洖鏁版嵁 - RequestMessage msg = new RequestMessage(); - msg.setDeviceId(deviceId); - msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); - msg.setData(recordInfo); - deferredResultHolder.invokeResult(msg); - logger.info("澶勭悊瀹屾垚锛岃繑鍥炵粨鏋�"); + // RequestMessage msg = new RequestMessage(); + // msg.setDeviceId(deviceId); + // msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); + // msg.setData(recordInfo); + // deferredResultHolder.invokeResult(msg); + // logger.info("澶勭悊瀹屾垚锛岃繑鍥炵粨鏋�"); } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } @@ -799,4 +822,4 @@ public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } -} +} \ No newline at end of file -- Gitblit v1.8.0