lawrencehj
2021-02-05 6868577e329dea7b48b33a2e5d662ad855edb341
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
@@ -23,6 +23,7 @@
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -43,13 +44,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
/**
 * @Description: Processor for SIP MESSAGE requests
 * @author: swwheihei
 * @date: 2020-05-03 17:32:41
 */
@SuppressWarnings(value={"unchecked", "rawtypes"})
public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
   public static volatile List<String> threadNameList = new ArrayList();
   private UserSetup userSetup = (UserSetup) SpringBeanFactory.getBean("userSetup");
@@ -240,10 +244,10 @@
      try {
         Element rootElement = getRootElement(evt);
         String deviceId = XmlUtil.getText(rootElement, "DeviceID");
         String result = XmlUtil.getText(rootElement, "Result");
         //String result = XmlUtil.getText(rootElement, "Result");
         // 回复200 OK
         responseAck(evt);
         if (!XmlUtil.isEmpty(result)) {
         if (rootElement.getName().equals("Response")) {//} !XmlUtil.isEmpty(result)) {
            // 此处是对本平台发出DeviceControl指令的应答
            JSONObject json = new JSONObject();
            XmlUtil.node2Json(rootElement, json);
@@ -272,10 +276,9 @@
      try {
         Element rootElement = getRootElement(evt);
         String deviceId = XmlUtil.getText(rootElement, "DeviceID");
         String result = XmlUtil.getText(rootElement, "Result");
         // 回复200 OK
         responseAck(evt);
         //if (!XmlUtil.isEmpty(result)) {
         if (rootElement.getName().equals("Response")) {
            // 此处是对本平台发出DeviceControl指令的应答
            JSONObject json = new JSONObject();
            XmlUtil.node2Json(rootElement, json);
@@ -287,9 +290,9 @@
            msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG);
            msg.setData(json);
            deferredResultHolder.invokeResult(msg);
         // } else {
         //    // 此处是上级发出的DeviceConfig指令
         //}
         } else {
            // 此处是上级发出的DeviceConfig指令
         }
      } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
         e.printStackTrace();
      }
@@ -304,10 +307,9 @@
      try {
         Element rootElement = getRootElement(evt);
         String deviceId = XmlUtil.getText(rootElement, "DeviceID");
         String result = XmlUtil.getText(rootElement, "Result");
         // 回复200 OK
         responseAck(evt);
         //if (!XmlUtil.isEmpty(result)) {
         if (rootElement.getName().equals("Response")) {
            // 此处是对本平台发出DeviceControl指令的应答
            JSONObject json = new JSONObject();
            XmlUtil.node2Json(rootElement, json);
@@ -319,9 +321,9 @@
            msg.setType(DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD);
            msg.setData(json);
            deferredResultHolder.invokeResult(msg);
         // } else {
         //    // 此处是上级发出的DeviceConfig指令
         //}
         } else {
            // 此处是上级发出的DeviceConfig指令
         }
      } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
         e.printStackTrace();
      }
@@ -336,7 +338,6 @@
      try {
         Element rootElement = getRootElement(evt);
         String deviceId = XmlUtil.getText(rootElement, "DeviceID");
         String result = XmlUtil.getText(rootElement, "Result");
         // 回复200 OK
         responseAck(evt);
         if (rootElement.getName().equals("Response")) {//   !XmlUtil.isEmpty(result)) {
@@ -648,8 +649,11 @@
         Element recordListElement = rootElement.element("RecordList");
         if (recordListElement == null || recordInfo.getSumNum() == 0) {
            logger.info("无录像数据");
            // responseAck(evt);
            // return;
            RequestMessage msg = new RequestMessage();
            msg.setDeviceId(deviceId);
            msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
            msg.setData(recordInfo);
            deferredResultHolder.invokeResult(msg);
         } else {
            Iterator<Element> recordListIterator = recordListElement.elementIterator();
            List<RecordItem> recordList = new ArrayList<RecordItem>();
@@ -679,44 +683,63 @@
                  record.setRecorderId(XmlUtil.getText(itemRecord, "RecorderID"));
                  recordList.add(record);
               }
               // recordList.sort(Comparator.naturalOrder());
               recordInfo.setRecordList(recordList);
            }
            // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回
            if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) {
               // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分
            // 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题
               String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
               redis.set(cacheKey + "_" + uuid, recordList, 90);
               List<Object> cacheKeys = redis.scan(cacheKey + "_*");
               List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
               for (int i = 0; i < cacheKeys.size(); i++) {
                  totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
            if (!threadNameList.contains(cacheKey)) {
               threadNameList.add(cacheKey);
               CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo);
               chk.setName(cacheKey);
               chk.setDeferredResultHolder(deferredResultHolder);
               chk.setRedis(redis);
               chk.setLogger(logger);
               chk.start();
               if (logger.isDebugEnabled()) {
                  logger.debug("Start Thread " + cacheKey + ".");
               }
               if (totalRecordList.size() < recordInfo.getSumNum()) {
                  logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项");
                  return;
               }
               logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项");
               recordInfo.setRecordList(totalRecordList);
               for (int i = 0; i < cacheKeys.size(); i++) {
                  redis.del(cacheKeys.get(i).toString());
            } else {
               if (logger.isDebugEnabled()) {
                  logger.debug("Thread " + cacheKey + " already started.");
               }
            }
            // 自然顺序排序, 元素进行升序排列
            recordInfo.getRecordList().sort(Comparator.naturalOrder());
            // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回
            // if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) {
            //    // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分
            //    String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
            //    redis.set(cacheKey + "_" + uuid, recordList, 90);
            //    List<Object> cacheKeys = redis.scan(cacheKey + "_*");
            //    List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
            //    for (int i = 0; i < cacheKeys.size(); i++) {
            //       totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
            //    }
            //    if (totalRecordList.size() < recordInfo.getSumNum()) {
            //       logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项");
            //       return;
            //    }
            //    logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项");
            //    recordInfo.setRecordList(totalRecordList);
            //    for (int i = 0; i < cacheKeys.size(); i++) {
            //       redis.del(cacheKeys.get(i).toString());
            //    }
            // }
            // // 自然顺序排序, 元素进行升序排列
            // recordInfo.getRecordList().sort(Comparator.naturalOrder());
         }
         // 走到这里,有以下可能:1、没有录像信息,第一次收到recordinfo的消息即返回响应数据,无redis操作
         // 2、有录像数据,且第一次即收到完整数据,返回响应数据,无redis操作
         // 3、有录像数据,在超时时间内收到多次包组装后数量足够,返回数据
         RequestMessage msg = new RequestMessage();
         msg.setDeviceId(deviceId);
         msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
         msg.setData(recordInfo);
         deferredResultHolder.invokeResult(msg);
         logger.info("处理完成,返回结果");
         // RequestMessage msg = new RequestMessage();
         // msg.setDeviceId(deviceId);
         // msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
         // msg.setData(recordInfo);
         // deferredResultHolder.invokeResult(msg);
         // logger.info("处理完成,返回结果");
      } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
         e.printStackTrace();
      }