diff --git a/pom.xml b/pom.xml index 0536bb4..3c2cd6e 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,17 @@ jump3r 1.0.5 + + + com.google.protobuf + protobuf-java + 3.3.0 + + + org.springframework + spring-web + 5.3.25 + diff --git a/src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java b/src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java index 83c7091..951d1a7 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java @@ -1,5 +1,7 @@ package cn.org.hentai.jtt1078.app; +import cn.org.hentai.jtt1078.app.websocket.MyException; +import cn.org.hentai.jtt1078.app.websocket.WebsocketClient; import cn.org.hentai.jtt1078.http.GeneralResponseWriter; import cn.org.hentai.jtt1078.http.NettyHttpServerHandler; import cn.org.hentai.jtt1078.publisher.PublishManager; @@ -19,19 +21,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.misc.Signal; import sun.misc.SignalHandler; -import io.netty.handler.timeout.IdleStateHandler; + import java.net.InetAddress; -import java.util.concurrent.TimeUnit; +import java.net.URISyntaxException; /** * Created by matrixy on 2019/4/9. */ -public class VideoServerApp -{ +public class VideoServerApp { private static Logger logger = LoggerFactory.getLogger(VideoServerApp.class); - public static void main(String[] args) throws Exception - { + public static void main(String[] args) throws Exception { Configs.init("/app.properties"); PublishManager.init(); SessionManager.init(); @@ -39,29 +39,37 @@ public class VideoServerApp VideoServer videoServer = new VideoServer(); HttpServer httpServer = new HttpServer(); - Signal.handle(new Signal("TERM"), new SignalHandler() - { + Signal.handle(new Signal("TERM"), new SignalHandler() { @Override - public void handle(Signal signal) - { + public void handle(Signal signal) { videoServer.shutdown(); httpServer.shutdown(); } }); + try (WebsocketClient websocketClient = new WebsocketClient("ws://localhost:19800/ws", 15)) { + // 连接 + websocketClient.connect(); + // 发送消息 + websocketClient.write("xxxxxxxxxxxxxxxxx"); + // 获取结果 + String result = websocketClient.receiveResult(); + logger.info("接收到结果[{}]", result); + } catch (URISyntaxException | MyException e) { + logger.error("发生异常,原因:{}", e.getMessage(), e); + } + videoServer.start(); httpServer.start(); } - static class VideoServer - { + static class VideoServer { private static ServerBootstrap serverBootstrap; private static EventLoopGroup bossGroup; private static EventLoopGroup workerGroup; - private static void start() throws Exception - { + private static void start() throws Exception { serverBootstrap = new ServerBootstrap(); serverBootstrap.option(ChannelOption.SO_BACKLOG, Configs.getInt("server.backlog", 102400)); bossGroup = new NioEventLoopGroup(Configs.getInt("server.worker-count", Runtime.getRuntime().availableProcessors())); @@ -86,39 +94,31 @@ public class VideoServerApp ch.closeFuture(); } - private static void shutdown() - { - try - { + private static void shutdown() { + try { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); - } - catch(Exception e) - { + } catch (Exception e) { e.printStackTrace(); } } } - static class HttpServer - { + static class HttpServer { private static ServerBootstrap serverBootstrap; private static EventLoopGroup bossGroup; private static EventLoopGroup workerGroup; - private static void start() throws Exception - { + private static void start() throws Exception { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() - { + .childHandler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel ch) throws Exception - { + public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new GeneralResponseWriter(), new HttpResponseEncoder(), @@ -129,35 +129,27 @@ public class VideoServerApp } }).option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true); - try - { + try { int port = Configs.getInt("server.http.port", 3333); ChannelFuture f = bootstrap.bind(InetAddress.getByName("0.0.0.0"), port).sync(); logger.info("HTTP Server started at: {}", port); f.channel().closeFuture().sync(); - } - catch (InterruptedException e) - { + } catch (InterruptedException e) { logger.error("http server error", e); - } - finally - { + } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } - private static void shutdown() - { - try - { + private static void shutdown() { + try { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); - } - catch(Exception e) - { + } catch (Exception e) { e.printStackTrace(); } } } + } diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/AbstractWebsocketClient.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/AbstractWebsocketClient.java new file mode 100644 index 0000000..bf0a85a --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/AbstractWebsocketClient.java @@ -0,0 +1,123 @@ +package cn.org.hentai.jtt1078.app.websocket; + +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractWebsocketClient implements Closeable { + + private static final Logger log = LoggerFactory.getLogger(AbstractWebsocketClient.class); + + /** + * 接收响应的超时时间(秒) + */ + private final int connectionTimeout; + + /** + * 任务上下文 + */ + protected WebsocketContext websocketContext; + + public AbstractWebsocketClient(int connectionTimeout, String serviceType) { + this.connectionTimeout = connectionTimeout; + this.websocketContext = new WebsocketContext(new CountDownLatch(1)); + } + + /** + * 发送消息.
+ * + * @param message 发送文本 + * @return: + */ + public void write(String message) throws MyException { + Channel channel = getChannel(); + if (channel != null) { + channel.writeAndFlush(new TextWebSocketFrame(message)); + return; + } + throw new MyException("连接已经关闭"); + } + + /** + * 连接并发送消息.
+ * + * @return: + */ + public void connect() throws MyException { + try { + doOpen(); + doConnect(); + } catch (Exception e) { + throw new MyException("连接没有成功打开,原因是:{}" + e.getMessage(), e); + } + } + + /** + * 接收消息.
+ * + * @return: {@link java.lang.String} + */ + public String receiveResult() throws MyException { + this.receive(this.websocketContext.getCountDownLatch()); + if (StringUtils.isEmpty(this.websocketContext.getResult())) { + throw new MyException("未获取到任务结果信息"); + } + return this.websocketContext.getResult(); + } + + /** + * 接收消息封装.
+ * + * @param countDownLatch 计数器 + * @return: + */ + private void receive(CountDownLatch countDownLatch) throws MyException { + boolean waitFlag = false; + try { + waitFlag = countDownLatch.await(connectionTimeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.info("此连接未接收到响应信息"); + Thread.currentThread().interrupt(); + } + if (!waitFlag) { + log.error("Timeout({}}s) when receiving response message", connectionTimeout); + throw new MyException("此连接未接收到响应信息"); + } + } + + /** + * 初始化连接.
+ * + * @return: + */ + protected abstract void doOpen(); + + /** + * 建立连接.
+ * + * @return: + */ + protected abstract void doConnect() throws MyException; + + /** + * 获取本次连接channel.
+ * + * @return: {@link Channel} + */ + protected abstract Channel getChannel(); + + /** + * 关闭连接.
+ * + * @return: + * @exception: + */ + @Override + public abstract void close(); +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/MyException.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/MyException.java new file mode 100644 index 0000000..3dd3ba8 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/MyException.java @@ -0,0 +1,26 @@ +package cn.org.hentai.jtt1078.app.websocket; + +public class MyException extends Exception { + + + public MyException() { + super(); + } + + public MyException(String message) { + super(message); + + } + + /** + * 用指定的详细信息和原因构造一个新的异常.
+ * + * @param message + * @param cause + * @return: + */ + public MyException(String message, Throwable cause) { + super(message, cause); + } + +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketChannelInitializer.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketChannelInitializer.java new file mode 100644 index 0000000..d3e69ee --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketChannelInitializer.java @@ -0,0 +1,26 @@ +package cn.org.hentai.jtt1078.app.websocket; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; + +public class WebsocketChannelInitializer extends ChannelInitializer { + + private final WebsocketClientHandler handler; + + public WebsocketChannelInitializer(WebsocketClientHandler handler) { + this.handler = handler; + } + + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new HttpClientCodec()); + p.addLast(new HttpObjectAggregator(8192)); + p.addLast(WebSocketClientCompressionHandler.INSTANCE); + p.addLast(handler); + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java new file mode 100644 index 0000000..b2c4e73 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java @@ -0,0 +1,99 @@ +package cn.org.hentai.jtt1078.app.websocket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.URISyntaxException; + +public class WebsocketClient extends AbstractWebsocketClient { + + private static final Logger log = LoggerFactory.getLogger(WebsocketClient.class); + + private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); + + private final URI uri; + + private final int port; + + private Bootstrap bootstrap; + + private WebsocketClientHandler handler; + + private Channel channel; + + public WebsocketClient(String url, int connectionTimeout) throws URISyntaxException, MyException { + super(connectionTimeout, "ws"); + this.uri = new URI(url); + this.port = getPort(); + } + + /** + * Extract the specified port + * + * @return the specified port or the default port for the specific scheme + */ + private int getPort() throws MyException { + int port = uri.getPort(); + if (port == -1) { + String scheme = uri.getScheme(); + if ("wss".equals(scheme)) { + return 443; + } else if ("ws".equals(scheme)) { + return 19800; + } else { + throw new MyException("unknown scheme: " + scheme); + } + } + return port; + } + + @Override + protected void doOpen() { + // websocket客户端握手实现的基类 + WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()); + // 业务处理类 + handler = new WebsocketClientHandler(webSocketClientHandshaker, this.websocketContext); + // client端,引导client channel启动 + bootstrap = new Bootstrap(); + // 添加管道 绑定端口 添加作用域等 + bootstrap.group(NIO_GROUP).channel(NioSocketChannel.class).handler(new WebsocketChannelInitializer(handler)); + } + + @Override + protected void doConnect() { + try { + // 启动连接 + channel = bootstrap.connect(uri.getHost(), port).sync().channel(); + // 等待握手响应 + handler.handshakeFuture().sync(); + } catch (InterruptedException e) { + log.error("websocket连接发生异常", e); + Thread.currentThread().interrupt(); + } + } + + @Override + protected Channel getChannel() { + return channel; + } + + @Override + public void close() { + if (channel != null) { + channel.close(); + } + } + + public boolean isOpen() { + return channel.isOpen(); + } +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketContext.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketContext.java new file mode 100644 index 0000000..3144497 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketContext.java @@ -0,0 +1,37 @@ +package cn.org.hentai.jtt1078.app.websocket; + +import java.util.concurrent.CountDownLatch; + +public class WebsocketContext { + + /** + * 计数器(用于监听是否返回结果) + */ + private CountDownLatch countDownLatch; + + /** + * 最终结果 + */ + private String result; + + public WebsocketContext(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } + +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/controller/BusinessController.java b/src/main/java/cn/org/hentai/jtt1078/controller/BusinessController.java new file mode 100644 index 0000000..cf15597 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/controller/BusinessController.java @@ -0,0 +1,28 @@ +package cn.org.hentai.jtt1078.controller; + +import cn.org.hentai.jtt1078.publisher.PublishManager; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.*; + +@RestController +@RequestMapping +public class BusinessController { + + /** + * 获取当前推送视频车载机的clientID + * + * @return + */ + @GetMapping("device/currentPush") + public Map getCurPushDevice() { + Map data = new HashMap<>(); + data.put("status", "OK"); + data.put("code", "200"); + data.put("data", PublishManager.getInstance().getCurPushDevice()); + return data; + } + +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java index a977955..90f8499 100644 --- a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java @@ -62,6 +62,14 @@ public final class PublishManager { if (chl != null) chl.close(); } + public String getCurPushDevice() { + if (channels.size() == 0) { + return ""; + } + Channel chl = channels.get(0); + return chl.tag; + } + public void unsubscribe(String tag, long watcherId) { Channel chl = channels.get(tag); if (chl != null) chl.unsubscribe(watcherId); diff --git a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java index eec1946..bbde0d1 100644 --- a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java +++ b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java @@ -4,6 +4,8 @@ import cn.org.hentai.jtt1078.codec.MP3Encoder; import cn.org.hentai.jtt1078.flv.AudioTag; import cn.org.hentai.jtt1078.flv.FlvAudioTagEncoder; import cn.org.hentai.jtt1078.flv.FlvEncoder; +import cn.org.hentai.jtt1078.publisher.Channel; +import cn.org.hentai.jtt1078.publisher.PublishManager; import cn.org.hentai.jtt1078.util.*; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -46,6 +48,9 @@ public class RTMPPublisher extends Thread { while ((len = stderr.read(buff)) > -1) { if (debugMode) System.out.print(new String(buff, 0, len)); } + + // 若ffmpeg命令执行失败, 视为推流失败, 将通道从缓存中删除 + PublishManager.getInstance().close(tag); logger.info("Process FFMPEG exited..."); } catch (Exception ex) { logger.error("publish failed", ex); diff --git a/src/main/java/cn/org/hentai/jtt1078/test/ChannelTest.java b/src/main/java/cn/org/hentai/jtt1078/test/ChannelTest.java index e3233d3..c30eda8 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/ChannelTest.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/ChannelTest.java @@ -11,24 +11,19 @@ import java.nio.channels.ByteChannel; /** * Created by matrixy on 2020/1/9. */ -public class ChannelTest implements ByteChannel -{ +public class ChannelTest implements ByteChannel { byte[] temp = new byte[4]; ByteHolder buffer = new ByteHolder(1024); // 读出,存入dst @Override - public int read(ByteBuffer dst) throws IOException - { + public int read(ByteBuffer dst) throws IOException { dst.flip(); int len = Math.min(4, buffer.size()); - if (dst.remaining() > len) - { + if (dst.remaining() > len) { buffer.sliceInto(temp, len); dst.put(temp, 0, len); - } - else - { + } else { // 丢掉??? } dst.flip(); @@ -37,8 +32,7 @@ public class ChannelTest implements ByteChannel // 从src读出,写入进来 @Override - public int write(ByteBuffer src) throws IOException - { + public int write(ByteBuffer src) throws IOException { int len = -1; // src.flip(); len = Math.min(4, src.limit()); @@ -50,30 +44,26 @@ public class ChannelTest implements ByteChannel } @Override - public boolean isOpen() - { + public boolean isOpen() { return true; } @Override - public void close() throws IOException - { + public void close() throws IOException { } - public byte[] array() - { + public byte[] array() { return buffer.array(); } - public static void main(String[] args) throws Exception - { + public static void main(String[] args) throws Exception { ChannelTest chl = new ChannelTest(); ByteBuffer buffer = ByteBuffer.allocate(4); java.nio.ByteBuffer xx; System.out.println(buffer.getClass().getName()); for (int i = 0; i < 4096; i++) - buffer.put((byte)'f'); + buffer.put((byte) 'f'); /* buffer.putLong(0x1122334455667788L); buffer.flip(); @@ -85,26 +75,22 @@ public class ChannelTest implements ByteChannel */ } - static final class ByteBufferWrapper - { + static final class ByteBufferWrapper { boolean writeMode; ByteBuffer buffer; - private ByteBufferWrapper(int size) - { + private ByteBufferWrapper(int size) { this.buffer = ByteBuffer.allocate(size); } // 控制写入,代理过来 - public void write() - { + public void write() { } // 写出就无所谓了 - public static ByteBufferWrapper create(int size) - { + public static ByteBufferWrapper create(int size) { return new ByteBufferWrapper(size); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/test/Command.java b/src/main/java/cn/org/hentai/jtt1078/test/Command.java new file mode 100644 index 0000000..cb73fc3 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/test/Command.java @@ -0,0 +1,259 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: Command.proto + +package cn.org.hentai.jtt1078.test; + +public final class Command { + private Command() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + /** + *
+   **
+   * 指令类型
+   * 
+ * + * Protobuf enum {@code CommandType} + */ + public enum CommandType + implements com.google.protobuf.ProtocolMessageEnum { + /** + *
+     **
+     * 验证
+     * 
+ * + * AUTH = 1; + */ + AUTH(1), + /** + *
+     **
+     * ping
+     * 
+ * + * PING = 2; + */ + PING(2), + /** + *
+     **
+     * pong
+     * 
+ * + * PONG = 3; + */ + PONG(3), + /** + *
+     **
+     * 上传数据
+     * 
+ * + * UPLOAD_DATA = 4; + */ + UPLOAD_DATA(4), + /** + *
+     **
+     * 推送数据
+     * 
+ * + * PUSH_DATA = 5; + */ + PUSH_DATA(5), + /** + *
+     **
+     * 验证返回
+     * 
+ * + * AUTH_BACK = 11; + */ + AUTH_BACK(11), + /** + * UPLOAD_DATA_BACK = 14; + */ + UPLOAD_DATA_BACK(14), + /** + * PUSH_DATA_BACK = 15; + */ + PUSH_DATA_BACK(15), + ; + + /** + *
+     **
+     * 验证
+     * 
+ * + * AUTH = 1; + */ + public static final int AUTH_VALUE = 1; + /** + *
+     **
+     * ping
+     * 
+ * + * PING = 2; + */ + public static final int PING_VALUE = 2; + /** + *
+     **
+     * pong
+     * 
+ * + * PONG = 3; + */ + public static final int PONG_VALUE = 3; + /** + *
+     **
+     * 上传数据
+     * 
+ * + * UPLOAD_DATA = 4; + */ + public static final int UPLOAD_DATA_VALUE = 4; + /** + *
+     **
+     * 推送数据
+     * 
+ * + * PUSH_DATA = 5; + */ + public static final int PUSH_DATA_VALUE = 5; + /** + *
+     **
+     * 验证返回
+     * 
+ * + * AUTH_BACK = 11; + */ + public static final int AUTH_BACK_VALUE = 11; + /** + * UPLOAD_DATA_BACK = 14; + */ + public static final int UPLOAD_DATA_BACK_VALUE = 14; + /** + * PUSH_DATA_BACK = 15; + */ + public static final int PUSH_DATA_BACK_VALUE = 15; + + + public final int getNumber() { + return value; + } + + /** + * @deprecated Use {@link #forNumber(int)} instead. + */ + @Deprecated + public static CommandType valueOf(int value) { + return forNumber(value); + } + + public static CommandType forNumber(int value) { + switch (value) { + case 1: return AUTH; + case 2: return PING; + case 3: return PONG; + case 4: return UPLOAD_DATA; + case 5: return PUSH_DATA; + case 11: return AUTH_BACK; + case 14: return UPLOAD_DATA_BACK; + case 15: return PUSH_DATA_BACK; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap + internalGetValueMap() { + return internalValueMap; + } + private static final com.google.protobuf.Internal.EnumLiteMap< + CommandType> internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap() { + public CommandType findValueByNumber(int number) { + return CommandType.forNumber(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(ordinal()); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return Command.getDescriptor().getEnumTypes().get(0); + } + + private static final CommandType[] VALUES = values(); + + public static CommandType valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int value; + + private CommandType(int value) { + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:CommandType) + } + + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + String[] descriptorData = { + "\n\rCommand.proto\032\rCommand.proto*\204\001\n\013Comma" + + "ndType\022\010\n\004AUTH\020\001\022\010\n\004PING\020\002\022\010\n\004PONG\020\003\022\017\n\013" + + "UPLOAD_DATA\020\004\022\r\n\tPUSH_DATA\020\005\022\r\n\tAUTH_BAC" + + "K\020\013\022\024\n\020UPLOAD_DATA_BACK\020\016\022\022\n\016PUSH_DATA_B" + + "ACK\020\017B$\n\031com.netty.common.protobufB\007Comm" + + "and" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + Command.getDescriptor(), + }, assigner); + Command.getDescriptor(); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/src/main/java/cn/org/hentai/jtt1078/test/Command.proto b/src/main/java/cn/org/hentai/jtt1078/test/Command.proto new file mode 100644 index 0000000..ab6918f --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/test/Command.proto @@ -0,0 +1,39 @@ +syntax = "proto2"; + +option java_package="cn.myzf.common.protobuf"; +option java_outer_classname = "Command"; +import "cn/myzf/common/protobuf/Command.proto"; +/** + * 指令类型 + */ +enum CommandType { + /** + * 验证 + */ + AUTH = 1; + /** + * ping + */ + PING = 2; + /** + * pong + */ + PONG = 3; + /** + * 上传数据 + */ + UPLOAD_DATA = 4; + /** + * 推送数据 + */ + PUSH_DATA = 5; + + /** + * 验证返回 + */ + AUTH_BACK = 11; + + UPLOAD_DATA_BACK = 14; + + PUSH_DATA_BACK = 15; +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/test/Connect808Test.java b/src/main/java/cn/org/hentai/jtt1078/test/Connect808Test.java index 8610992..743b944 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/Connect808Test.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/Connect808Test.java @@ -1,63 +1,109 @@ package cn.org.hentai.jtt1078.test; +import cn.org.hentai.jtt1078.app.VideoServerApp; +import cn.org.hentai.jtt1078.util.Packet; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import io.netty.handler.timeout.IdleStateHandler; +import java.nio.charset.StandardCharsets; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.TimeUnit; /** * Created by matrixy on 2019/12/18. */ public class Connect808Test { - private Bootstrap b = new Bootstrap(); - private EventLoopGroup group; - - public Connect808Test() { - group = new NioEventLoopGroup(); - b.group(group) - .channel(NioSocketChannel.class) - .option(ChannelOption.TCP_NODELAY, true) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(Channel channel) throws Exception { + private final static String HOST = "127.0.0.1"; + private final static int PORT = 19999; + // private final static int PORT = 7611; + private final static int READER_IDLE_TIME_SECONDS = 0;//读操作空闲20秒 + private final static int WRITER_IDLE_TIME_SECONDS = 5;//写操作空闲20秒 + private final static int ALL_IDLE_TIME_SECONDS = 0;//读写全部空闲40秒 - } - }); - } + public static void main(String[] args) { + String host = "127.0.0.1"; +// int port = 7611; + int port = 19999; - public void connect(String host, int port) { - System.out.println("11111111111 connect " + host + " " + Thread.currentThread()); - b.connect(host, port).addListener(new ChannelFutureListener() { + new Thread(new Runnable() { @Override - public void operationComplete(ChannelFuture future) throws Exception { - /* - * 这里就不是主线程了,这里是 netty 线程中执行 - */ - if (future.isSuccess()) { - System.out.println("2222222222222 connect success " + host + " " + Thread.currentThread()); - } else { - System.out.println("333333333333333 connect failed " + host + " " + Thread.currentThread()); - // 连接不成功,5秒后重新连接 - future.channel().eventLoop().schedule(new Runnable() { - @Override - public void run() { - System.out.println("4444444444444 reconnect " + host + " " + Thread.currentThread()); - connect(host, port); - } - }, 5, TimeUnit.SECONDS); + public void run() { + try { + new Connect808Test().doConnect(new Bootstrap(), new NioEventLoopGroup()); + } catch (Exception e) { + e.printStackTrace(); } } - }); + }).start(); } - public void stop() { - if (group != null) { - group.shutdownGracefully(); - group = null; + /** + * netty client 连接,连接失败5秒后重试连接 + */ + public Bootstrap doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) { + try { + if (bootstrap != null) { + bootstrap.group(eventLoopGroup); + bootstrap.channel(NioSocketChannel.class); + bootstrap.option(ChannelOption.SO_KEEPALIVE, true); + bootstrap.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast("idleStateHandler", new IdleStateHandler(READER_IDLE_TIME_SECONDS + , WRITER_IDLE_TIME_SECONDS, ALL_IDLE_TIME_SECONDS, TimeUnit.SECONDS)); + + p.addLast(new ProtobufVarint32FrameDecoder()); + p.addLast(new ProtobufDecoder(Message.MessageBase.getDefaultInstance())); + + p.addLast(new ProtobufVarint32LengthFieldPrepender()); + p.addLast(new ProtobufEncoder()); + + p.addLast("clientHandler", new LogicClientHandler()); +// p.addLast("idleTimeoutHandler", new HeartHandler(NettyClient.this)); + } + }); + + bootstrap.remoteAddress(HOST, PORT); + ChannelFuture f = bootstrap.connect().addListener((ChannelFuture futureListener) -> { + final EventLoop eventLoop = futureListener.channel().eventLoop(); + if (!futureListener.isSuccess()) { +// log.warn("连接服务器失败,5s后重新尝试连接!"); + futureListener.channel().eventLoop().schedule(() -> doConnect(new Bootstrap(), eventLoop), 5, TimeUnit.SECONDS); + } + }); + f.channel().closeFuture().sync(); + } + } catch (InterruptedException e) { + e.printStackTrace(); } + return bootstrap; } + private void startConsoleThread(Channel channel) { + new Thread(() -> { + new Timer().schedule(new TimerTask() { + @Override + public void run() { + System.out.print("输入消息发送至服务端 : "); + String hex = "7e0100002e0123456789017fff001f00730000000034000000000000000000000042534a2d47462d30367465737431323301b2e241383838383838157e"; + Packet packet = Packet.create(hex.getBytes()); + byte[] bytes = "|123,0100,1000;test|".getBytes(StandardCharsets.UTF_8); + String data = ByteBufUtil.hexDump(bytes); + System.out.println(data); + channel.writeAndFlush(bytes); + } + }, 1000, 10000); + }).start(); + } } diff --git a/src/main/java/cn/org/hentai/jtt1078/test/LogicClientHandler.java b/src/main/java/cn/org/hentai/jtt1078/test/LogicClientHandler.java new file mode 100644 index 0000000..1d58fc8 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/test/LogicClientHandler.java @@ -0,0 +1,36 @@ +package cn.org.hentai.jtt1078.test; + + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LogicClientHandler extends SimpleChannelInboundHandler { + public Logger log = LoggerFactory.getLogger(this.getClass()); + + private final static String CLIENTID = "test11111"; + + // 连接成功后,向server发送消息 + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Message.MessageBase.Builder authMsg = Message.MessageBase.newBuilder(); + authMsg.setClientId(CLIENTID); + authMsg.setCmd(Command.CommandType.AUTH); +// authMsg.setCmd(null); + authMsg.setData("This is auth data44444444444444444"); + + ctx.writeAndFlush(authMsg.build()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.debug("连接断开"); + } + + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception { + + } +} diff --git a/src/main/java/cn/org/hentai/jtt1078/test/Message.java b/src/main/java/cn/org/hentai/jtt1078/test/Message.java new file mode 100644 index 0000000..7a1f3a1 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/test/Message.java @@ -0,0 +1,879 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: Message.proto + +package cn.org.hentai.jtt1078.test; + +public final class Message { + private Message() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } + + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + public interface MessageBaseOrBuilder extends + // @@protoc_insertion_point(interface_extends:MessageBase) + com.google.protobuf.MessageOrBuilder { + + /** + * required string clientId = 1; + */ + boolean hasClientId(); + /** + * required string clientId = 1; + */ + String getClientId(); + /** + * required string clientId = 1; + */ + com.google.protobuf.ByteString + getClientIdBytes(); + + /** + * required .CommandType cmd = 2; + */ + boolean hasCmd(); + /** + * required .CommandType cmd = 2; + */ + Command.CommandType getCmd(); + + /** + * optional string data = 3; + */ + boolean hasData(); + /** + * optional string data = 3; + */ + String getData(); + /** + * optional string data = 3; + */ + com.google.protobuf.ByteString + getDataBytes(); + } + /** + * Protobuf type {@code MessageBase} + */ + public static final class MessageBase extends + com.google.protobuf.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:MessageBase) + MessageBaseOrBuilder { + // Use MessageBase.newBuilder() to construct. + private MessageBase(com.google.protobuf.GeneratedMessageV3.Builder builder) { + super(builder); + } + private MessageBase() { + clientId_ = ""; + cmd_ = 1; + data_ = ""; + } + + @Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private MessageBase( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + this(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + com.google.protobuf.ByteString bs = input.readBytes(); + bitField0_ |= 0x00000001; + clientId_ = bs; + break; + } + case 16: { + int rawValue = input.readEnum(); + Command.CommandType value = Command.CommandType.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(2, rawValue); + } else { + bitField0_ |= 0x00000002; + cmd_ = rawValue; + } + break; + } + case 26: { + com.google.protobuf.ByteString bs = input.readBytes(); + bitField0_ |= 0x00000004; + data_ = bs; + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return Message.internal_static_MessageBase_descriptor; + } + + protected FieldAccessorTable + internalGetFieldAccessorTable() { + return Message.internal_static_MessageBase_fieldAccessorTable + .ensureFieldAccessorsInitialized( + Message.MessageBase.class, Message.MessageBase.Builder.class); + } + + private int bitField0_; + public static final int CLIENTID_FIELD_NUMBER = 1; + private volatile Object clientId_; + /** + * required string clientId = 1; + */ + public boolean hasClientId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string clientId = 1; + */ + public String getClientId() { + Object ref = clientId_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + clientId_ = s; + } + return s; + } + } + /** + * required string clientId = 1; + */ + public com.google.protobuf.ByteString + getClientIdBytes() { + Object ref = clientId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (String) ref); + clientId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + public static final int CMD_FIELD_NUMBER = 2; + private int cmd_; + /** + * required .CommandType cmd = 2; + */ + public boolean hasCmd() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required .CommandType cmd = 2; + */ + public Command.CommandType getCmd() { + Command.CommandType result = Command.CommandType.valueOf(cmd_); + return result == null ? Command.CommandType.AUTH : result; + } + + public static final int DATA_FIELD_NUMBER = 3; + private volatile Object data_; + /** + * optional string data = 3; + */ + public boolean hasData() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional string data = 3; + */ + public String getData() { + Object ref = data_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + data_ = s; + } + return s; + } + } + /** + * optional string data = 3; + */ + public com.google.protobuf.ByteString + getDataBytes() { + Object ref = data_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (String) ref); + data_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + if (!hasClientId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasCmd()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (((bitField0_ & 0x00000001) == 0x00000001)) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 1, clientId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeEnum(2, cmd_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + com.google.protobuf.GeneratedMessageV3.writeString(output, 3, data_); + } + unknownFields.writeTo(output); + } + + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, clientId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(2, cmd_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, data_); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof Message.MessageBase)) { + return super.equals(obj); + } + Message.MessageBase other = (Message.MessageBase) obj; + + boolean result = true; + result = result && (hasClientId() == other.hasClientId()); + if (hasClientId()) { + result = result && getClientId() + .equals(other.getClientId()); + } + result = result && (hasCmd() == other.hasCmd()); + if (hasCmd()) { + result = result && cmd_ == other.cmd_; + } + result = result && (hasData() == other.hasData()); + if (hasData()) { + result = result && getData() + .equals(other.getData()); + } + result = result && unknownFields.equals(other.unknownFields); + return result; + } + + @Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasClientId()) { + hash = (37 * hash) + CLIENTID_FIELD_NUMBER; + hash = (53 * hash) + getClientId().hashCode(); + } + if (hasCmd()) { + hash = (37 * hash) + CMD_FIELD_NUMBER; + hash = (53 * hash) + cmd_; + } + if (hasData()) { + hash = (37 * hash) + DATA_FIELD_NUMBER; + hash = (53 * hash) + getData().hashCode(); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static Message.MessageBase parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static Message.MessageBase parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static Message.MessageBase parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static Message.MessageBase parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static Message.MessageBase parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static Message.MessageBase parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static Message.MessageBase parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static Message.MessageBase parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static Message.MessageBase parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static Message.MessageBase parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(Message.MessageBase prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @Override + protected Builder newBuilderForType( + BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code MessageBase} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:MessageBase) + Message.MessageBaseOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return Message.internal_static_MessageBase_descriptor; + } + + protected FieldAccessorTable + internalGetFieldAccessorTable() { + return Message.internal_static_MessageBase_fieldAccessorTable + .ensureFieldAccessorsInitialized( + Message.MessageBase.class, Message.MessageBase.Builder.class); + } + + // Construct using Message.MessageBase.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + public Builder clear() { + super.clear(); + clientId_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + cmd_ = 1; + bitField0_ = (bitField0_ & ~0x00000002); + data_ = ""; + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return Message.internal_static_MessageBase_descriptor; + } + + public Message.MessageBase getDefaultInstanceForType() { + return Message.MessageBase.getDefaultInstance(); + } + + public Message.MessageBase build() { + Message.MessageBase result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public Message.MessageBase buildPartial() { + Message.MessageBase result = new Message.MessageBase(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.clientId_ = clientId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.cmd_ = cmd_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.data_ = data_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder clone() { + return (Builder) super.clone(); + } + public Builder setField( + com.google.protobuf.Descriptors.FieldDescriptor field, + Object value) { + return (Builder) super.setField(field, value); + } + public Builder clearField( + com.google.protobuf.Descriptors.FieldDescriptor field) { + return (Builder) super.clearField(field); + } + public Builder clearOneof( + com.google.protobuf.Descriptors.OneofDescriptor oneof) { + return (Builder) super.clearOneof(oneof); + } + public Builder setRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + int index, Object value) { + return (Builder) super.setRepeatedField(field, index, value); + } + public Builder addRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + Object value) { + return (Builder) super.addRepeatedField(field, value); + } + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof Message.MessageBase) { + return mergeFrom((Message.MessageBase)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(Message.MessageBase other) { + if (other == Message.MessageBase.getDefaultInstance()) return this; + if (other.hasClientId()) { + bitField0_ |= 0x00000001; + clientId_ = other.clientId_; + onChanged(); + } + if (other.hasCmd()) { + setCmd(other.getCmd()); + } + if (other.hasData()) { + bitField0_ |= 0x00000004; + data_ = other.data_; + onChanged(); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + public final boolean isInitialized() { + if (!hasClientId()) { + return false; + } + if (!hasCmd()) { + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Message.MessageBase parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (Message.MessageBase) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + private Object clientId_ = ""; + /** + * required string clientId = 1; + */ + public boolean hasClientId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string clientId = 1; + */ + public String getClientId() { + Object ref = clientId_; + if (!(ref instanceof String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + clientId_ = s; + } + return s; + } else { + return (String) ref; + } + } + /** + * required string clientId = 1; + */ + public com.google.protobuf.ByteString + getClientIdBytes() { + Object ref = clientId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (String) ref); + clientId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string clientId = 1; + */ + public Builder setClientId( + String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + clientId_ = value; + onChanged(); + return this; + } + /** + * required string clientId = 1; + */ + public Builder clearClientId() { + bitField0_ = (bitField0_ & ~0x00000001); + clientId_ = getDefaultInstance().getClientId(); + onChanged(); + return this; + } + /** + * required string clientId = 1; + */ + public Builder setClientIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + clientId_ = value; + onChanged(); + return this; + } + + private int cmd_ = 1; + /** + * required .CommandType cmd = 2; + */ + public boolean hasCmd() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required .CommandType cmd = 2; + */ + public Command.CommandType getCmd() { + Command.CommandType result = Command.CommandType.valueOf(cmd_); + return result == null ? Command.CommandType.AUTH : result; + } + /** + * required .CommandType cmd = 2; + */ + public Builder setCmd(Command.CommandType value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + cmd_ = value.getNumber(); + onChanged(); + return this; + } + /** + * required .CommandType cmd = 2; + */ + public Builder clearCmd() { + bitField0_ = (bitField0_ & ~0x00000002); + cmd_ = 1; + onChanged(); + return this; + } + + private Object data_ = ""; + /** + * optional string data = 3; + */ + public boolean hasData() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional string data = 3; + */ + public String getData() { + Object ref = data_; + if (!(ref instanceof String)) { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + data_ = s; + } + return s; + } else { + return (String) ref; + } + } + /** + * optional string data = 3; + */ + public com.google.protobuf.ByteString + getDataBytes() { + Object ref = data_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (String) ref); + data_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string data = 3; + */ + public Builder setData( + String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + data_ = value; + onChanged(); + return this; + } + /** + * optional string data = 3; + */ + public Builder clearData() { + bitField0_ = (bitField0_ & ~0x00000004); + data_ = getDefaultInstance().getData(); + onChanged(); + return this; + } + /** + * optional string data = 3; + */ + public Builder setDataBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + data_ = value; + onChanged(); + return this; + } + public final Builder setUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + public final Builder mergeUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:MessageBase) + } + + // @@protoc_insertion_point(class_scope:MessageBase) + private static final Message.MessageBase DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new Message.MessageBase(); + } + + public static Message.MessageBase getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + @Deprecated public static final com.google.protobuf.Parser + PARSER = new com.google.protobuf.AbstractParser() { + public MessageBase parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new MessageBase(input, extensionRegistry); + } + }; + + public static com.google.protobuf.Parser parser() { + return PARSER; + } + + @Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + public Message.MessageBase getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + + private static final com.google.protobuf.Descriptors.Descriptor + internal_static_MessageBase_descriptor; + private static final + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internal_static_MessageBase_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + String[] descriptorData = { + "\n\rMessage.proto\032\rCommand.proto\"H\n\013Messag" + + "eBase\022\020\n\010clientId\030\001 \002(\t\022\031\n\003cmd\030\002 \002(\0162\014.C" + + "ommandType\022\014\n\004data\030\003 \001(\tB$\n\031com.netty.co" + + "mmon.protobufB\007Message" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + Command.getDescriptor(), + }, assigner); + internal_static_MessageBase_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_MessageBase_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_MessageBase_descriptor, + new String[] { "ClientId", "Cmd", "Data", }); + Command.getDescriptor(); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/src/main/java/cn/org/hentai/jtt1078/test/Message.proto b/src/main/java/cn/org/hentai/jtt1078/test/Message.proto new file mode 100644 index 0000000..3bc40ca --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/test/Message.proto @@ -0,0 +1,11 @@ +syntax = "proto2"; + +option java_package="cn.myzf.common.protobuf"; +option java_outer_classname = "Message"; +import "cn/myzf/common/protobuf/Command.proto"; + +message MessageBase { + required string clientId = 1; + required CommandType cmd = 2; + optional string data = 3; +} \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/test/NettyServerTest.java b/src/main/java/cn/org/hentai/jtt1078/test/NettyServerTest.java new file mode 100644 index 0000000..5c66090 --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/test/NettyServerTest.java @@ -0,0 +1,59 @@ +package cn.org.hentai.jtt1078.test; + +import cn.org.hentai.jtt1078.util.Packet; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.nio.charset.StandardCharsets; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +/** + * Created by matrixy on 2019/12/18. + */ +public class NettyServerTest { + public static void main(String[] args) { + // Server配置 + //boss loop + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + //worker loop + EventLoopGroup workerGroup = new NioEventLoopGroup(); +// final CheerUpServerHandler serverHandler = new CheerUpServerHandler(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + // tcp/ip协议listen函数中的backlog参数,等待连接池的大小 + .option(ChannelOption.SO_BACKLOG, 100) + //日志处理器 + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + //初始化channel,添加handler + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + //日志处理器 + p.addLast(new LoggingHandler(LogLevel.INFO)); +// p.addLast(serverHandler); + } + }); + + // 启动服务器 + ChannelFuture f = b.bind(18348).sync(); + + // 等待channel关闭 + f.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/resources/app.properties b/src/main/resources/app.properties index 5154c42..a559c15 100644 --- a/src/main/resources/app.properties +++ b/src/main/resources/app.properties @@ -2,6 +2,9 @@ server.port = 1078 server.http.port = 3333 server.backlog = 1024 +jtt808server.host = 127.0.0.1 +jtt808server.port = 7611 + # ffmpeg可执行文件路径,可以留空 #ffmpeg.path = C:/Users/Administrator/Desktop/ffmpeg-6.0-essentials_build/bin/ffmpeg.exe ffmpeg.path = C:/Program Files/ffmpeg/bin/ffmpeg.exe