From 97b673d6ad6c2266a6e26eddf38dae23c00bb855 Mon Sep 17 00:00:00 2001 From: QingObject <1120359293@qq.com> Date: 星期五, 28 四月 2023 10:10:06 +0800 Subject: [PATCH] 新增JT1078 Template支持 --- src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java | 32 + src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java | 43 + src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java | 72 ++ src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java | 105 +++ src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java | 110 +++ src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java | 30 src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java | 56 + src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java | 50 + src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java | 112 +++ src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java | 44 + src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java | 27 src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java | 41 + src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java | 151 ++++ src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java | 27 src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java | 85 ++ src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java | 41 + src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java | 56 + src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java | 32 + src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java | 36 + src/main/resources/all-application.yml | 9 src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java | 46 + src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java | 146 ++++ src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java | 127 +++ src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java | 15 src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java | 76 ++ src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java | 114 +++ src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java | 33 + src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java | 112 +++ src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java | 40 + 29 files changed, 1,868 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java b/src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java new file mode 100644 index 0000000..d5c2de4 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java @@ -0,0 +1,15 @@ +package com.genersoft.iot.vmp.jt1078.annotation; + +import java.lang.annotation.*; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:31 + * @email qingtaij@163.com + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface MsgId { + String id(); +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java b/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java new file mode 100644 index 0000000..ad3ab00 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java @@ -0,0 +1,56 @@ +package com.genersoft.iot.vmp.jt1078.cmd; + +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd; +import com.genersoft.iot.vmp.jt1078.proc.response.J9101; +import com.genersoft.iot.vmp.jt1078.proc.response.J9102; +import com.genersoft.iot.vmp.jt1078.session.SessionManager; + +import java.util.Random; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:58 + * @email qingtaij@163.com + */ +public class JT1078Template { + + private final Random random = new Random(); + + /** + * 寮�鍚洿鎾棰� + * + * @param devId 璁惧鍙� + * @param j9101 寮�鍚棰戝弬鏁� + */ + public String startLive(String devId, J9101 j9101, Integer timeOut) { + Cmd cmd = new Cmd.Builder() + .setDevId(devId) + .setPackageNo(randomInt()) + .setMsgId("9101") + .setRespId("0001") + .setRs(j9101) + .build(); + return SessionManager.INSTANCE.request(cmd, timeOut); + } + + /** + * 鍏抽棴鐩存挱瑙嗛 + * + * @param devId 璁惧鍙� + * @param j9102 鍏抽棴瑙嗛鍙傛暟 + */ + public String stopLive(String devId, J9102 j9102, Integer timeOut) { + Cmd cmd = new Cmd.Builder() + .setDevId(devId) + .setPackageNo(randomInt()) + .setMsgId("9102") + .setRespId("0001") + .setRs(j9102) + .build(); + return SessionManager.INSTANCE.request(cmd, timeOut); + } + + private Long randomInt() { + return (long) random.nextInt(1000) + 1; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java new file mode 100644 index 0000000..4817c66 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java @@ -0,0 +1,146 @@ +package com.genersoft.iot.vmp.jt1078.codec.decode; + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory; +import com.genersoft.iot.vmp.jt1078.proc.request.Re; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:10 + * @email qingtaij@163.com + */ +public class Jt808Decoder extends ByteToMessageDecoder { + private final static Logger log = LoggerFactory.getLogger(Jt808Decoder.class); + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + Session session = ctx.channel().attr(Session.KEY).get(); + log.info("> {} hex:{}", session, ByteBufUtil.hexDump(in)); + + try { + ByteBuf buf = unEscapeAndCheck(in); + + Header header = new Header(); + header.setMsgId(ByteBufUtil.hexDump(buf.readSlice(2))); + header.setMsgPro(buf.readUnsignedShort()); + if (header.is2019Version()) { + header.setVersion(buf.readUnsignedByte()); + String devId = ByteBufUtil.hexDump(buf.readSlice(10)); + header.setDevId(devId.replaceFirst("^0*", "")); + } else { + header.setDevId(ByteBufUtil.hexDump(buf.readSlice(6)).replaceFirst("^0*", "")); + } + header.setSn(buf.readUnsignedShort()); + + Re handler = CodecFactory.getHandler(header.getMsgId()); + if (handler == null) { + log.error("get msgId is null {}", header.getMsgId()); + return; + } + Rs decode = handler.decode(buf, header, session); + if (decode != null) { + out.add(decode); + } + } finally { + in.skipBytes(in.readableBytes()); + } + + + } + + + /** + * 杞箟涓庨獙璇佹牎楠岀爜 + * + * @param byteBuf 杞箟Buf + * @return 杞箟濂界殑鏁版嵁 + */ + public ByteBuf unEscapeAndCheck(ByteBuf byteBuf) throws Exception { + int low = byteBuf.readerIndex(); + int high = byteBuf.writerIndex(); + byte checkSum = 0; + int calculationCheckSum = 0; + + byte aByte = byteBuf.getByte(high - 2); + byte protocolEscapeFlag7d = 0x7d; + //0x7d杞箟 + byte protocolEscapeFlag01 = 0x01; + //0x7e杞箟 + byte protocolEscapeFlag02 = 0x02; + if (aByte == protocolEscapeFlag7d) { + byte b2 = byteBuf.getByte(high - 1); + if (b2 == protocolEscapeFlag01) { + checkSum = protocolEscapeFlag7d; + } else if (b2 == protocolEscapeFlag02) { + checkSum = 0x7e; + } else { + log.error("杞箟1寮傚父:{}", ByteBufUtil.hexDump(byteBuf)); + throw new Exception("杞箟閿欒"); + } + high = high - 2; + } else { + high = high - 1; + checkSum = byteBuf.getByte(high); + } + List<ByteBuf> bufList = new ArrayList<>(); + int index = low; + while (index < high) { + byte b = byteBuf.getByte(index); + if (b == protocolEscapeFlag7d) { + byte c = byteBuf.getByte(index + 1); + if (c == protocolEscapeFlag01) { + ByteBuf slice = slice0x01(byteBuf, low, index); + bufList.add(slice); + b = protocolEscapeFlag7d; + } else if (c == protocolEscapeFlag02) { + ByteBuf slice = slice0x02(byteBuf, low, index); + bufList.add(slice); + b = 0x7e; + } else { + log.error("杞箟2寮傚父:{}", ByteBufUtil.hexDump(byteBuf)); + throw new Exception("杞箟閿欒"); + } + index += 2; + low = index; + } else { + index += 1; + } + calculationCheckSum = calculationCheckSum ^ b; + } + + if (calculationCheckSum == checkSum) { + if (bufList.size() == 0) { + return byteBuf.slice(low, high); + } else { + bufList.add(byteBuf.slice(low, high - low)); + return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, bufList.size(), bufList); + } + } else { + log.info("{} 瑙f瀽鏍¢獙鐮�:{}--璁$畻鏍¢獙鐮�:{}", ByteBufUtil.hexDump(byteBuf), checkSum, calculationCheckSum); + throw new Exception("鏍¢獙鐮侀敊璇�!"); + } + } + + + private ByteBuf slice0x01(ByteBuf buf, int low, int sign) { + return buf.slice(low, sign - low + 1); + } + + private ByteBuf slice0x02(ByteBuf buf, int low, int sign) { + buf.setByte(sign, 0x7e); + return buf.slice(low, sign - low + 1); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java new file mode 100644 index 0000000..afb1a79 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java @@ -0,0 +1,33 @@ +package com.genersoft.iot.vmp.jt1078.codec.encode; + + +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:10 + * @email qingtaij@163.com + */ +public class Jt808Encoder extends MessageToByteEncoder<Rs> { + private final static Logger log = LoggerFactory.getLogger(Jt808Encoder.class); + + @Override + protected void encode(ChannelHandlerContext ctx, Rs msg, ByteBuf out) throws Exception { + Session session = ctx.channel().attr(Session.KEY).get(); + + ByteBuf encode = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo()); + if(encode!=null){ + log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode)); + out.writeBytes(encode); + } + } + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java new file mode 100644 index 0000000..0e9e11f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java @@ -0,0 +1,151 @@ +package com.genersoft.iot.vmp.jt1078.codec.encode; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import com.genersoft.iot.vmp.jt1078.util.Bin; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.ByteProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +import java.util.LinkedList; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:25 + * @email qingtaij@163.com + */ +public class Jt808EncoderCmd extends MessageToByteEncoder<Cmd> { + private final static Logger log = LoggerFactory.getLogger(Jt808EncoderCmd.class); + + @Override + protected void encode(ChannelHandlerContext ctx, Cmd cmd, ByteBuf out) throws Exception { + Session session = ctx.channel().attr(Session.KEY).get(); + Rs msg = cmd.getRs(); + ByteBuf encode = encode(msg, session, cmd.getPackageNo().intValue()); + if (encode != null) { + log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode)); + out.writeBytes(encode); + } + } + + + public static ByteBuf encode(Rs msg, Session session, Integer packageNo) { + String id = msg.getClass().getAnnotation(MsgId.class).id(); + if (!StringUtils.hasLength(id)) { + log.error("Not find msgId"); + return null; + } + + ByteBuf byteBuf = Unpooled.buffer(); + + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(id)); + + ByteBuf encode = msg.encode(); + + Header header = msg.getHeader(); + if (header == null) { + header = session.getHeader(); + } + + if (header.is2019Version()) { + // 娑堟伅浣撳睘鎬� + byteBuf.writeShort(encode.readableBytes() | 1 << 14); + + // 鐗堟湰鍙� + byteBuf.writeByte(header.getVersion()); + + // 缁堢鎵嬫満鍙� + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 20))); + } else { + // 娑堟伅浣撳睘鎬� + byteBuf.writeShort(encode.readableBytes()); + + byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 12))); + } + + // 娑堟伅浣撴祦姘村彿 + byteBuf.writeShort(packageNo); + + // 鍐欏叆娑堟伅浣� + byteBuf.writeBytes(encode); + + // 璁$畻鏍¢獙鐮侊紝骞跺弽杞箟 + byteBuf = escapeAndCheck0(byteBuf); + return byteBuf; + } + + + private static final ByteProcessor searcher = value -> !(value == 0x7d || value == 0x7e); + + //杞箟涓庢牎楠� + public static ByteBuf escapeAndCheck0(ByteBuf source) { + + sign(source); + + int low = source.readerIndex(); + int high = source.writerIndex(); + + LinkedList<ByteBuf> bufList = new LinkedList<>(); + int mark, len; + while ((mark = source.forEachByte(low, high - low, searcher)) > 0) { + + len = mark + 1 - low; + ByteBuf[] slice = slice(source, low, len); + bufList.add(slice[0]); + bufList.add(slice[1]); + low += len; + } + + if (bufList.size() > 0) { + bufList.add(source.slice(low, high - low)); + } else { + bufList.add(source); + } + + ByteBuf delimiter = Unpooled.buffer(1, 1).writeByte(0x7e).retain(); + bufList.addFirst(delimiter); + bufList.addLast(delimiter); + + CompositeByteBuf byteBufLs = Unpooled.compositeBuffer(bufList.size()); + byteBufLs.addComponents(true, bufList); + return byteBufLs; + } + + public static void sign(ByteBuf buf) { + byte checkCode = bcc(buf); + buf.writeByte(checkCode); + } + + public static byte bcc(ByteBuf byteBuf) { + byte cs = 0; + while (byteBuf.isReadable()) + cs ^= byteBuf.readByte(); + byteBuf.resetReaderIndex(); + return cs; + } + + protected static ByteBuf[] slice(ByteBuf byteBuf, int index, int length) { + byte first = byteBuf.getByte(index + length - 1); + + ByteBuf[] byteBufList = new ByteBuf[2]; + byteBufList[0] = byteBuf.retainedSlice(index, length); + + if (first == 0x7d) { + byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x01); + } else { + byteBuf.setByte(index + length - 1, 0x7d); + byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x02); + } + return byteBufList; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java new file mode 100644 index 0000000..fd50302 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java @@ -0,0 +1,72 @@ +package com.genersoft.iot.vmp.jt1078.codec.netty; + +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import com.genersoft.iot.vmp.jt1078.session.SessionManager; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:14 + * @email qingtaij@163.com + */ +public class Jt808Handler extends ChannelInboundHandlerAdapter { + + private final static Logger log = LoggerFactory.getLogger(Jt808Handler.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Rs) { + ctx.writeAndFlush(msg); + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + Session session = SessionManager.INSTANCE.newSession(channel); + channel.attr(Session.KEY).set(session); + log.info("> Tcp connect {}", session); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + Session session = ctx.channel().attr(Session.KEY).get(); + log.info("< Tcp disconnect {}", session); + ctx.close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) { + Session session = ctx.channel().attr(Session.KEY).get(); + String message = e.getMessage(); + if (message.toLowerCase().contains("Connection reset by peer".toLowerCase())) { + log.info("< exception{} {}", session, e.getMessage()); + } else { + log.info("< exception{} {}", session, e.getMessage(), e); + } + + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + IdleState state = event.state(); + if (state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) { + Session session = ctx.channel().attr(Session.KEY).get(); + log.warn("< Proactively disconnect{}", session); + ctx.close(); + } + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java new file mode 100644 index 0000000..a7e4df8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java @@ -0,0 +1,112 @@ +package com.genersoft.iot.vmp.jt1078.codec.netty; + +import com.genersoft.iot.vmp.jt1078.codec.decode.Jt808Decoder; +import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808Encoder; +import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808EncoderCmd; +import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioChannelOption; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:01 + * @email qingtaij@163.com + */ + +public class TcpServer { + private final static Logger log = LoggerFactory.getLogger(TcpServer.class); + + private final Integer port; + private boolean isRunning = false; + private EventLoopGroup bossGroup = null; + private EventLoopGroup workerGroup = null; + + private final ByteBuf DECODER_JT808 = Unpooled.wrappedBuffer(new byte[]{0x7e}); + + public TcpServer(Integer port) { + this.port = port; + } + + private void startTcpServer() { + try { + CodecFactory.init(); + this.bossGroup = new NioEventLoopGroup(); + this.workerGroup = new NioEventLoopGroup(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.channel(NioServerSocketChannel.class); + bootstrap.group(bossGroup, workerGroup); + + bootstrap.option(NioChannelOption.SO_BACKLOG, 1024) + .option(NioChannelOption.SO_REUSEADDR, true) + .childOption(NioChannelOption.TCP_NODELAY, true) + .childHandler(new ChannelInitializer<NioSocketChannel>() { + @Override + public void initChannel(NioSocketChannel channel) { + channel.pipeline() + .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES)) + .addLast(new DelimiterBasedFrameDecoder(1024 * 2, DECODER_JT808)) + .addLast(new Jt808Decoder()) + .addLast(new Jt808Encoder()) + .addLast(new Jt808EncoderCmd()) + .addLast(new Jt808Handler()); + } + }); + ChannelFuture channelFuture = bootstrap.bind(port).sync(); + // 鐩戝惉璁惧TCP绔彛鏄惁鍚姩鎴愬姛 + channelFuture.addListener(future -> { + if (!future.isSuccess()) { + log.error("Binding port:{} fail! cause: {}", port, future.cause().getCause(), future.cause()); + } + }); + log.info("鏈嶅姟:JT808 Server 鍚姩鎴愬姛, port:{}", port); + channelFuture.channel().closeFuture().sync(); + } catch (Exception e) { + log.warn("鏈嶅姟:JT808 Server 鍚姩寮傚父, port:{},{}", port, e.getMessage(), e); + } finally { + stop(); + } + } + + /** + * 寮�鍚竴涓柊鐨勭嚎绋�,鎷夎捣鏉etty + */ + public synchronized void start() { + if (this.isRunning) { + log.warn("鏈嶅姟:JT808 Server 宸茬粡鍚姩, port:{}", port); + return; + } + this.isRunning = true; + new Thread(this::startTcpServer).start(); + } + + public synchronized void stop() { + if (!this.isRunning) { + log.warn("鏈嶅姟:JT808 Server 宸茬粡鍋滄, port:{}", port); + } + this.isRunning = false; + Future<?> future = this.bossGroup.shutdownGracefully(); + if (!future.isSuccess()) { + log.warn("bossGroup 鏃犳硶姝e父鍋滄", future.cause()); + } + future = this.workerGroup.shutdownGracefully(); + if (!future.isSuccess()) { + log.warn("workerGroup 鏃犳硶姝e父鍋滄", future.cause()); + } + log.warn("鏈嶅姟:JT808 Server 宸茬粡鍋滄, port:{}", port); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java new file mode 100644 index 0000000..cffb147 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java @@ -0,0 +1,46 @@ +package com.genersoft.iot.vmp.jt1078.config; + +import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; +import com.genersoft.iot.vmp.jt1078.proc.response.J9101; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +/** + * curl http://localhost:18080/api/jt1078/start/live/18864197066/1 + * + * @author QingtaiJiang + * @date 2023/4/27 18:12 + * @email qingtaij@163.com + */ +@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true") +@RestController +@RequestMapping("/api/jt1078") +public class JT1078Controller { + + @Resource + JT1078Template jt1078Template; + + @GetMapping("/start/live/{deviceId}/{channelId}") + public WVPResult<?> startLive(@PathVariable String deviceId, @PathVariable String channelId) { + J9101 j9101 = new J9101(); + j9101.setChannel(Integer.valueOf(channelId)); + j9101.setIp("192.168.1.1"); + j9101.setRate(1); + j9101.setTcpPort(7618); + j9101.setUdpPort(7618); + j9101.setType(0); + + String s = jt1078Template.startLive(deviceId, j9101, 6); + WVPResult<String> wvpResult = new WVPResult<>(); + wvpResult.setCode(200); + wvpResult.setData(String.format("http://192.168.1.1/rtp/%s_%s.live.mp4", deviceId, channelId)); + return wvpResult; + } +} + diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java b/src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java new file mode 100644 index 0000000..0b07bb4 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java @@ -0,0 +1,30 @@ +package com.genersoft.iot.vmp.jt1078.config; + +import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; +import com.genersoft.iot.vmp.jt1078.codec.netty.TcpServer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; + +/** + * @author QingtaiJiang + * @date 2023/4/27 19:35 + * @email qingtaij@163.com + */ +@Order(Integer.MIN_VALUE) +@Configuration +@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true") +public class TcpAutoConfiguration { + + @Bean(initMethod = "start", destroyMethod = "stop") + public TcpServer jt1078Server(@Value("${jt1078.port}") Integer port) { + return new TcpServer(port); + } + + @Bean + public JT1078Template jt1078Template() { + return new JT1078Template(); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java new file mode 100644 index 0000000..86c5fff --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java @@ -0,0 +1,76 @@ +package com.genersoft.iot.vmp.jt1078.proc; + +import com.genersoft.iot.vmp.jt1078.util.Bin; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:22 + * @email qingtaij@163.com + */ +public class Header { + // 娑堟伅ID + String msgId; + + // 娑堟伅浣撳睘鎬� + Integer msgPro; + + // 鏍囪瘑 + String devId; + + // 娑堟伅浣撴祦姘村彿 + Integer sn; + + // 鍗忚鐗堟湰鍙� + Short version = -1; + + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public Integer getMsgPro() { + return msgPro; + } + + public void setMsgPro(Integer msgPro) { + this.msgPro = msgPro; + } + + public String getDevId() { + return devId; + } + + public void setDevId(String devId) { + this.devId = devId; + } + + public Integer getSn() { + return sn; + } + + public void setSn(Integer sn) { + this.sn = sn; + } + + public Short getVersion() { + return version; + } + + public void setVersion(Short version) { + this.version = version; + } + + /** + * 鍒ゆ柇鏄惁鏄�2019鐨勭増鏈� + * + * @return true 2019鍚庣殑鐗堟湰銆俧alse 2013 + */ + public boolean is2019Version() { + return Bin.get(msgPro, 14); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java new file mode 100644 index 0000000..19d6d8f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java @@ -0,0 +1,105 @@ +package com.genersoft.iot.vmp.jt1078.proc.entity; + +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:23 + * @email qingtaij@163.com + */ +public class Cmd { + String devId; + Long packageNo; + String msgId; + String respId; + Rs rs; + + public Cmd() { + } + + public Cmd(Builder builder) { + this.devId = builder.devId; + this.packageNo = builder.packageNo; + this.msgId = builder.msgId; + this.respId = builder.respId; + this.rs = builder.rs; + } + + public String getDevId() { + return devId; + } + + public void setDevId(String devId) { + this.devId = devId; + } + + public Long getPackageNo() { + return packageNo; + } + + public void setPackageNo(Long packageNo) { + this.packageNo = packageNo; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public String getRespId() { + return respId; + } + + public void setRespId(String respId) { + this.respId = respId; + } + + public Rs getRs() { + return rs; + } + + public void setRs(Rs rs) { + this.rs = rs; + } + + public static class Builder { + String devId; + Long packageNo; + String msgId; + String respId; + Rs rs; + + public Builder setDevId(String devId) { + this.devId = devId.replaceFirst("^0*", ""); + return this; + } + + public Builder setPackageNo(Long packageNo) { + this.packageNo = packageNo; + return this; + } + + public Builder setMsgId(String msgId) { + this.msgId = msgId; + return this; + } + + public Builder setRespId(String respId) { + this.respId = respId; + return this; + } + + public Builder setRs(Rs re) { + this.rs = re; + return this; + } + + public Cmd build() { + return new Cmd(this); + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java new file mode 100644 index 0000000..45d5fc7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java @@ -0,0 +1,44 @@ +package com.genersoft.iot.vmp.jt1078.proc.factory; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.request.Re; +import com.genersoft.iot.vmp.jt1078.util.ClassUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:29 + * @email qingtaij@163.com + */ + +public class CodecFactory { + private final static Logger log = LoggerFactory.getLogger(CodecFactory.class); + + private static Map<String, Class<?>> protocolHash; + + public static void init() { + protocolHash = new HashMap<>(); + List<Class<?>> classList = ClassUtil.getClassList("com.genersoft.iot.vmp.jt1078.proc", MsgId.class); + for (Class<?> handlerClass : classList) { + String id = handlerClass.getAnnotation(MsgId.class).id(); + protocolHash.put(id, handlerClass); + } + if (log.isDebugEnabled()) { + log.debug("娑堟伅ID缂撳瓨琛� protocolHash:{}", protocolHash); + } + } + + public static Re getHandler(String msgId) { + Class<?> aClass = protocolHash.get(msgId); + Object bean = ClassUtil.getBean(aClass); + if (bean instanceof Re) { + return (Re) bean; + } + return null; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java new file mode 100644 index 0000000..1d7f85d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java @@ -0,0 +1,50 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.alibaba.fastjson2.JSON; +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import com.genersoft.iot.vmp.jt1078.session.SessionManager; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; + +/** + * 缁堢閫氱敤搴旂瓟 + * + * @author QingtaiJiang + * @date 2023/4/27 18:04 + * @email qingtaij@163.com + */ +@MsgId(id = "0001") +public class J0001 extends Re { + int respNo; + String respId; + int result; + + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + respNo = buf.readUnsignedShort(); + respId = ByteBufUtil.hexDump(buf.readSlice(2)); + result = buf.readUnsignedByte(); + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + SessionManager.INSTANCE.response(header.getDevId(), "0001", (long) respNo, JSON.toJSONString(this)); + return null; + } + + public int getRespNo() { + return respNo; + } + + public String getRespId() { + return respId; + } + + public int getResult() { + return result; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java new file mode 100644 index 0000000..f52303a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java @@ -0,0 +1,32 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.J8001; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 缁堢蹇冭烦 + * + * @author QingtaiJiang + * @date 2023/4/27 18:04 + * @email qingtaij@163.com + */ +@MsgId(id = "0002") +public class J0002 extends Re { + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + J8001 j8001 = new J8001(); + j8001.setRespNo(header.getSn()); + j8001.setRespId(header.getMsgId()); + j8001.setResult(J8001.SUCCESS); + return j8001; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java new file mode 100644 index 0000000..0f00a80 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java @@ -0,0 +1,27 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 鏌ヨ鏈嶅姟鍣ㄦ椂闂� + * + * @author QingtaiJiang + * @date 2023/4/27 18:06 + * @email qingtaij@163.com + */ +@MsgId(id = "0004") +public class J0004 extends Re { + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + return null; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java new file mode 100644 index 0000000..a731dda --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java @@ -0,0 +1,56 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.J8100; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 缁堢娉ㄥ唽 + * + * @author QingtaiJiang + * @date 2023/4/27 18:06 + * @email qingtaij@163.com + */ +@MsgId(id = "0100") +public class J0100 extends Re { + + private int provinceId; + + private int cityId; + + private String makerId; + + private String deviceModel; + + private String deviceId; + + private int plateColor; + + private String plateNo; + + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + Short version = header.getVersion(); + provinceId = buf.readUnsignedShort(); + if (version > 1) { + cityId = buf.readUnsignedShort(); + // decode as 2019 + } else { + int i = buf.readUnsignedShort(); + // decode as 2013 + } + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + J8100 j8100 = new J8100(); + j8100.setRespNo(header.getSn()); + j8100.setResult(J8100.SUCCESS); + j8100.setCode("WVP_YYDS"); + return j8100; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java new file mode 100644 index 0000000..8e531ae --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java @@ -0,0 +1,36 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.J8001; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 缁堢閴存潈 + * + * @author QingtaiJiang + * @date 2023/4/27 18:06 + * @email qingtaij@163.com + */ +@MsgId(id = "0102") +public class J0102 extends Re { + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + int lenCode = buf.readUnsignedByte(); +// String code = buf.readCharSequence(lenCode, CharsetUtil.UTF_8).toString(); + // if 2019 to decode next + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + J8001 j8001 = new J8001(); + j8001.setRespNo(header.getSn()); + j8001.setRespId(header.getMsgId()); + j8001.setResult(J8001.SUCCESS); + return j8001; + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java new file mode 100644 index 0000000..d027dd2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java @@ -0,0 +1,32 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.J8001; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; + +/** + * 瀹炴椂娑堟伅涓婃姤 + * + * @author QingtaiJiang + * @date 2023/4/27 18:06 + * @email qingtaij@163.com + */ +@MsgId(id = "0200") +public class J0200 extends Re { + @Override + protected Rs decode0(ByteBuf buf, Header header, Session session) { + return null; + } + + @Override + protected Rs handler(Header header, Session session) { + J8001 j8001 = new J8001(); + j8001.setRespNo(header.getSn()); + j8001.setRespId(header.getMsgId()); + j8001.setResult(J8001.SUCCESS); + return j8001; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java new file mode 100644 index 0000000..0a24ad2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java @@ -0,0 +1,40 @@ +package com.genersoft.iot.vmp.jt1078.proc.request; + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import com.genersoft.iot.vmp.jt1078.proc.response.Rs; +import com.genersoft.iot.vmp.jt1078.session.Session; +import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:50 + * @email qingtaij@163.com + */ +public abstract class Re { + private final static Logger log = LoggerFactory.getLogger(Re.class); + + protected abstract Rs decode0(ByteBuf buf, Header header, Session session); + + protected abstract Rs handler(Header header, Session session); + + public Rs decode(ByteBuf buf, Header header, Session session) { + if (session != null && !StringUtils.hasLength(session.getDevId())) { + session.register(header.getDevId(), (int) header.getVersion(), header); + } + Rs rs = decode0(buf, header, session); + Rs rsHand = handler(header, session); + if (rs == null && rsHand != null) { + rs = rsHand; + } else if (rs != null && rsHand != null) { + log.warn("decode0:{} 涓� handler:{} 杩斿洖鍊煎啿绐�,閲囩敤decode0杩斿洖鍊�", rs, rsHand); + } + if (rs != null) { + rs.setHeader(header); + } + + return rs; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java new file mode 100644 index 0000000..ec9e31f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:48 + * @email qingtaij@163.com + */ +@MsgId(id = "8001") +public class J8001 extends Rs { + public static final Integer SUCCESS = 0; + + Integer respNo; + String respId; + Integer result; + + @Override + public ByteBuf encode() { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeShort(respNo); + buffer.writeBytes(ByteBufUtil.decodeHexDump(respId)); + buffer.writeByte(result); + + return buffer; + } + + + public void setRespNo(Integer respNo) { + this.respNo = respNo; + } + + public void setRespId(String respId) { + this.respId = respId; + } + + public void setResult(Integer result) { + this.result = result; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java new file mode 100644 index 0000000..48a9c95 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java @@ -0,0 +1,41 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:40 + * @email qingtaij@163.com + */ +@MsgId(id = "8100") +public class J8100 extends Rs { + public static final Integer SUCCESS = 0; + + Integer respNo; + Integer result; + String code; + + @Override + public ByteBuf encode() { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeShort(respNo); + buffer.writeByte(result); + buffer.writeCharSequence(code, CharsetUtil.UTF_8); + return buffer; + } + + public void setRespNo(Integer respNo) { + this.respNo = respNo; + } + + public void setResult(Integer result) { + this.result = result; + } + + public void setCode(String code) { + this.code = code; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java new file mode 100644 index 0000000..d671372 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java @@ -0,0 +1,110 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + +import com.genersoft.iot.vmp.jt1078.annotation.MsgId; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:25 + * @email qingtaij@163.com + */ +@MsgId(id = "9101") +public class J9101 extends Rs { + String ip; + + // TCP绔彛 + Integer tcpPort; + + // UDP绔彛 + Integer udpPort; + + // 閫昏緫閫氶亾鍙� + Integer channel; + + // 鏁版嵁绫诲瀷 + /** + * 0锛氶煶瑙嗛锛�1锛氳棰戯紝2锛氬弻鍚戝璁诧紝3锛氱洃鍚紝4锛氫腑蹇冨箍鎾紝5锛氶�忎紶 + */ + Integer type; + + // 鐮佹祦绫诲瀷 + /** + * 0锛氫富鐮佹祦锛�1锛氬瓙鐮佹祦 + */ + Integer rate; + + @Override + public ByteBuf encode() { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeByte(ip.getBytes().length); + buffer.writeCharSequence(ip, CharsetUtil.UTF_8); + buffer.writeShort(tcpPort); + buffer.writeShort(udpPort); + buffer.writeByte(channel); + buffer.writeByte(type); + buffer.writeByte(rate); + return buffer; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public Integer getTcpPort() { + return tcpPort; + } + + public void setTcpPort(Integer tcpPort) { + this.tcpPort = tcpPort; + } + + public Integer getUdpPort() { + return udpPort; + } + + public void setUdpPort(Integer udpPort) { + this.udpPort = udpPort; + } + + public Integer getChannel() { + return channel; + } + + public void setChannel(Integer channel) { + this.channel = channel; + } + + public Integer getType() { + return type; + } + + public void setType(Integer type) { + this.type = type; + } + + public Integer getRate() { + return rate; + } + + public void setRate(Integer rate) { + this.rate = rate; + } + + @Override + public String toString() { + return "J9101{" + + "ip='" + ip + '\'' + + ", tcpPort=" + tcpPort + + ", udpPort=" + udpPort + + ", channel=" + channel + + ", type=" + type + + ", rate=" + rate + + '}'; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java new file mode 100644 index 0000000..f92fe8e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java @@ -0,0 +1,85 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:49 + * @email qingtaij@163.com + */ +public class J9102 extends Rs { + + // 閫氶亾鍙� + Integer channel; + + // 鎺у埗鎸囦护 + /** + * 0锛氬叧闂煶瑙嗛浼犺緭鎸囦护锛� + * 1锛氬垏鎹㈢爜娴�(澧炲姞鏆傚仠鍜岀户缁�)锛� + * 2锛氭殏鍋滆閫氶亾鎵�鏈夋祦鐨勫彂閫侊紱 + * 3锛氭仮澶嶆殏鍋滃墠娴佺殑鍙戦�侊紝涓庢殏鍋滃墠鐨勬祦绫诲瀷涓�鑷达紱 + * 4锛氬叧闂弻鍚戝璁� + */ + Integer command; + + // 鏁版嵁绫诲瀷 + /** + * 0锛氬叧闂閫氶亾鏈夊叧鐨勯煶瑙嗛鏁版嵁锛� + * 1锛氬彧鍏抽棴璇ラ�氶亾鏈夊叧鐨勯煶棰戯紝淇濈暀璇ラ�氶亾 + * 鏈夊叧鐨勮棰戯紱 + * 2锛氬彧鍏抽棴璇ラ�氶亾鏈夊叧鐨勮棰戯紝淇濈暀璇ラ�氶亾 + * 鏈夊叧鐨勯煶棰� + */ + Integer closeType; + + // 鏁版嵁绫诲瀷 + /** + * 0锛氫富鐮佹祦锛� + * 1锛氬瓙鐮佹祦 + */ + Integer streamType; + + @Override + public ByteBuf encode() { + ByteBuf buffer = Unpooled.buffer(); + buffer.writeByte(channel); + buffer.writeByte(command); + buffer.writeByte(closeType); + buffer.writeByte(streamType); + return null; + } + + + public Integer getChannel() { + return channel; + } + + public void setChannel(Integer channel) { + this.channel = channel; + } + + public Integer getCommand() { + return command; + } + + public void setCommand(Integer command) { + this.command = command; + } + + public Integer getCloseType() { + return closeType; + } + + public void setCloseType(Integer closeType) { + this.closeType = closeType; + } + + public Integer getStreamType() { + return streamType; + } + + public void setStreamType(Integer streamType) { + this.streamType = streamType; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java new file mode 100644 index 0000000..243cd94 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java @@ -0,0 +1,27 @@ +package com.genersoft.iot.vmp.jt1078.proc.response; + + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import io.netty.buffer.ByteBuf; + + +/** + * @author QingtaiJiang + * @date 2021/8/30 18:54 + * @email qingtaij@163.com + */ + +public abstract class Rs { + private Header header; + + public abstract ByteBuf encode(); + + + public Header getHeader() { + return header; + } + + public void setHeader(Header header) { + this.header = header; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java new file mode 100644 index 0000000..f7df8de --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java @@ -0,0 +1,114 @@ +package com.genersoft.iot.vmp.jt1078.session; + +import com.genersoft.iot.vmp.jt1078.proc.Header; +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author QingtaiJiang + * @date 2023/4/27 18:54 + * @email qingtaij@163.com + */ +public class Session { + private final static Logger log = LoggerFactory.getLogger(Session.class); + + public static final AttributeKey<Session> KEY = AttributeKey.newInstance(Session.class.getName()); + + // Netty鐨刢hannel + protected final Channel channel; + + // 鍘熷瓙绫荤殑鑷ID + private final AtomicInteger serialNo = new AtomicInteger(0); + + // 鏄惁娉ㄥ唽鎴愬姛 + private boolean registered = false; + + // 璁惧ID + private String devId; + + // 鍒涘缓鏃堕棿 + private final long creationTime; + + // 鍗忚鐗堟湰鍙� + private Integer protocolVersion; + + private Header header; + + protected Session(Channel channel) { + this.channel = channel; + this.creationTime = System.currentTimeMillis(); + } + + public void writeObject(Object message) { + log.info("<<<<<<<<<< cmd{},{}", this, message); + channel.writeAndFlush(message); + } + + /** + * 鑾峰緱涓嬩竴涓祦姘村彿 + * + * @return 娴佹按鍙� + */ + public int nextSerialNo() { + int current; + int next; + do { + current = serialNo.get(); + next = current > 0xffff ? 0 : current; + } while (!serialNo.compareAndSet(current, next + 1)); + return next; + } + + /** + * 娉ㄥ唽session + * + * @param devId 璁惧ID + */ + public void register(String devId, Integer version, Header header) { + this.devId = devId; + this.registered = true; + this.protocolVersion = version; + this.header = header; + SessionManager.INSTANCE.put(devId, this); + } + + /** + * 鑾峰彇璁惧鍙� + * + * @return 璁惧鍙� + */ + public String getDevId() { + return devId; + } + + + public boolean isRegistered() { + return registered; + } + + public long getCreationTime() { + return creationTime; + } + + public Integer getProtocolVersion() { + return protocolVersion; + } + + public Header getHeader() { + return header; + } + + @Override + public String toString() { + return "[" + + "devId=" + devId + + ", reg=" + registered + + ", version=" + protocolVersion + + ",ip=" + channel.remoteAddress() + + ']'; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java b/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java new file mode 100644 index 0000000..9347249 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java @@ -0,0 +1,127 @@ +package com.genersoft.iot.vmp.jt1078.session; + +import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd; +import io.netty.channel.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; + + +/** + * @author QingtaiJiang + * @date 2023/4/27 19:54 + * @email qingtaij@163.com + */ +public enum SessionManager { + INSTANCE; + private final static Logger log = LoggerFactory.getLogger(SessionManager.class); + + // 鐢ㄤ笌娑堟伅鐨勭紦瀛� + private final Map<String, SynchronousQueue<String>> topicSubscribers = new ConcurrentHashMap<>(); + + // session鐨勭紦瀛� + private final Map<Object, Session> sessionMap; + + SessionManager() { + this.sessionMap = new ConcurrentHashMap<>(); + } + + /** + * 鍒涘缓鏂扮殑Session + * + * @param channel netty閫氶亾 + * @return 鍒涘缓鐨剆ession瀵硅薄 + */ + public Session newSession(Channel channel) { + return new Session(channel); + } + + + /** + * 鑾峰彇鎸囧畾璁惧鐨凷ession + * + * @param clientId 璁惧Id + * @return Session + */ + public Session get(Object clientId) { + return sessionMap.get(clientId); + } + + /** + * 鏀惧叆鏂拌澶囪繛鎺ョ殑session + * + * @param clientId 璁惧ID + * @param newSession session + */ + protected void put(Object clientId, Session newSession) { + sessionMap.put(clientId, newSession); + } + + + /** + * 鍙戦�佸悓姝ユ秷鎭紝鎺ユ敹鍝嶅簲 + * 榛樿瓒呮椂鏃堕棿6绉� + */ + public String request(Cmd cmd) { + // 榛樿6绉� + int timeOut = 6000; + return request(cmd, timeOut); + } + + public String request(Cmd cmd, Integer timeOut) { + Session session = this.get(cmd.getDevId()); + if (session == null) { + log.error("DevId: {} not online!", cmd.getDevId()); + return "-1"; + } + String requestKey = requestKey(cmd.getDevId(), cmd.getRespId(), cmd.getPackageNo()); + SynchronousQueue<String> subscribe = subscribe(requestKey); + if (subscribe == null) { + log.error("DevId: {} key:{} send repaid", cmd.getDevId(), requestKey); + return "-1"; + } + session.writeObject(cmd); + try { + return subscribe.poll(timeOut, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("<<<<<<<<<< timeout" + session, e); + } finally { + this.unsubscribe(requestKey); + } + return null; + } + + public Boolean response(String devId, String respId, Long responseNo, String data) { + String requestKey = requestKey(devId, respId, responseNo); + SynchronousQueue<String> queue = topicSubscribers.get(requestKey); + if (queue != null) { + try { + return queue.offer(data, 2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("{}", e.getMessage(), e); + } + } + log.warn("鏈壘鍒板搴斿洖澶嶆寚浠�,key:{} 娑堟伅:{} ", requestKey, data); + return false; + } + + private void unsubscribe(String key) { + topicSubscribers.remove(key); + } + + private SynchronousQueue<String> subscribe(String key) { + SynchronousQueue<String> queue = null; + if (!topicSubscribers.containsKey(key)) + topicSubscribers.put(key, queue = new SynchronousQueue<String>()); + return queue; + } + + private String requestKey(String devId, String respId, Long requestNo) { + return String.join("_", devId.replaceFirst("^0*", ""), respId, requestNo.toString()); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java b/src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java new file mode 100644 index 0000000..31f8b93 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java @@ -0,0 +1,41 @@ +package com.genersoft.iot.vmp.jt1078.util; + +/** + * 32浣嶆暣鍨嬬殑浜岃繘鍒惰鍐� + */ +public class Bin { + + private static final int[] bits = new int[32]; + + static { + bits[0] = 1; + for (int i = 1; i < bits.length; i++) { + bits[i] = bits[i - 1] << 1; + } + } + + /** + * 璇诲彇n鐨勭i浣� + * + * @param n int32 + * @param i 鍙栧�艰寖鍥�0-31 + */ + public static boolean get(int n, int i) { + return (n & bits[i]) == bits[i]; + } + + /** + * 涓嶈冻浣嶆暟浠庡乏杈瑰姞0 + */ + public static String strHexPaddingLeft(String data, int length) { + int dataLength = data.length(); + if (dataLength < length) { + StringBuilder dataBuilder = new StringBuilder(data); + for (int i = dataLength; i < length; i++) { + dataBuilder.insert(0, "0"); + } + data = dataBuilder.toString(); + } + return data; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java b/src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java new file mode 100644 index 0000000..3dcb1b8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java @@ -0,0 +1,112 @@ +package com.genersoft.iot.vmp.jt1078.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.core.io.support.ResourcePatternResolver; + +import java.lang.annotation.Annotation; +import java.util.LinkedList; +import java.util.List; + +public class ClassUtil { + + private static final Logger logger = LoggerFactory.getLogger(ClassUtil.class); + + + public static Object getBean(Class<?> clazz) { + if (clazz != null) { + try { + return clazz.getDeclaredConstructor().newInstance(); + } catch (Exception ex) { + logger.error("ClassUtil:鎵句笉鍒版寚瀹氱殑绫�", ex); + } + } + return null; + } + + + public static Object getBean(String className) { + Class<?> clazz = null; + try { + clazz = Class.forName(className); + } catch (Exception ex) { + logger.error("ClassUtil:鎵句笉鍒版寚瀹氱殑绫�"); + } + if (clazz != null) { + try { + //鑾峰彇澹版槑鐨勬瀯閫犲櫒--銆嬪垱寤哄疄渚� + return clazz.getDeclaredConstructor().newInstance(); + } catch (Exception ex) { + logger.error("ClassUtil:鎵句笉鍒版寚瀹氱殑绫�", ex); + } + } + return null; + } + + + /** + * 鑾峰彇鍖呬笅鎵�鏈夊甫娉ㄨВ鐨刢lass + * + * @param packageName 鍖呭悕 + * @param annotationClass 娉ㄨВ绫诲瀷 + * @return list + */ + public static List<Class<?>> getClassList(String packageName, Class<? extends Annotation> annotationClass) { + List<Class<?>> classList = getClassList(packageName); + classList.removeIf(next -> !next.isAnnotationPresent(annotationClass)); + return classList; + } + + public static List<Class<?>> getClassList(String... packageName) { + List<Class<?>> classList = new LinkedList<>(); + for (String s : packageName) { + List<Class<?>> c = getClassList(s); + classList.addAll(c); + } + return classList; + } + + public static List<Class<?>> getClassList(String packageName) { + List<Class<?>> classList = new LinkedList<>(); + try { + ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver(); + Resource[] resources = resourcePatternResolver.getResources(packageName.replace(".", "/") + "/**/*.class"); + for (Resource resource : resources) { + String url = resource.getURL().toString(); + + String[] split = url.split(packageName.replace(".", "/")); + String s = split[split.length - 1]; + String className = s.replace("/", "."); + className = className.substring(0, className.lastIndexOf(".")); + doAddClass(classList, packageName + className); + } + + } catch (Exception e) { + throw new RuntimeException(e); + } + return classList; + } + + private static void doAddClass(List<Class<?>> classList, String className) { + Class<?> cls = loadClass(className, false); + classList.add(cls); + } + + public static Class<?> loadClass(String className, boolean isInitialized) { + Class<?> cls; + try { + cls = Class.forName(className, isInitialized, getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + return cls; + } + + + public static ClassLoader getClassLoader() { + return Thread.currentThread().getContextClassLoader(); + } + +} diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index 0fba9a9..cc2145a 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -92,6 +92,15 @@ # 鏄惁瀛樺偍alarm淇℃伅 alarm: false +# 鍋氫负JT1078鏈嶅姟鍣ㄧ殑閰嶇疆 +jt1078: + #[蹇呴』淇敼] 鏄惁寮�鍚�1078鐨勬湇鍔� + enable: true + #[蹇呬慨淇敼] 1708璁惧鎺ュ叆鐨勭鍙� + port: 21078 + #[鍙�塢 璁惧閴存潈鐨勫瘑鐮� + password: admin123 + #zlm 榛樿鏈嶅姟鍣ㄩ厤缃� media: # [蹇呴』淇敼] zlm鏈嶅姟鍣ㄥ敮涓�id锛岀敤浜庤Е鍙慼ook鏃跺尯鍒槸鍝彴鏈嶅姟鍣�,general.mediaServerId -- Gitblit v1.8.0