|  |  |  | 
|---|
|  |  |  | import org.slf4j.Logger; | 
|---|
|  |  |  | import org.slf4j.LoggerFactory; | 
|---|
|  |  |  | import org.springframework.beans.factory.annotation.Autowired; | 
|---|
|  |  |  | import org.springframework.jdbc.datasource.DataSourceTransactionManager; | 
|---|
|  |  |  | import org.springframework.stereotype.Service; | 
|---|
|  |  |  | import org.springframework.transaction.TransactionDefinition; | 
|---|
|  |  |  | import org.springframework.transaction.TransactionStatus; | 
|---|
|  |  |  | import org.springframework.util.StringUtils; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import java.util.*; | 
|---|
|  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private IMediaServerService mediaServerService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | DataSourceTransactionManager dataSourceTransactionManager; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | TransactionDefinition transactionDefinition; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) { | 
|---|
|  |  |  | 
|---|
|  |  |  | stream.setStatus(true); | 
|---|
|  |  |  | stream.setCreateTime(DateUtil.getNow()); | 
|---|
|  |  |  | int add = gbStreamMapper.add(stream); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | // 查找开启了全部直播流共享的上级平台 | 
|---|
|  |  |  | List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); | 
|---|
|  |  |  | if (parentPlatforms.size() > 0) { | 
|---|
|  |  |  | for (ParentPlatform parentPlatform : parentPlatforms) { | 
|---|
|  |  |  | stream.setCatalogId(parentPlatform.getCatalogId()); | 
|---|
|  |  |  | stream.setPlatformId(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | String streamId = stream.getStream(); | 
|---|
|  |  |  | StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); | 
|---|
|  |  |  | if (streamProxyItem == null) { | 
|---|
|  |  |  | platformGbStreamMapper.add(stream); | 
|---|
|  |  |  | eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | if (!streamProxyItem.getGbId().equals(stream.getGbId())) { | 
|---|
|  |  |  | // 此流使用另一个国标Id已经与该平台关联,移除此记录 | 
|---|
|  |  |  | platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId()); | 
|---|
|  |  |  | platformGbStreamMapper.add(stream); | 
|---|
|  |  |  | eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | return add > 0; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | 
|---|
|  |  |  | public void batchAdd(List<StreamPushItem> streamPushItems) { | 
|---|
|  |  |  | streamPushMapper.addAll(streamPushItems); | 
|---|
|  |  |  | gbStreamMapper.batchAdd(streamPushItems); | 
|---|
|  |  |  | // 查找开启了全部直播流共享的上级平台 | 
|---|
|  |  |  | List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); | 
|---|
|  |  |  | if (parentPlatforms.size() > 0) { | 
|---|
|  |  |  | for (StreamPushItem stream : streamPushItems) { | 
|---|
|  |  |  | for (ParentPlatform parentPlatform : parentPlatforms) { | 
|---|
|  |  |  | stream.setCatalogId(parentPlatform.getCatalogId()); | 
|---|
|  |  |  | stream.setPlatformId(parentPlatform.getServerGBId()); | 
|---|
|  |  |  | String streamId = stream.getStream(); | 
|---|
|  |  |  | StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); | 
|---|
|  |  |  | if (streamProxyItem == null) { | 
|---|
|  |  |  | platformGbStreamMapper.add(stream); | 
|---|
|  |  |  | eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); | 
|---|
|  |  |  | }else { | 
|---|
|  |  |  | if (!streamProxyItem.getGbId().equals(stream.getGbId())) { | 
|---|
|  |  |  | // 此流使用另一个国标Id已经与该平台关联,移除此记录 | 
|---|
|  |  |  | platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId()); | 
|---|
|  |  |  | platformGbStreamMapper.add(stream); | 
|---|
|  |  |  | eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); | 
|---|
|  |  |  | stream.setGbId(streamProxyItem.getGbId()); | 
|---|
|  |  |  | eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.DEL); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | 
|---|
|  |  |  | if (onlinePushers.size() == 0) { | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | streamPushMapper.allStreamOffline(); | 
|---|
|  |  |  | streamPushMapper.setAllStreamOffline(); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | // 发送通知 | 
|---|
|  |  |  | eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); | 
|---|
|  |  |  | 
|---|
|  |  |  | // 发送通知 | 
|---|
|  |  |  | eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public boolean add(StreamPushItem stream) { | 
|---|
|  |  |  | stream.setUpdateTime(DateUtil.getNow()); | 
|---|
|  |  |  | stream.setCreateTime(DateUtil.getNow()); | 
|---|
|  |  |  | stream.setServerId(userSetting.getServerId()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | // 放在事务内执行 | 
|---|
|  |  |  | boolean result = false; | 
|---|
|  |  |  | TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | int addStreamResult = streamPushMapper.add(stream); | 
|---|
|  |  |  | if (!StringUtils.isEmpty(stream.getGbId())) { | 
|---|
|  |  |  | stream.setStreamType("push"); | 
|---|
|  |  |  | gbStreamMapper.add(stream); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | dataSourceTransactionManager.commit(transactionStatus); | 
|---|
|  |  |  | result = true; | 
|---|
|  |  |  | }catch (Exception e) { | 
|---|
|  |  |  | logger.error("批量移除流与平台的关系时错误", e); | 
|---|
|  |  |  | dataSourceTransactionManager.rollback(transactionStatus); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return result; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|