old mode 100644
new mode 100755
|  |  |  | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.gb28181.utils.SipUtils; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.service.IDeviceChannelService; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.storager.IRedisCatchStorage; | 
|---|
|  |  |  | import com.genersoft.iot.vmp.storager.IVideoManagerStorage; | 
|---|
|  |  |  | 
|---|
|  |  |  | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | 
|---|
|  |  |  | import org.springframework.stereotype.Component; | 
|---|
|  |  |  | import org.springframework.util.ObjectUtils; | 
|---|
|  |  |  | import org.springframework.util.StringUtils; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import javax.sip.InvalidArgumentException; | 
|---|
|  |  |  | import javax.sip.RequestEvent; | 
|---|
|  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private IDeviceChannelService deviceChannelService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private boolean taskQueueHandlerRun = false; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Qualifier("taskExecutor") | 
|---|
|  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void handForDevice(RequestEvent evt, Device device, Element rootElement) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | boolean isEmpty = taskQueue.isEmpty(); | 
|---|
|  |  |  | taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); | 
|---|
|  |  |  | if (!taskQueueHandlerRun) { | 
|---|
|  |  |  | taskQueueHandlerRun = true; | 
|---|
|  |  |  | // 回复200 OK | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | responseAck((SIPRequest) evt.getRequest(), Response.OK); | 
|---|
|  |  |  | } catch (SipException | InvalidArgumentException | ParseException e) { | 
|---|
|  |  |  | logger.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | if (isEmpty) { | 
|---|
|  |  |  | taskExecutor.execute(() -> { | 
|---|
|  |  |  | while (!taskQueue.isEmpty()) { | 
|---|
|  |  |  | SipMsgInfo sipMsgInfo = taskQueue.poll(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); | 
|---|
|  |  |  | if (rootElementAfterCharset == null) { | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest()); | 
|---|
|  |  |  | responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.BAD_REQUEST); | 
|---|
|  |  |  | } catch (SipException | InvalidArgumentException | ParseException e) { | 
|---|
|  |  |  | logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | logger.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); | 
|---|
|  |  |  | continue; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | MobilePosition mobilePosition = new MobilePosition(); | 
|---|
|  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); | 
|---|
|  |  |  | mobilePosition.setChannelId(getText(rootElementAfterCharset, "DeviceID")); | 
|---|
|  |  |  | mobilePosition.setTime(getText(rootElementAfterCharset, "Time")); | 
|---|
|  |  |  | String time = getText(rootElementAfterCharset, "Time"); | 
|---|
|  |  |  | if (ObjectUtils.isEmpty(time)){ | 
|---|
|  |  |  | mobilePosition.setTime(DateUtil.getNow()); | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | mobilePosition.setTime(SipUtils.parseTime(time)); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); | 
|---|
|  |  |  | mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); | 
|---|
|  |  |  | if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) { | 
|---|
|  |  |  | 
|---|
|  |  |  | storager.insertMobilePosition(mobilePosition); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | storager.updateChannelPosition(deviceChannel); | 
|---|
|  |  |  | //回复 200 OK | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK); | 
|---|
|  |  |  | } catch (SipException | InvalidArgumentException | ParseException e) { | 
|---|
|  |  |  | logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | // 发送redis消息。 通知位置信息的变化 | 
|---|
|  |  |  | JSONObject jsonObject = new JSONObject(); | 
|---|
|  |  |  | jsonObject.put("time", mobilePosition.getTime()); | 
|---|
|  |  |  | jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); | 
|---|
|  |  |  | jsonObject.put("serial", deviceChannel.getDeviceId()); | 
|---|
|  |  |  | jsonObject.put("code", deviceChannel.getChannelId()); | 
|---|
|  |  |  | jsonObject.put("longitude", mobilePosition.getLongitude()); | 
|---|
|  |  |  | 
|---|
|  |  |  | redisCatchStorage.sendMobilePositionMsg(jsonObject); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } catch (DocumentException e) { | 
|---|
|  |  |  | e.printStackTrace(); | 
|---|
|  |  |  | logger.error("未处理的异常 ", e); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | logger.warn("[移动位置通知] 发现未处理的异常, \r\n{}", evt.getRequest()); | 
|---|
|  |  |  | logger.error("[移动位置通知] 异常内容: ", e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | taskQueueHandlerRun = false; | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|