From 764d04b497356ba6bcbb75fd42b51eca750f7223 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: 星期三, 29 五月 2024 15:02:51 +0800 Subject: [PATCH] 调整上级观看消息的发送 --- src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java | 218 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 196 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java old mode 100644 new mode 100755 index 229bc0d..75cfb12 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -1,20 +1,30 @@ package com.genersoft.iot.vmp.service.impl; -import com.genersoft.iot.vmp.common.StreamInfo; +import com.alibaba.fastjson2.JSONObject; +import com.baomidou.dynamic.datasource.annotation.DS; +import com.genersoft.iot.vmp.common.InviteInfo; +import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.service.IInviteStreamService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper; +import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo; +import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.util.ObjectUtils; import java.util.ArrayList; import java.util.HashMap; @@ -25,12 +35,16 @@ * @author lin */ @Service +@DS("master") public class DeviceChannelServiceImpl implements IDeviceChannelService { private final static Logger logger = LoggerFactory.getLogger(DeviceChannelServiceImpl.class); @Autowired - private IRedisCatchStorage redisCatchStorage; + private EventPublisher eventPublisher; + + @Autowired + private IInviteStreamService inviteStreamService; @Autowired private DeviceChannelMapper channelMapper; @@ -38,14 +52,21 @@ @Autowired private DeviceMapper deviceMapper; + @Autowired + private DeviceMobilePositionMapper deviceMobilePositionMapper; + + @Autowired + private UserSetting userSetting; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Override public DeviceChannel updateGps(DeviceChannel deviceChannel, Device device) { if (deviceChannel.getLongitude()*deviceChannel.getLatitude() > 0) { if (device == null) { device = deviceMapper.getDeviceByDeviceId(deviceChannel.getDeviceId()); } - - if ("WGS84".equals(device.getGeoCoordSys())) { deviceChannel.setLongitudeWgs84(deviceChannel.getLongitude()); @@ -78,9 +99,9 @@ public void updateChannel(String deviceId, DeviceChannel channel) { String channelId = channel.getChannelId(); channel.setDeviceId(deviceId); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - if (streamInfo != null) { - channel.setStreamId(streamInfo.getStream()); + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channelId); + if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { + channel.setStreamId(inviteInfo.getStreamInfo().getStream()); } String now = DateUtil.getNow(); channel.setUpdateTime(now); @@ -106,9 +127,9 @@ if (channelList.size() == 0) { for (DeviceChannel channel : channels) { channel.setDeviceId(deviceId); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId()); - if (streamInfo != null) { - channel.setStreamId(streamInfo.getStream()); + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channel.getChannelId()); + if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { + channel.setStreamId(inviteInfo.getStreamInfo().getStream()); } String now = DateUtil.getNow(); channel.setUpdateTime(now); @@ -122,9 +143,9 @@ } for (DeviceChannel channel : channels) { channel.setDeviceId(deviceId); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId()); - if (streamInfo != null) { - channel.setStreamId(streamInfo.getStream()); + InviteInfo inviteInfo = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, deviceId, channel.getChannelId()); + if (inviteInfo != null && inviteInfo.getStreamInfo() != null) { + channel.setStreamId(inviteInfo.getStreamInfo().getStream()); } String now = DateUtil.getNow(); channel.setUpdateTime(now); @@ -137,7 +158,7 @@ } } } - int limitCount = 300; + int limitCount = 50; if (addChannels.size() > 0) { if (addChannels.size() > limitCount) { for (int i = 0; i < addChannels.size(); i += limitCount) { @@ -169,8 +190,12 @@ } @Override - public ResourceBaceInfo getOverview() { - return channelMapper.getOverview(); + public ResourceBaseInfo getOverview() { + + int online = channelMapper.getOnlineCount(); + int total = channelMapper.getAllChannelCount(); + + return new ResourceBaseInfo(total, online); } @@ -191,7 +216,7 @@ deviceChannel.setUpdateTime(now); result.add(updateGps(deviceChannel, device)); }); - int limitCount = 300; + int limitCount = 50; if (result.size() > limitCount) { for (int i = 0; i < result.size(); i += limitCount) { int toIndex = i + limitCount; @@ -224,8 +249,24 @@ } @Override + public void online(DeviceChannel channel) { + channelMapper.online(channel.getDeviceId(), channel.getChannelId()); + } + + @Override public int channelsOffline(List<DeviceChannel> channels) { return channelMapper.batchOffline(channels); + } + + + @Override + public void offline(DeviceChannel channel) { + channelMapper.offline(channel.getDeviceId(), channel.getChannelId()); + } + + @Override + public void delete(DeviceChannel channel) { + channelMapper.del(channel.getDeviceId(), channel.getChannelId()); } @Override @@ -234,11 +275,23 @@ } @Override - public void batchUpdateChannel(List<DeviceChannel> channels) { - channelMapper.batchUpdate(channels); + public synchronized void batchUpdateChannel(List<DeviceChannel> channels) { + String now = DateUtil.getNow(); for (DeviceChannel channel : channels) { - if (channel.getParentId() != null) { - channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); + channel.setUpdateTime(now); + } + int limitCount = 1000; + if (!channels.isEmpty()) { + if (channels.size() > limitCount) { + for (int i = 0; i < channels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > channels.size()) { + toIndex = channels.size(); + } + channelMapper.batchUpdate(channels.subList(i, toIndex)); + } + }else { + channelMapper.batchUpdate(channels); } } } @@ -252,4 +305,125 @@ } } } + + @Override + public void updateChannelStreamIdentification(DeviceChannel channel) { + assert !ObjectUtils.isEmpty(channel.getDeviceId()); + assert !ObjectUtils.isEmpty(channel.getStreamIdentification()); + if (ObjectUtils.isEmpty(channel.getStreamIdentification())) { + logger.info("[閲嶇疆閫氶亾鐮佹祦绫诲瀷] 璁惧: {}, 鐮佹祦锛� {}", channel.getDeviceId(), channel.getStreamIdentification()); + }else { + logger.info("[鏇存柊閫氶亾鐮佹祦绫诲瀷] 璁惧: {}, 閫氶亾锛歿}锛� 鐮佹祦锛� {}", channel.getDeviceId(), channel.getChannelId(), + channel.getStreamIdentification()); + } + channelMapper.updateChannelStreamIdentification(channel); + } + + @Override + public List<DeviceChannel> queryChaneListByDeviceId(String deviceId) { + return channelMapper.queryAllChannels(deviceId); + } + + @Override + public void updateChannelGPS(Device device, DeviceChannel deviceChannel, MobilePosition mobilePosition) { + if (userSetting.getSavePositionHistory()) { + deviceMobilePositionMapper.insertNewPosition(mobilePosition); + } + + if (deviceChannel.getChannelId().equals(deviceChannel.getDeviceId())) { + deviceChannel.setChannelId(null); + } + if (deviceChannel.getGpsTime() == null) { + deviceChannel.setGpsTime(DateUtil.getNow()); + } + + int updated = channelMapper.updatePosition(deviceChannel); + if (updated == 0) { + return; + } + + List<DeviceChannel> deviceChannels = new ArrayList<>(); + if (deviceChannel.getChannelId() == null) { + // 鏈夌殑璁惧杩欓噷涓婃姤鐨刣eviceId涓庨�氶亾Id鏄竴鏍凤紝杩欑鎯呭喌鏇存柊璁惧涓嬬殑鍏ㄩ儴閫氶亾 + List<DeviceChannel> deviceChannelsInDb = queryChaneListByDeviceId(device.getDeviceId()); + deviceChannels.addAll(deviceChannelsInDb); + }else { + deviceChannels.add(deviceChannel); + } + if (deviceChannels.isEmpty()) { + return; + } + if (deviceChannels.size() > 100) { + logger.warn("[鏇存柊閫氶亾浣嶇疆淇℃伅鍚庡彂閫侀�氱煡] 璁惧鍙兘鏄钩鍙帮紝涓婃姤鐨勪綅缃俊鎭湭鏍囨槑閫氶亾缂栧彿锛�" + + "瀵艰嚧鎵�鏈夐�氶亾琚洿鏂颁綅缃紝 deviceId:{}", device.getDeviceId()); + } + for (DeviceChannel channel : deviceChannels) { + // 鍚戝叧鑱斾簡璇ラ�氶亾骞朵笖寮�鍚Щ鍔ㄤ綅缃闃呯殑涓婄骇骞冲彴鍙戦�佺Щ鍔ㄤ綅缃闃呮秷鎭� + mobilePosition.setChannelId(channel.getChannelId()); + try { + eventPublisher.mobilePositionEventPublish(mobilePosition); + }catch (Exception e) { + logger.error("[鍚戜笂绾ц浆鍙戠Щ鍔ㄤ綅缃け璐 ", e); + } + // 鍙戦�乺edis娑堟伅銆� 閫氱煡浣嶇疆淇℃伅鐨勫彉鍖� + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime())); + jsonObject.put("serial", mobilePosition.getDeviceId()); + jsonObject.put("code", mobilePosition.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); + } + } + + @Override + public void stopPlay(String deviceId, String channelId) { + channelMapper.stopPlay(deviceId, channelId); + } + + @Override + @Transactional + public void batchUpdateChannelGPS(List<DeviceChannel> channelList) { + for (DeviceChannel deviceChannel : channelList) { + deviceChannel.setUpdateTime(DateUtil.getNow()); + if (deviceChannel.getGpsTime() == null) { + deviceChannel.setGpsTime(DateUtil.getNow()); + } + } + int count = 1000; + if (channelList.size() > count) { + for (int i = 0; i < channelList.size(); i+=count) { + int toIndex = i+count; + if ( i + count > channelList.size()) { + toIndex = channelList.size(); + } + List<DeviceChannel> channels = channelList.subList(i, toIndex); + channelMapper.batchUpdatePosition(channels); + } + }else { + channelMapper.batchUpdatePosition(channelList); + } + } + + @Override + @Transactional + public void batchAddMobilePosition(List<MobilePosition> mobilePositions) { +// int count = 500; +// if (mobilePositions.size() > count) { +// for (int i = 0; i < mobilePositions.size(); i+=count) { +// int toIndex = i+count; +// if ( i + count > mobilePositions.size()) { +// toIndex = mobilePositions.size(); +// } +// List<MobilePosition> mobilePositionsSub = mobilePositions.subList(i, toIndex); +// deviceMobilePositionMapper.batchadd(mobilePositionsSub); +// } +// }else { +// deviceMobilePositionMapper.batchadd(mobilePositions); +// } + deviceMobilePositionMapper.batchadd(mobilePositions); + } } -- Gitblit v1.8.0