648540858
2023-04-28 ebc904e4d5fe07ecc269927f0e6669ad4f8bda19
Merge pull request #836 from keDaYao/featur-jt1078

新增JT1078 Template支持
1个文件已修改
28个文件已添加
1868 ■■■■■ 已修改文件
src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java 146 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java 151 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java 105 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java 110 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java 85 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java 114 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java 127 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/all-application.yml 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/genersoft/iot/vmp/jt1078/annotation/MsgId.java
New file
@@ -0,0 +1,15 @@
package com.genersoft.iot.vmp.jt1078.annotation;
import java.lang.annotation.*;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:31
 * @email qingtaij@163.com
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MsgId {
    String id();
}
src/main/java/com/genersoft/iot/vmp/jt1078/cmd/JT1078Template.java
New file
@@ -0,0 +1,56 @@
package com.genersoft.iot.vmp.jt1078.cmd;
import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
import com.genersoft.iot.vmp.jt1078.proc.response.J9101;
import com.genersoft.iot.vmp.jt1078.proc.response.J9102;
import com.genersoft.iot.vmp.jt1078.session.SessionManager;
import java.util.Random;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:58
 * @email qingtaij@163.com
 */
public class JT1078Template {
    private final Random random = new Random();
    /**
     * 开启直播视频
     *
     * @param devId 设备号
     * @param j9101 开启视频参数
     */
    public String startLive(String devId, J9101 j9101, Integer timeOut) {
        Cmd cmd = new Cmd.Builder()
                .setDevId(devId)
                .setPackageNo(randomInt())
                .setMsgId("9101")
                .setRespId("0001")
                .setRs(j9101)
                .build();
        return SessionManager.INSTANCE.request(cmd, timeOut);
    }
    /**
     * 关闭直播视频
     *
     * @param devId 设备号
     * @param j9102 关闭视频参数
     */
    public String stopLive(String devId, J9102 j9102, Integer timeOut) {
        Cmd cmd = new Cmd.Builder()
                .setDevId(devId)
                .setPackageNo(randomInt())
                .setMsgId("9102")
                .setRespId("0001")
                .setRs(j9102)
                .build();
        return SessionManager.INSTANCE.request(cmd, timeOut);
    }
    private Long randomInt() {
        return (long) random.nextInt(1000) + 1;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/codec/decode/Jt808Decoder.java
New file
@@ -0,0 +1,146 @@
package com.genersoft.iot.vmp.jt1078.codec.decode;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory;
import com.genersoft.iot.vmp.jt1078.proc.request.Re;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:10
 * @email qingtaij@163.com
 */
public class Jt808Decoder extends ByteToMessageDecoder {
    private final static Logger log = LoggerFactory.getLogger(Jt808Decoder.class);
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Session session = ctx.channel().attr(Session.KEY).get();
        log.info("> {} hex:{}", session, ByteBufUtil.hexDump(in));
        try {
            ByteBuf buf = unEscapeAndCheck(in);
            Header header = new Header();
            header.setMsgId(ByteBufUtil.hexDump(buf.readSlice(2)));
            header.setMsgPro(buf.readUnsignedShort());
            if (header.is2019Version()) {
                header.setVersion(buf.readUnsignedByte());
                String devId = ByteBufUtil.hexDump(buf.readSlice(10));
                header.setDevId(devId.replaceFirst("^0*", ""));
            } else {
                header.setDevId(ByteBufUtil.hexDump(buf.readSlice(6)).replaceFirst("^0*", ""));
            }
            header.setSn(buf.readUnsignedShort());
            Re handler = CodecFactory.getHandler(header.getMsgId());
            if (handler == null) {
                log.error("get msgId is null {}", header.getMsgId());
                return;
            }
            Rs decode = handler.decode(buf, header, session);
            if (decode != null) {
                out.add(decode);
            }
        } finally {
            in.skipBytes(in.readableBytes());
        }
    }
    /**
     * 转义与验证校验码
     *
     * @param byteBuf 转义Buf
     * @return 转义好的数据
     */
    public ByteBuf unEscapeAndCheck(ByteBuf byteBuf) throws Exception {
        int low = byteBuf.readerIndex();
        int high = byteBuf.writerIndex();
        byte checkSum = 0;
        int calculationCheckSum = 0;
        byte aByte = byteBuf.getByte(high - 2);
        byte protocolEscapeFlag7d = 0x7d;
        //0x7d转义
        byte protocolEscapeFlag01 = 0x01;
        //0x7e转义
        byte protocolEscapeFlag02 = 0x02;
        if (aByte == protocolEscapeFlag7d) {
            byte b2 = byteBuf.getByte(high - 1);
            if (b2 == protocolEscapeFlag01) {
                checkSum = protocolEscapeFlag7d;
            } else if (b2 == protocolEscapeFlag02) {
                checkSum = 0x7e;
            } else {
                log.error("转义1异常:{}", ByteBufUtil.hexDump(byteBuf));
                throw new Exception("转义错误");
            }
            high = high - 2;
        } else {
            high = high - 1;
            checkSum = byteBuf.getByte(high);
        }
        List<ByteBuf> bufList = new ArrayList<>();
        int index = low;
        while (index < high) {
            byte b = byteBuf.getByte(index);
            if (b == protocolEscapeFlag7d) {
                byte c = byteBuf.getByte(index + 1);
                if (c == protocolEscapeFlag01) {
                    ByteBuf slice = slice0x01(byteBuf, low, index);
                    bufList.add(slice);
                    b = protocolEscapeFlag7d;
                } else if (c == protocolEscapeFlag02) {
                    ByteBuf slice = slice0x02(byteBuf, low, index);
                    bufList.add(slice);
                    b = 0x7e;
                } else {
                    log.error("转义2异常:{}", ByteBufUtil.hexDump(byteBuf));
                    throw new Exception("转义错误");
                }
                index += 2;
                low = index;
            } else {
                index += 1;
            }
            calculationCheckSum = calculationCheckSum ^ b;
        }
        if (calculationCheckSum == checkSum) {
            if (bufList.size() == 0) {
                return byteBuf.slice(low, high);
            } else {
                bufList.add(byteBuf.slice(low, high - low));
                return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, bufList.size(), bufList);
            }
        } else {
            log.info("{} 解析校验码:{}--计算校验码:{}", ByteBufUtil.hexDump(byteBuf), checkSum, calculationCheckSum);
            throw new Exception("校验码错误!");
        }
    }
    private ByteBuf slice0x01(ByteBuf buf, int low, int sign) {
        return buf.slice(low, sign - low + 1);
    }
    private ByteBuf slice0x02(ByteBuf buf, int low, int sign) {
        buf.setByte(sign, 0x7e);
        return buf.slice(low, sign - low + 1);
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808Encoder.java
New file
@@ -0,0 +1,33 @@
package com.genersoft.iot.vmp.jt1078.codec.encode;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:10
 * @email qingtaij@163.com
 */
public class Jt808Encoder extends MessageToByteEncoder<Rs> {
    private final static Logger log = LoggerFactory.getLogger(Jt808Encoder.class);
    @Override
    protected void encode(ChannelHandlerContext ctx, Rs msg, ByteBuf out) throws Exception {
        Session session = ctx.channel().attr(Session.KEY).get();
        ByteBuf encode = Jt808EncoderCmd.encode(msg, session, session.nextSerialNo());
        if(encode!=null){
            log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode));
            out.writeBytes(encode);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/codec/encode/Jt808EncoderCmd.java
New file
@@ -0,0 +1,151 @@
package com.genersoft.iot.vmp.jt1078.codec.encode;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import com.genersoft.iot.vmp.jt1078.util.Bin;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.ByteProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.util.LinkedList;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:25
 * @email qingtaij@163.com
 */
public class Jt808EncoderCmd extends MessageToByteEncoder<Cmd> {
    private final static Logger log = LoggerFactory.getLogger(Jt808EncoderCmd.class);
    @Override
    protected void encode(ChannelHandlerContext ctx, Cmd cmd, ByteBuf out) throws Exception {
        Session session = ctx.channel().attr(Session.KEY).get();
        Rs msg = cmd.getRs();
        ByteBuf encode = encode(msg, session, cmd.getPackageNo().intValue());
        if (encode != null) {
            log.info("< {} hex:{}", session, ByteBufUtil.hexDump(encode));
            out.writeBytes(encode);
        }
    }
    public static ByteBuf encode(Rs msg, Session session, Integer packageNo) {
        String id = msg.getClass().getAnnotation(MsgId.class).id();
        if (!StringUtils.hasLength(id)) {
            log.error("Not find msgId");
            return null;
        }
        ByteBuf byteBuf = Unpooled.buffer();
        byteBuf.writeBytes(ByteBufUtil.decodeHexDump(id));
        ByteBuf encode = msg.encode();
        Header header = msg.getHeader();
        if (header == null) {
            header = session.getHeader();
        }
        if (header.is2019Version()) {
            // 消息体属性
            byteBuf.writeShort(encode.readableBytes() | 1 << 14);
            // 版本号
            byteBuf.writeByte(header.getVersion());
            // 终端手机号
            byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 20)));
        } else {
            // 消息体属性
            byteBuf.writeShort(encode.readableBytes());
            byteBuf.writeBytes(ByteBufUtil.decodeHexDump(Bin.strHexPaddingLeft(header.getDevId(), 12)));
        }
        // 消息体流水号
        byteBuf.writeShort(packageNo);
        // 写入消息体
        byteBuf.writeBytes(encode);
        // 计算校验码,并反转义
        byteBuf = escapeAndCheck0(byteBuf);
        return byteBuf;
    }
    private static final ByteProcessor searcher = value -> !(value == 0x7d || value == 0x7e);
    //转义与校验
    public static ByteBuf escapeAndCheck0(ByteBuf source) {
        sign(source);
        int low = source.readerIndex();
        int high = source.writerIndex();
        LinkedList<ByteBuf> bufList = new LinkedList<>();
        int mark, len;
        while ((mark = source.forEachByte(low, high - low, searcher)) > 0) {
            len = mark + 1 - low;
            ByteBuf[] slice = slice(source, low, len);
            bufList.add(slice[0]);
            bufList.add(slice[1]);
            low += len;
        }
        if (bufList.size() > 0) {
            bufList.add(source.slice(low, high - low));
        } else {
            bufList.add(source);
        }
        ByteBuf delimiter = Unpooled.buffer(1, 1).writeByte(0x7e).retain();
        bufList.addFirst(delimiter);
        bufList.addLast(delimiter);
        CompositeByteBuf byteBufLs = Unpooled.compositeBuffer(bufList.size());
        byteBufLs.addComponents(true, bufList);
        return byteBufLs;
    }
    public static void sign(ByteBuf buf) {
        byte checkCode = bcc(buf);
        buf.writeByte(checkCode);
    }
    public static byte bcc(ByteBuf byteBuf) {
        byte cs = 0;
        while (byteBuf.isReadable())
            cs ^= byteBuf.readByte();
        byteBuf.resetReaderIndex();
        return cs;
    }
    protected static ByteBuf[] slice(ByteBuf byteBuf, int index, int length) {
        byte first = byteBuf.getByte(index + length - 1);
        ByteBuf[] byteBufList = new ByteBuf[2];
        byteBufList[0] = byteBuf.retainedSlice(index, length);
        if (first == 0x7d) {
            byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x01);
        } else {
            byteBuf.setByte(index + length - 1, 0x7d);
            byteBufList[1] = Unpooled.buffer(1, 1).writeByte(0x02);
        }
        return byteBufList;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/Jt808Handler.java
New file
@@ -0,0 +1,72 @@
package com.genersoft.iot.vmp.jt1078.codec.netty;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import com.genersoft.iot.vmp.jt1078.session.SessionManager;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:14
 * @email qingtaij@163.com
 */
public class Jt808Handler extends ChannelInboundHandlerAdapter {
    private final static Logger log = LoggerFactory.getLogger(Jt808Handler.class);
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Rs) {
            ctx.writeAndFlush(msg);
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        Session session = SessionManager.INSTANCE.newSession(channel);
        channel.attr(Session.KEY).set(session);
        log.info("> Tcp connect {}", session);
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        Session session = ctx.channel().attr(Session.KEY).get();
        log.info("< Tcp disconnect {}", session);
        ctx.close();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
        Session session = ctx.channel().attr(Session.KEY).get();
        String message = e.getMessage();
        if (message.toLowerCase().contains("Connection reset by peer".toLowerCase())) {
            log.info("< exception{} {}", session, e.getMessage());
        } else {
            log.info("< exception{} {}", session, e.getMessage(), e);
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            IdleState state = event.state();
            if (state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) {
                Session session = ctx.channel().attr(Session.KEY).get();
                log.warn("< Proactively disconnect{}", session);
                ctx.close();
            }
        }
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/codec/netty/TcpServer.java
New file
@@ -0,0 +1,112 @@
package com.genersoft.iot.vmp.jt1078.codec.netty;
import com.genersoft.iot.vmp.jt1078.codec.decode.Jt808Decoder;
import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808Encoder;
import com.genersoft.iot.vmp.jt1078.codec.encode.Jt808EncoderCmd;
import com.genersoft.iot.vmp.jt1078.proc.factory.CodecFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:01
 * @email qingtaij@163.com
 */
public class TcpServer {
    private final static Logger log = LoggerFactory.getLogger(TcpServer.class);
    private final Integer port;
    private boolean isRunning = false;
    private EventLoopGroup bossGroup = null;
    private EventLoopGroup workerGroup = null;
    private final ByteBuf DECODER_JT808 = Unpooled.wrappedBuffer(new byte[]{0x7e});
    public TcpServer(Integer port) {
        this.port = port;
    }
    private void startTcpServer() {
        try {
            CodecFactory.init();
            this.bossGroup = new NioEventLoopGroup();
            this.workerGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.option(NioChannelOption.SO_BACKLOG, 1024)
                    .option(NioChannelOption.SO_REUSEADDR, true)
                    .childOption(NioChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        public void initChannel(NioSocketChannel channel) {
                            channel.pipeline()
                                    .addLast(new IdleStateHandler(10, 0, 0, TimeUnit.MINUTES))
                                    .addLast(new DelimiterBasedFrameDecoder(1024 * 2, DECODER_JT808))
                                    .addLast(new Jt808Decoder())
                                    .addLast(new Jt808Encoder())
                                    .addLast(new Jt808EncoderCmd())
                                    .addLast(new Jt808Handler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            // 监听设备TCP端口是否启动成功
            channelFuture.addListener(future -> {
                if (!future.isSuccess()) {
                    log.error("Binding port:{} fail!  cause: {}", port, future.cause().getCause(), future.cause());
                }
            });
            log.info("服务:JT808 Server 启动成功, port:{}", port);
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.warn("服务:JT808 Server 启动异常, port:{},{}", port, e.getMessage(), e);
        } finally {
            stop();
        }
    }
    /**
     * 开启一个新的线程,拉起来Netty
     */
    public synchronized void start() {
        if (this.isRunning) {
            log.warn("服务:JT808 Server 已经启动, port:{}", port);
            return;
        }
        this.isRunning = true;
        new Thread(this::startTcpServer).start();
    }
    public synchronized void stop() {
        if (!this.isRunning) {
            log.warn("服务:JT808 Server 已经停止, port:{}", port);
        }
        this.isRunning = false;
        Future<?> future = this.bossGroup.shutdownGracefully();
        if (!future.isSuccess()) {
            log.warn("bossGroup 无法正常停止", future.cause());
        }
        future = this.workerGroup.shutdownGracefully();
        if (!future.isSuccess()) {
            log.warn("workerGroup 无法正常停止", future.cause());
        }
        log.warn("服务:JT808 Server 已经停止, port:{}", port);
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/config/JT1078Controller.java
New file
@@ -0,0 +1,46 @@
package com.genersoft.iot.vmp.jt1078.config;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.proc.response.J9101;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * curl http://localhost:18080/api/jt1078/start/live/18864197066/1
 *
 * @author QingtaiJiang
 * @date 2023/4/27 18:12
 * @email qingtaij@163.com
 */
@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true")
@RestController
@RequestMapping("/api/jt1078")
public class JT1078Controller {
    @Resource
    JT1078Template jt1078Template;
    @GetMapping("/start/live/{deviceId}/{channelId}")
    public WVPResult<?> startLive(@PathVariable String deviceId, @PathVariable String channelId) {
        J9101 j9101 = new J9101();
        j9101.setChannel(Integer.valueOf(channelId));
        j9101.setIp("192.168.1.1");
        j9101.setRate(1);
        j9101.setTcpPort(7618);
        j9101.setUdpPort(7618);
        j9101.setType(0);
        String s = jt1078Template.startLive(deviceId, j9101, 6);
        WVPResult<String> wvpResult = new WVPResult<>();
        wvpResult.setCode(200);
        wvpResult.setData(String.format("http://192.168.1.1/rtp/%s_%s.live.mp4", deviceId, channelId));
        return wvpResult;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/config/TcpAutoConfiguration.java
New file
@@ -0,0 +1,30 @@
package com.genersoft.iot.vmp.jt1078.config;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.codec.netty.TcpServer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 19:35
 * @email qingtaij@163.com
 */
@Order(Integer.MIN_VALUE)
@Configuration
@ConditionalOnProperty(value = "jt1078.enable", havingValue = "true")
public class TcpAutoConfiguration {
    @Bean(initMethod = "start", destroyMethod = "stop")
    public TcpServer jt1078Server(@Value("${jt1078.port}") Integer port) {
        return new TcpServer(port);
    }
    @Bean
    public JT1078Template jt1078Template() {
        return new JT1078Template();
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/Header.java
New file
@@ -0,0 +1,76 @@
package com.genersoft.iot.vmp.jt1078.proc;
import com.genersoft.iot.vmp.jt1078.util.Bin;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:22
 * @email qingtaij@163.com
 */
public class Header {
    // 消息ID
    String msgId;
    // 消息体属性
    Integer msgPro;
    // 标识
    String devId;
    // 消息体流水号
    Integer sn;
    // 协议版本号
    Short version = -1;
    public String getMsgId() {
        return msgId;
    }
    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }
    public Integer getMsgPro() {
        return msgPro;
    }
    public void setMsgPro(Integer msgPro) {
        this.msgPro = msgPro;
    }
    public String getDevId() {
        return devId;
    }
    public void setDevId(String devId) {
        this.devId = devId;
    }
    public Integer getSn() {
        return sn;
    }
    public void setSn(Integer sn) {
        this.sn = sn;
    }
    public Short getVersion() {
        return version;
    }
    public void setVersion(Short version) {
        this.version = version;
    }
    /**
     * 判断是否是2019的版本
     *
     * @return true 2019后的版本。false 2013
     */
    public boolean is2019Version() {
        return Bin.get(msgPro, 14);
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/entity/Cmd.java
New file
@@ -0,0 +1,105 @@
package com.genersoft.iot.vmp.jt1078.proc.entity;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:23
 * @email qingtaij@163.com
 */
public class Cmd {
    String devId;
    Long packageNo;
    String msgId;
    String respId;
    Rs rs;
    public Cmd() {
    }
    public Cmd(Builder builder) {
        this.devId = builder.devId;
        this.packageNo = builder.packageNo;
        this.msgId = builder.msgId;
        this.respId = builder.respId;
        this.rs = builder.rs;
    }
    public String getDevId() {
        return devId;
    }
    public void setDevId(String devId) {
        this.devId = devId;
    }
    public Long getPackageNo() {
        return packageNo;
    }
    public void setPackageNo(Long packageNo) {
        this.packageNo = packageNo;
    }
    public String getMsgId() {
        return msgId;
    }
    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }
    public String getRespId() {
        return respId;
    }
    public void setRespId(String respId) {
        this.respId = respId;
    }
    public Rs getRs() {
        return rs;
    }
    public void setRs(Rs rs) {
        this.rs = rs;
    }
    public static class Builder {
        String devId;
        Long packageNo;
        String msgId;
        String respId;
        Rs rs;
        public Builder setDevId(String devId) {
            this.devId = devId.replaceFirst("^0*", "");
            return this;
        }
        public Builder setPackageNo(Long packageNo) {
            this.packageNo = packageNo;
            return this;
        }
        public Builder setMsgId(String msgId) {
            this.msgId = msgId;
            return this;
        }
        public Builder setRespId(String respId) {
            this.respId = respId;
            return this;
        }
        public Builder setRs(Rs re) {
            this.rs = re;
            return this;
        }
        public Cmd build() {
            return new Cmd(this);
        }
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/factory/CodecFactory.java
New file
@@ -0,0 +1,44 @@
package com.genersoft.iot.vmp.jt1078.proc.factory;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.request.Re;
import com.genersoft.iot.vmp.jt1078.util.ClassUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:29
 * @email qingtaij@163.com
 */
public class CodecFactory {
    private final static Logger log = LoggerFactory.getLogger(CodecFactory.class);
    private static Map<String, Class<?>> protocolHash;
    public static void init() {
        protocolHash = new HashMap<>();
        List<Class<?>> classList = ClassUtil.getClassList("com.genersoft.iot.vmp.jt1078.proc", MsgId.class);
        for (Class<?> handlerClass : classList) {
            String id = handlerClass.getAnnotation(MsgId.class).id();
            protocolHash.put(id, handlerClass);
        }
        if (log.isDebugEnabled()) {
            log.debug("消息ID缓存表 protocolHash:{}", protocolHash);
        }
    }
    public static Re getHandler(String msgId) {
        Class<?> aClass = protocolHash.get(msgId);
        Object bean = ClassUtil.getBean(aClass);
        if (bean instanceof Re) {
            return (Re) bean;
        }
        return null;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0001.java
New file
@@ -0,0 +1,50 @@
package com.genersoft.iot.vmp.jt1078.proc.request;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import com.genersoft.iot.vmp.jt1078.session.SessionManager;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
/**
 * 终端通用应答
 *
 * @author QingtaiJiang
 * @date 2023/4/27 18:04
 * @email qingtaij@163.com
 */
@MsgId(id = "0001")
public class J0001 extends Re {
    int respNo;
    String respId;
    int result;
    @Override
    protected Rs decode0(ByteBuf buf, Header header, Session session) {
        respNo = buf.readUnsignedShort();
        respId = ByteBufUtil.hexDump(buf.readSlice(2));
        result = buf.readUnsignedByte();
        return null;
    }
    @Override
    protected Rs handler(Header header, Session session) {
        SessionManager.INSTANCE.response(header.getDevId(), "0001", (long) respNo, JSON.toJSONString(this));
        return null;
    }
    public int getRespNo() {
        return respNo;
    }
    public String getRespId() {
        return respId;
    }
    public int getResult() {
        return result;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0002.java
New file
@@ -0,0 +1,32 @@
package com.genersoft.iot.vmp.jt1078.proc.request;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.response.J8001;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
/**
 * 终端心跳
 *
 * @author QingtaiJiang
 * @date 2023/4/27 18:04
 * @email qingtaij@163.com
 */
@MsgId(id = "0002")
public class J0002 extends Re {
    @Override
    protected Rs decode0(ByteBuf buf, Header header, Session session) {
        return null;
    }
    @Override
    protected Rs handler(Header header, Session session) {
        J8001 j8001 = new J8001();
        j8001.setRespNo(header.getSn());
        j8001.setRespId(header.getMsgId());
        j8001.setResult(J8001.SUCCESS);
        return j8001;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0004.java
New file
@@ -0,0 +1,27 @@
package com.genersoft.iot.vmp.jt1078.proc.request;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
/**
 * 查询服务器时间
 *
 * @author QingtaiJiang
 * @date 2023/4/27 18:06
 * @email qingtaij@163.com
 */
@MsgId(id = "0004")
public class J0004 extends Re {
    @Override
    protected Rs decode0(ByteBuf buf, Header header, Session session) {
        return null;
    }
    @Override
    protected Rs handler(Header header, Session session) {
        return null;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0100.java
New file
@@ -0,0 +1,56 @@
package com.genersoft.iot.vmp.jt1078.proc.request;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.response.J8100;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
/**
 * 终端注册
 *
 * @author QingtaiJiang
 * @date 2023/4/27 18:06
 * @email qingtaij@163.com
 */
@MsgId(id = "0100")
public class J0100 extends Re {
    private int provinceId;
    private int cityId;
    private String makerId;
    private String deviceModel;
    private String deviceId;
    private int plateColor;
    private String plateNo;
    @Override
    protected Rs decode0(ByteBuf buf, Header header, Session session) {
        Short version = header.getVersion();
        provinceId = buf.readUnsignedShort();
        if (version > 1) {
            cityId = buf.readUnsignedShort();
            // decode as 2019
        } else {
            int i = buf.readUnsignedShort();
            // decode as 2013
        }
        return null;
    }
    @Override
    protected Rs handler(Header header, Session session) {
        J8100 j8100 = new J8100();
        j8100.setRespNo(header.getSn());
        j8100.setResult(J8100.SUCCESS);
        j8100.setCode("WVP_YYDS");
        return j8100;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0102.java
New file
@@ -0,0 +1,36 @@
package com.genersoft.iot.vmp.jt1078.proc.request;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.response.J8001;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
/**
 * 终端鉴权
 *
 * @author QingtaiJiang
 * @date 2023/4/27 18:06
 * @email qingtaij@163.com
 */
@MsgId(id = "0102")
public class J0102 extends Re {
    @Override
    protected Rs decode0(ByteBuf buf, Header header, Session session) {
        int lenCode = buf.readUnsignedByte();
//        String code = buf.readCharSequence(lenCode, CharsetUtil.UTF_8).toString();
        // if 2019 to decode next
        return null;
    }
    @Override
    protected Rs handler(Header header, Session session) {
        J8001 j8001 = new J8001();
        j8001.setRespNo(header.getSn());
        j8001.setRespId(header.getMsgId());
        j8001.setResult(J8001.SUCCESS);
        return j8001;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/J0200.java
New file
@@ -0,0 +1,32 @@
package com.genersoft.iot.vmp.jt1078.proc.request;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.response.J8001;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
/**
 * 实时消息上报
 *
 * @author QingtaiJiang
 * @date 2023/4/27 18:06
 * @email qingtaij@163.com
 */
@MsgId(id = "0200")
public class J0200 extends Re {
    @Override
    protected Rs decode0(ByteBuf buf, Header header, Session session) {
        return null;
    }
    @Override
    protected Rs handler(Header header, Session session) {
        J8001 j8001 = new J8001();
        j8001.setRespNo(header.getSn());
        j8001.setRespId(header.getMsgId());
        j8001.setResult(J8001.SUCCESS);
        return j8001;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/request/Re.java
New file
@@ -0,0 +1,40 @@
package com.genersoft.iot.vmp.jt1078.proc.request;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import com.genersoft.iot.vmp.jt1078.proc.response.Rs;
import com.genersoft.iot.vmp.jt1078.session.Session;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:50
 * @email qingtaij@163.com
 */
public abstract class Re {
    private final static Logger log = LoggerFactory.getLogger(Re.class);
    protected abstract Rs decode0(ByteBuf buf, Header header, Session session);
    protected abstract Rs handler(Header header, Session session);
    public Rs decode(ByteBuf buf, Header header, Session session) {
        if (session != null && !StringUtils.hasLength(session.getDevId())) {
            session.register(header.getDevId(), (int) header.getVersion(), header);
        }
        Rs rs = decode0(buf, header, session);
        Rs rsHand = handler(header, session);
        if (rs == null && rsHand != null) {
            rs = rsHand;
        } else if (rs != null && rsHand != null) {
            log.warn("decode0:{} 与 handler:{} 返回值冲突,采用decode0返回值", rs, rsHand);
        }
        if (rs != null) {
            rs.setHeader(header);
        }
        return rs;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8001.java
New file
@@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:48
 * @email qingtaij@163.com
 */
@MsgId(id = "8001")
public class J8001 extends Rs {
    public static final Integer SUCCESS = 0;
    Integer respNo;
    String respId;
    Integer result;
    @Override
    public ByteBuf encode() {
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeShort(respNo);
        buffer.writeBytes(ByteBufUtil.decodeHexDump(respId));
        buffer.writeByte(result);
        return buffer;
    }
    public void setRespNo(Integer respNo) {
        this.respNo = respNo;
    }
    public void setRespId(String respId) {
        this.respId = respId;
    }
    public void setResult(Integer result) {
        this.result = result;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J8100.java
New file
@@ -0,0 +1,41 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:40
 * @email qingtaij@163.com
 */
@MsgId(id = "8100")
public class J8100 extends Rs {
    public static final Integer SUCCESS = 0;
    Integer respNo;
    Integer result;
    String code;
    @Override
    public ByteBuf encode() {
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeShort(respNo);
        buffer.writeByte(result);
        buffer.writeCharSequence(code, CharsetUtil.UTF_8);
        return buffer;
    }
    public void setRespNo(Integer respNo) {
        this.respNo = respNo;
    }
    public void setResult(Integer result) {
        this.result = result;
    }
    public void setCode(String code) {
        this.code = code;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9101.java
New file
@@ -0,0 +1,110 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import com.genersoft.iot.vmp.jt1078.annotation.MsgId;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:25
 * @email qingtaij@163.com
 */
@MsgId(id = "9101")
public class J9101 extends Rs {
    String ip;
    // TCP端口
    Integer tcpPort;
    // UDP端口
    Integer udpPort;
    // 逻辑通道号
    Integer channel;
    // 数据类型
    /**
     * 0:音视频,1:视频,2:双向对讲,3:监听,4:中心广播,5:透传
     */
    Integer type;
    // 码流类型
    /**
     * 0:主码流,1:子码流
     */
    Integer rate;
    @Override
    public ByteBuf encode() {
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeByte(ip.getBytes().length);
        buffer.writeCharSequence(ip, CharsetUtil.UTF_8);
        buffer.writeShort(tcpPort);
        buffer.writeShort(udpPort);
        buffer.writeByte(channel);
        buffer.writeByte(type);
        buffer.writeByte(rate);
        return buffer;
    }
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public Integer getTcpPort() {
        return tcpPort;
    }
    public void setTcpPort(Integer tcpPort) {
        this.tcpPort = tcpPort;
    }
    public Integer getUdpPort() {
        return udpPort;
    }
    public void setUdpPort(Integer udpPort) {
        this.udpPort = udpPort;
    }
    public Integer getChannel() {
        return channel;
    }
    public void setChannel(Integer channel) {
        this.channel = channel;
    }
    public Integer getType() {
        return type;
    }
    public void setType(Integer type) {
        this.type = type;
    }
    public Integer getRate() {
        return rate;
    }
    public void setRate(Integer rate) {
        this.rate = rate;
    }
    @Override
    public String toString() {
        return "J9101{" +
                "ip='" + ip + '\'' +
                ", tcpPort=" + tcpPort +
                ", udpPort=" + udpPort +
                ", channel=" + channel +
                ", type=" + type +
                ", rate=" + rate +
                '}';
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/J9102.java
New file
@@ -0,0 +1,85 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:49
 * @email qingtaij@163.com
 */
public class J9102 extends Rs {
    // 通道号
    Integer channel;
    // 控制指令
    /**
     * 0:关闭音视频传输指令;
     * 1:切换码流(增加暂停和继续);
     * 2:暂停该通道所有流的发送;
     * 3:恢复暂停前流的发送,与暂停前的流类型一致;
     * 4:关闭双向对讲
     */
    Integer command;
    // 数据类型
    /**
     * 0:关闭该通道有关的音视频数据;
     * 1:只关闭该通道有关的音频,保留该通道
     * 有关的视频;
     * 2:只关闭该通道有关的视频,保留该通道
     * 有关的音频
     */
    Integer closeType;
    // 数据类型
    /**
     * 0:主码流;
     * 1:子码流
     */
    Integer streamType;
    @Override
    public ByteBuf encode() {
        ByteBuf buffer = Unpooled.buffer();
        buffer.writeByte(channel);
        buffer.writeByte(command);
        buffer.writeByte(closeType);
        buffer.writeByte(streamType);
        return null;
    }
    public Integer getChannel() {
        return channel;
    }
    public void setChannel(Integer channel) {
        this.channel = channel;
    }
    public Integer getCommand() {
        return command;
    }
    public void setCommand(Integer command) {
        this.command = command;
    }
    public Integer getCloseType() {
        return closeType;
    }
    public void setCloseType(Integer closeType) {
        this.closeType = closeType;
    }
    public Integer getStreamType() {
        return streamType;
    }
    public void setStreamType(Integer streamType) {
        this.streamType = streamType;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/proc/response/Rs.java
New file
@@ -0,0 +1,27 @@
package com.genersoft.iot.vmp.jt1078.proc.response;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import io.netty.buffer.ByteBuf;
/**
 * @author QingtaiJiang
 * @date 2021/8/30 18:54
 * @email qingtaij@163.com
 */
public abstract class Rs {
    private Header header;
    public abstract ByteBuf encode();
    public Header getHeader() {
        return header;
    }
    public void setHeader(Header header) {
        this.header = header;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/session/Session.java
New file
@@ -0,0 +1,114 @@
package com.genersoft.iot.vmp.jt1078.session;
import com.genersoft.iot.vmp.jt1078.proc.Header;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 18:54
 * @email qingtaij@163.com
 */
public class Session {
    private final static Logger log = LoggerFactory.getLogger(Session.class);
    public static final AttributeKey<Session> KEY = AttributeKey.newInstance(Session.class.getName());
    // Netty的channel
    protected final Channel channel;
    // 原子类的自增ID
    private final AtomicInteger serialNo = new AtomicInteger(0);
    // 是否注册成功
    private boolean registered = false;
    // 设备ID
    private String devId;
    // 创建时间
    private final long creationTime;
    // 协议版本号
    private Integer protocolVersion;
    private Header header;
    protected Session(Channel channel) {
        this.channel = channel;
        this.creationTime = System.currentTimeMillis();
    }
    public void writeObject(Object message) {
        log.info("<<<<<<<<<< cmd{},{}", this, message);
        channel.writeAndFlush(message);
    }
    /**
     * 获得下一个流水号
     *
     * @return 流水号
     */
    public int nextSerialNo() {
        int current;
        int next;
        do {
            current = serialNo.get();
            next = current > 0xffff ? 0 : current;
        } while (!serialNo.compareAndSet(current, next + 1));
        return next;
    }
    /**
     * 注册session
     *
     * @param devId 设备ID
     */
    public void register(String devId, Integer version, Header header) {
        this.devId = devId;
        this.registered = true;
        this.protocolVersion = version;
        this.header = header;
        SessionManager.INSTANCE.put(devId, this);
    }
    /**
     * 获取设备号
     *
     * @return 设备号
     */
    public String getDevId() {
        return devId;
    }
    public boolean isRegistered() {
        return registered;
    }
    public long getCreationTime() {
        return creationTime;
    }
    public Integer getProtocolVersion() {
        return protocolVersion;
    }
    public Header getHeader() {
        return header;
    }
    @Override
    public String toString() {
        return "[" +
                "devId=" + devId +
                ", reg=" + registered +
                ", version=" + protocolVersion +
                ",ip=" + channel.remoteAddress() +
                ']';
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/session/SessionManager.java
New file
@@ -0,0 +1,127 @@
package com.genersoft.iot.vmp.jt1078.session;
import com.genersoft.iot.vmp.jt1078.proc.entity.Cmd;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
 * @author QingtaiJiang
 * @date 2023/4/27 19:54
 * @email qingtaij@163.com
 */
public enum SessionManager {
    INSTANCE;
    private final static Logger log = LoggerFactory.getLogger(SessionManager.class);
    // 用与消息的缓存
    private final Map<String, SynchronousQueue<String>> topicSubscribers = new ConcurrentHashMap<>();
    // session的缓存
    private final Map<Object, Session> sessionMap;
    SessionManager() {
        this.sessionMap = new ConcurrentHashMap<>();
    }
    /**
     * 创建新的Session
     *
     * @param channel netty通道
     * @return 创建的session对象
     */
    public Session newSession(Channel channel) {
        return new Session(channel);
    }
    /**
     * 获取指定设备的Session
     *
     * @param clientId 设备Id
     * @return Session
     */
    public Session get(Object clientId) {
        return sessionMap.get(clientId);
    }
    /**
     * 放入新设备连接的session
     *
     * @param clientId   设备ID
     * @param newSession session
     */
    protected void put(Object clientId, Session newSession) {
        sessionMap.put(clientId, newSession);
    }
    /**
     * 发送同步消息,接收响应
     * 默认超时时间6秒
     */
    public String request(Cmd cmd) {
        // 默认6秒
        int timeOut = 6000;
        return request(cmd, timeOut);
    }
    public String request(Cmd cmd, Integer timeOut) {
        Session session = this.get(cmd.getDevId());
        if (session == null) {
            log.error("DevId: {} not online!", cmd.getDevId());
            return "-1";
        }
        String requestKey = requestKey(cmd.getDevId(), cmd.getRespId(), cmd.getPackageNo());
        SynchronousQueue<String> subscribe = subscribe(requestKey);
        if (subscribe == null) {
            log.error("DevId: {} key:{} send repaid", cmd.getDevId(), requestKey);
            return "-1";
        }
        session.writeObject(cmd);
        try {
            return subscribe.poll(timeOut, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.warn("<<<<<<<<<< timeout" + session, e);
        } finally {
            this.unsubscribe(requestKey);
        }
        return null;
    }
    public Boolean response(String devId, String respId, Long responseNo, String data) {
        String requestKey = requestKey(devId, respId, responseNo);
        SynchronousQueue<String> queue = topicSubscribers.get(requestKey);
        if (queue != null) {
            try {
                return queue.offer(data, 2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                log.error("{}", e.getMessage(), e);
            }
        }
        log.warn("未找到对应回复指令,key:{} 消息:{} ", requestKey, data);
        return false;
    }
    private void unsubscribe(String key) {
        topicSubscribers.remove(key);
    }
    private SynchronousQueue<String> subscribe(String key) {
        SynchronousQueue<String> queue = null;
        if (!topicSubscribers.containsKey(key))
            topicSubscribers.put(key, queue = new SynchronousQueue<String>());
        return queue;
    }
    private String requestKey(String devId, String respId, Long requestNo) {
        return String.join("_", devId.replaceFirst("^0*", ""), respId, requestNo.toString());
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/util/Bin.java
New file
@@ -0,0 +1,41 @@
package com.genersoft.iot.vmp.jt1078.util;
/**
 * 32位整型的二进制读写
 */
public class Bin {
    private static final int[] bits = new int[32];
    static {
        bits[0] = 1;
        for (int i = 1; i < bits.length; i++) {
            bits[i] = bits[i - 1] << 1;
        }
    }
    /**
     * 读取n的第i位
     *
     * @param n int32
     * @param i 取值范围0-31
     */
    public static boolean get(int n, int i) {
        return (n & bits[i]) == bits[i];
    }
    /**
     * 不足位数从左边加0
     */
    public static String strHexPaddingLeft(String data, int length) {
        int dataLength = data.length();
        if (dataLength < length) {
            StringBuilder dataBuilder = new StringBuilder(data);
            for (int i = dataLength; i < length; i++) {
                dataBuilder.insert(0, "0");
            }
            data = dataBuilder.toString();
        }
        return data;
    }
}
src/main/java/com/genersoft/iot/vmp/jt1078/util/ClassUtil.java
New file
@@ -0,0 +1,112 @@
package com.genersoft.iot.vmp.jt1078.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import java.lang.annotation.Annotation;
import java.util.LinkedList;
import java.util.List;
public class ClassUtil {
    private static final Logger logger = LoggerFactory.getLogger(ClassUtil.class);
    public static Object getBean(Class<?> clazz) {
        if (clazz != null) {
            try {
                return clazz.getDeclaredConstructor().newInstance();
            } catch (Exception ex) {
                logger.error("ClassUtil:找不到指定的类", ex);
            }
        }
        return null;
    }
    public static Object getBean(String className) {
        Class<?> clazz = null;
        try {
            clazz = Class.forName(className);
        } catch (Exception ex) {
            logger.error("ClassUtil:找不到指定的类");
        }
        if (clazz != null) {
            try {
                //获取声明的构造器--》创建实例
                return clazz.getDeclaredConstructor().newInstance();
            } catch (Exception ex) {
                logger.error("ClassUtil:找不到指定的类", ex);
            }
        }
        return null;
    }
    /**
     * 获取包下所有带注解的class
     *
     * @param packageName     包名
     * @param annotationClass 注解类型
     * @return list
     */
    public static List<Class<?>> getClassList(String packageName, Class<? extends Annotation> annotationClass) {
        List<Class<?>> classList = getClassList(packageName);
        classList.removeIf(next -> !next.isAnnotationPresent(annotationClass));
        return classList;
    }
    public static List<Class<?>> getClassList(String... packageName) {
        List<Class<?>> classList = new LinkedList<>();
        for (String s : packageName) {
            List<Class<?>> c = getClassList(s);
            classList.addAll(c);
        }
        return classList;
    }
    public static List<Class<?>> getClassList(String packageName) {
        List<Class<?>> classList = new LinkedList<>();
        try {
            ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
            Resource[] resources = resourcePatternResolver.getResources(packageName.replace(".", "/") + "/**/*.class");
            for (Resource resource : resources) {
                String url = resource.getURL().toString();
                String[] split = url.split(packageName.replace(".", "/"));
                String s = split[split.length - 1];
                String className = s.replace("/", ".");
                className = className.substring(0, className.lastIndexOf("."));
                doAddClass(classList, packageName + className);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return classList;
    }
    private static void doAddClass(List<Class<?>> classList, String className) {
        Class<?> cls = loadClass(className, false);
        classList.add(cls);
    }
    public static Class<?> loadClass(String className, boolean isInitialized) {
        Class<?> cls;
        try {
            cls = Class.forName(className, isInitialized, getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
        return cls;
    }
    public static ClassLoader getClassLoader() {
        return Thread.currentThread().getContextClassLoader();
    }
}
src/main/resources/all-application.yml
@@ -92,6 +92,15 @@
    # 是否存储alarm信息
    alarm: false
# 做为JT1078服务器的配置
jt1078:
    #[必须修改] 是否开启1078的服务
    enable: true
    #[必修修改] 1708设备接入的端口
    port: 21078
    #[可选] 设备鉴权的密码
    password: admin123
#zlm 默认服务器配置
media:
    # [必须修改] zlm服务器唯一id,用于触发hook时区别是哪台服务器,general.mediaServerId