src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -9,25 +8,27 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import java.util.Date; import java.time.Instant; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** * 动态定时任务 * @author lin */ @Component public class DynamicTask { private Logger logger = LoggerFactory.getLogger(DynamicTask.class); private final Logger logger = LoggerFactory.getLogger(DynamicTask.class); @Autowired private ThreadPoolTaskScheduler threadPoolTaskScheduler; private Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); private Map<String, Runnable> runnableMap = new ConcurrentHashMap<>(); private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>(); @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler() { @@ -47,7 +48,7 @@ * @return */ public void startCron(String key, Runnable task, int cycleForCatalog) { ScheduledFuture future = futureMap.get(key); ScheduledFuture<?> future = futureMap.get(key); if (future != null) { if (future.isCancelled()) { logger.debug("任务【{}】已存在但是关闭状态!!!", key); @@ -76,7 +77,9 @@ */ public void startDelay(String key, Runnable task, int delay) { stop(key); Date starTime = new Date(System.currentTimeMillis() + delay); // 获取执行的时刻 Instant startInstant = Instant.now().plusMillis(TimeUnit.MILLISECONDS.toMillis(delay)); ScheduledFuture future = futureMap.get(key); if (future != null) { @@ -88,7 +91,7 @@ } } // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 future = threadPoolTaskScheduler.schedule(task, starTime); future = threadPoolTaskScheduler.schedule(task, startInstant); if (future != null){ futureMap.put(key, future); runnableMap.put(key, task); src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java
@@ -27,8 +27,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.text.DecimalFormat; import java.util.Date; import java.time.Instant; import java.util.Random; import javax.sip.address.URI; @@ -90,17 +89,12 @@ * @return a generated nonce. */ private String generateNonce() { // Get the time of day and run MD5 over it. Date date = new Date(); long time = date.getTime(); long time = Instant.now().toEpochMilli(); Random rand = new Random(); long pad = rand.nextLong(); // String nonceString = (new Long(time)).toString() // + (new Long(pad)).toString(); String nonceString = Long.valueOf(time).toString() + Long.valueOf(pad).toString(); byte mdbytes[] = messageDigest.digest(nonceString.getBytes()); // Convert the mdbytes array into a hex string. return toHexString(mdbytes); } src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
@@ -1,13 +1,13 @@ package com.genersoft.iot.vmp.gb28181.bean; import java.util.Date; import java.time.Instant; import java.util.List; public class CatalogData { private int sn; // 命令序列号 private int total; private List<DeviceChannel> channelList; private Date lastTime; private Instant lastTime; private Device device; private String errorMsg; @@ -41,11 +41,11 @@ this.channelList = channelList; } public Date getLastTime() { public Instant getLastTime() { return lastTime; } public void setLastTime(Date lastTime) { public void setLastTime(Instant lastTime) { this.lastTime = lastTime; } src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java
@@ -1,6 +1,6 @@ package com.genersoft.iot.vmp.gb28181.bean; import java.util.Date; import java.time.Instant; import java.util.List; /** @@ -20,7 +20,7 @@ private int sumNum; private Date lastTime; private Instant lastTime; private List<RecordItem> recordList; @@ -72,11 +72,11 @@ this.sn = sn; } public Date getLastTime() { public Instant getLastTime() { return lastTime; } public void setLastTime(Date lastTime) { public void setLastTime(Instant lastTime) { this.lastTime = lastTime; } } src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java
@@ -5,7 +5,8 @@ import org.jetbrains.annotations.NotNull; import java.text.ParseException; import java.util.Date; import java.time.Instant; import java.time.temporal.TemporalAccessor; /** * @description:设备录像bean @@ -116,17 +117,17 @@ @Override public int compareTo(@NotNull RecordItem recordItem) { try { Date startTime_now = DateUtil.format.parse(startTime); Date startTime_param = DateUtil.format.parse(recordItem.getStartTime()); if (startTime_param.compareTo(startTime_now) > 0) { return -1; }else { return 1; } } catch (ParseException e) { e.printStackTrace(); TemporalAccessor startTimeNow = DateUtil.formatter.parse(startTime); TemporalAccessor startTimeParam = DateUtil.formatter.parse(recordItem.getStartTime()); Instant startTimeParamInstant = Instant.from(startTimeParam); Instant startTimeNowInstant = Instant.from(startTimeNow); if (startTimeNowInstant.equals(startTimeParamInstant)) { return 0; }else if (Instant.from(startTimeParam).isAfter(Instant.from(startTimeNow)) ) { return -1; }else { return 1; } return 0; } } src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
@@ -9,11 +9,14 @@ import javax.sip.*; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.util.Calendar; import java.util.Date; import java.time.Instant; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author lin */ @Component public class SipSubscribe { @@ -23,28 +26,25 @@ private Map<String, SipSubscribe.Event> okSubscribes = new ConcurrentHashMap<>(); private Map<String, Date> okTimeSubscribes = new ConcurrentHashMap<>(); private Map<String, Date> errorTimeSubscribes = new ConcurrentHashMap<>(); private Map<String, Instant> okTimeSubscribes = new ConcurrentHashMap<>(); private Map<String, Instant> errorTimeSubscribes = new ConcurrentHashMap<>(); // @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次 // @Scheduled(fixedRate= 100 * 60 * 60 ) // @Scheduled(fixedRate= 100 * 60 * 60 ) @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 public void execute(){ logger.info("[定时任务] 清理过期的SIP订阅信息"); Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date()); calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) - 5); Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5)); for (String key : okTimeSubscribes.keySet()) { if (okTimeSubscribes.get(key).before(calendar.getTime())){ // logger.info("[定时任务] 清理过期的订阅信息: {}", key); if (okTimeSubscribes.get(key).isBefore(instant)){ okSubscribes.remove(key); okTimeSubscribes.remove(key); } } for (String key : errorTimeSubscribes.keySet()) { if (errorTimeSubscribes.get(key).before(calendar.getTime())){ // logger.info("[定时任务] 清理过期的订阅信息: {}", key); if (errorTimeSubscribes.get(key).isBefore(instant)){ errorSubscribes.remove(key); errorTimeSubscribes.remove(key); } @@ -117,12 +117,12 @@ public void addErrorSubscribe(String key, SipSubscribe.Event event) { errorSubscribes.put(key, event); errorTimeSubscribes.put(key, new Date()); errorTimeSubscribes.put(key, Instant.now()); } public void addOkSubscribe(String key, SipSubscribe.Event event) { okSubscribes.put(key, event); okTimeSubscribes.put(key, new Date()); okTimeSubscribes.put(key, Instant.now()); } public SipSubscribe.Event getErrorSubscribe(String key) { src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
@@ -4,24 +4,20 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.time.Instant; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @Component public class CatalogDataCatch { public static Map<String, CatalogData> data = new ConcurrentHashMap<>(); @Autowired private DeferredResultHolder deferredResultHolder; @Autowired private IVideoManagerStorage storager; @@ -34,7 +30,7 @@ catalogData.setDevice(device); catalogData.setSn(sn); catalogData.setStatus(CatalogData.CatalogDataStatus.ready); catalogData.setLastTime(new Date(System.currentTimeMillis())); catalogData.setLastTime(Instant.now()); data.put(device.getDeviceId(), catalogData); } } @@ -48,7 +44,7 @@ catalogData.setDevice(device); catalogData.setChannelList(Collections.synchronizedList(new ArrayList<>())); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); catalogData.setLastTime(new Date(System.currentTimeMillis())); catalogData.setLastTime(Instant.now()); data.put(deviceId, catalogData); }else { // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 @@ -59,7 +55,7 @@ catalogData.setDevice(device); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); catalogData.getChannelList().addAll(deviceChannelList); catalogData.setLastTime(new Date(System.currentTimeMillis())); catalogData.setLastTime(Instant.now()); } } @@ -102,16 +98,13 @@ @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 private void timerTask(){ Set<String> keys = data.keySet(); Calendar calendarBefore5S = Calendar.getInstance(); calendarBefore5S.setTime(new Date()); calendarBefore5S.set(Calendar.SECOND, calendarBefore5S.get(Calendar.SECOND) - 5); Calendar calendarBefore30S = Calendar.getInstance(); calendarBefore30S.setTime(new Date()); calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30); Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30)); for (String deviceId : keys) { CatalogData catalogData = data.get(deviceId); if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 if ( catalogData.getLastTime().isBefore(instantBefore5S)) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); if (catalogData.getTotal() != catalogData.getChannelList().size()) { @@ -124,7 +117,7 @@ } catalogData.setStatus(CatalogData.CatalogDataStatus.end); } if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除 data.remove(deviceId); } } src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java
@@ -3,16 +3,15 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.time.Instant; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @author lin @@ -35,7 +34,7 @@ recordInfo.setSn(sn.trim()); recordInfo.setSumNum(sumNum); recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>())); recordInfo.setLastTime(new Date(System.currentTimeMillis())); recordInfo.setLastTime(Instant.now()); recordInfo.getRecordList().addAll(recordItems); data.put(key, recordInfo); }else { @@ -44,7 +43,7 @@ return 0; } recordInfo.getRecordList().addAll(recordItems); recordInfo.setLastTime(new Date(System.currentTimeMillis())); recordInfo.setLastTime(Instant.now()); } return recordInfo.getRecordList().size(); } @@ -52,14 +51,12 @@ @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 private void timerTask(){ Set<String> keys = data.keySet(); Calendar calendarBefore5S = Calendar.getInstance(); calendarBefore5S.setTime(new Date()); calendarBefore5S.set(Calendar.SECOND, calendarBefore5S.get(Calendar.SECOND) - 5); // 获取五秒前的时刻 Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5)); for (String key : keys) { RecordInfo recordInfo = data.get(key); // 超过五秒收不到消息任务超时, 只更新这一部分数据 if ( recordInfo.getLastTime().before(calendarBefore5S.getTime())) { if ( recordInfo.getLastTime().isBefore(instantBefore5S)) { // 处理录像数据, 返回给前端 String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn(); src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
@@ -40,7 +40,7 @@ import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; import java.util.Date; import java.time.Instant; import java.util.Vector; /** @@ -180,16 +180,16 @@ Long startTime = null; Long stopTime = null; Date start = null; Date end = null; Instant start = null; Instant end = null; if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0)); TimeField startTimeFiled = (TimeField)timeDescription.getTime(); startTime = startTimeFiled.getStartTime(); stopTime = startTimeFiled.getStopTime(); start = new Date(startTime*1000); end = new Date(stopTime*1000); start = Instant.ofEpochMilli(startTime*1000); end = Instant.ofEpochMilli(stopTime*1000); } // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); @@ -335,8 +335,8 @@ sendRtpItem.setStreamId(ssrcInfo.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.format.format(start), DateUtil.format.format(end), null, result -> { playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), DateUtil.formatter.format(end), null, result -> { if (result.getCode() != 0){ logger.warn("录像回放失败"); if (result.getEvent() != null) { src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
@@ -24,8 +24,6 @@ import javax.sip.header.ViaHeader; import javax.sip.message.Response; import java.text.ParseException; import java.util.Calendar; import java.util.Date; @Component public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
@@ -11,24 +11,18 @@ import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import javax.sip.DialogState; import javax.sip.TimeoutEvent; import java.text.ParseException; import java.util.Calendar; import java.util.Date; import java.time.Instant; import java.util.List; import java.util.concurrent.TimeUnit; /** * 设备业务(目录订阅) @@ -101,9 +95,7 @@ // 刷新过期任务 String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId(); dynamicTask.stop(registerExpireTaskKey); dynamicTask.startDelay(registerExpireTaskKey, ()->{ offline(device.getDeviceId()); }, device.getExpires() * 1000); dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId()), device.getExpires() * 1000); } @Override @@ -217,18 +209,9 @@ @Override public boolean expire(Device device) { Date registerTimeDate; try { registerTimeDate = DateUtil.format.parse(device.getRegisterTime()); } catch (ParseException e) { logger.error("设备时间格式化失败:{}->{} ", device.getDeviceId(), device.getRegisterTime() ); return false; } int expires = device.getExpires(); Calendar calendarForExpire = Calendar.getInstance(); calendarForExpire.setTime(registerTimeDate); calendarForExpire.set(Calendar.SECOND, calendarForExpire.get(Calendar.SECOND) + expires); return calendarForExpire.before(DateUtil.getNow()); Instant registerTimeDate = Instant.from(DateUtil.formatter.parse(device.getRegisterTime())); Instant expireInstant = registerTimeDate.plusMillis(TimeUnit.SECONDS.toMillis(device.getExpires())); return expireInstant.isBefore(Instant.now()); } @Override