1. 添加websocket客户端

2. 添加接口
master
liuqingkun 3 years ago
parent 7155af4bbb
commit 88d966b1d7
  1. 11
      pom.xml
  2. 80
      src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java
  3. 123
      src/main/java/cn/org/hentai/jtt1078/app/websocket/AbstractWebsocketClient.java
  4. 26
      src/main/java/cn/org/hentai/jtt1078/app/websocket/MyException.java
  5. 26
      src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketChannelInitializer.java
  6. 99
      src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java
  7. 37
      src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketContext.java
  8. 28
      src/main/java/cn/org/hentai/jtt1078/controller/BusinessController.java
  9. 8
      src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java
  10. 5
      src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java
  11. 42
      src/main/java/cn/org/hentai/jtt1078/test/ChannelTest.java
  12. 259
      src/main/java/cn/org/hentai/jtt1078/test/Command.java
  13. 39
      src/main/java/cn/org/hentai/jtt1078/test/Command.proto
  14. 122
      src/main/java/cn/org/hentai/jtt1078/test/Connect808Test.java
  15. 36
      src/main/java/cn/org/hentai/jtt1078/test/LogicClientHandler.java
  16. 879
      src/main/java/cn/org/hentai/jtt1078/test/Message.java
  17. 11
      src/main/java/cn/org/hentai/jtt1078/test/Message.proto
  18. 59
      src/main/java/cn/org/hentai/jtt1078/test/NettyServerTest.java
  19. 3
      src/main/resources/app.properties

@ -55,6 +55,17 @@
<artifactId>jump3r</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.3.25</version>
</dependency>
</dependencies>
<build>

@ -1,5 +1,7 @@
package cn.org.hentai.jtt1078.app;
import cn.org.hentai.jtt1078.app.websocket.MyException;
import cn.org.hentai.jtt1078.app.websocket.WebsocketClient;
import cn.org.hentai.jtt1078.http.GeneralResponseWriter;
import cn.org.hentai.jtt1078.http.NettyHttpServerHandler;
import cn.org.hentai.jtt1078.publisher.PublishManager;
@ -19,19 +21,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.net.URISyntaxException;
/**
* Created by matrixy on 2019/4/9.
*/
public class VideoServerApp
{
public class VideoServerApp {
private static Logger logger = LoggerFactory.getLogger(VideoServerApp.class);
public static void main(String[] args) throws Exception
{
public static void main(String[] args) throws Exception {
Configs.init("/app.properties");
PublishManager.init();
SessionManager.init();
@ -39,29 +39,37 @@ public class VideoServerApp
VideoServer videoServer = new VideoServer();
HttpServer httpServer = new HttpServer();
Signal.handle(new Signal("TERM"), new SignalHandler()
{
Signal.handle(new Signal("TERM"), new SignalHandler() {
@Override
public void handle(Signal signal)
{
public void handle(Signal signal) {
videoServer.shutdown();
httpServer.shutdown();
}
});
try (WebsocketClient websocketClient = new WebsocketClient("ws://localhost:19800/ws", 15)) {
// 连接
websocketClient.connect();
// 发送消息
websocketClient.write("xxxxxxxxxxxxxxxxx");
// 获取结果
String result = websocketClient.receiveResult();
logger.info("接收到结果[{}]", result);
} catch (URISyntaxException | MyException e) {
logger.error("发生异常,原因:{}", e.getMessage(), e);
}
videoServer.start();
httpServer.start();
}
static class VideoServer
{
static class VideoServer {
private static ServerBootstrap serverBootstrap;
private static EventLoopGroup bossGroup;
private static EventLoopGroup workerGroup;
private static void start() throws Exception
{
private static void start() throws Exception {
serverBootstrap = new ServerBootstrap();
serverBootstrap.option(ChannelOption.SO_BACKLOG, Configs.getInt("server.backlog", 102400));
bossGroup = new NioEventLoopGroup(Configs.getInt("server.worker-count", Runtime.getRuntime().availableProcessors()));
@ -86,39 +94,31 @@ public class VideoServerApp
ch.closeFuture();
}
private static void shutdown()
{
try
{
private static void shutdown() {
try {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
catch(Exception e)
{
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class HttpServer
{
static class HttpServer {
private static ServerBootstrap serverBootstrap;
private static EventLoopGroup bossGroup;
private static EventLoopGroup workerGroup;
private static void start() throws Exception
{
private static void start() throws Exception {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>()
{
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception
{
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new GeneralResponseWriter(),
new HttpResponseEncoder(),
@ -129,35 +129,27 @@ public class VideoServerApp
}
}).option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true);
try
{
try {
int port = Configs.getInt("server.http.port", 3333);
ChannelFuture f = bootstrap.bind(InetAddress.getByName("0.0.0.0"), port).sync();
logger.info("HTTP Server started at: {}", port);
f.channel().closeFuture().sync();
}
catch (InterruptedException e)
{
} catch (InterruptedException e) {
logger.error("http server error", e);
}
finally
{
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
private static void shutdown()
{
try
{
private static void shutdown() {
try {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
catch(Exception e)
{
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

@ -0,0 +1,123 @@
package cn.org.hentai.jtt1078.app.websocket;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public abstract class AbstractWebsocketClient implements Closeable {
private static final Logger log = LoggerFactory.getLogger(AbstractWebsocketClient.class);
/**
* 接收响应的超时时间()
*/
private final int connectionTimeout;
/**
* 任务上下文
*/
protected WebsocketContext websocketContext;
public AbstractWebsocketClient(int connectionTimeout, String serviceType) {
this.connectionTimeout = connectionTimeout;
this.websocketContext = new WebsocketContext(new CountDownLatch(1));
}
/**
* 发送消息.<br>
*
* @param message 发送文本
* @return:
*/
public void write(String message) throws MyException {
Channel channel = getChannel();
if (channel != null) {
channel.writeAndFlush(new TextWebSocketFrame(message));
return;
}
throw new MyException("连接已经关闭");
}
/**
* 连接并发送消息.<br>
*
* @return:
*/
public void connect() throws MyException {
try {
doOpen();
doConnect();
} catch (Exception e) {
throw new MyException("连接没有成功打开,原因是:{}" + e.getMessage(), e);
}
}
/**
* 接收消息.<br>
*
* @return: {@link java.lang.String}
*/
public String receiveResult() throws MyException {
this.receive(this.websocketContext.getCountDownLatch());
if (StringUtils.isEmpty(this.websocketContext.getResult())) {
throw new MyException("未获取到任务结果信息");
}
return this.websocketContext.getResult();
}
/**
* 接收消息封装.<br>
*
* @param countDownLatch 计数器
* @return:
*/
private void receive(CountDownLatch countDownLatch) throws MyException {
boolean waitFlag = false;
try {
waitFlag = countDownLatch.await(connectionTimeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.info("此连接未接收到响应信息");
Thread.currentThread().interrupt();
}
if (!waitFlag) {
log.error("Timeout({}}s) when receiving response message", connectionTimeout);
throw new MyException("此连接未接收到响应信息");
}
}
/**
* 初始化连接.<br>
*
* @return:
*/
protected abstract void doOpen();
/**
* 建立连接.<br>
*
* @return:
*/
protected abstract void doConnect() throws MyException;
/**
* 获取本次连接channel.<br>
*
* @return: {@link Channel}
*/
protected abstract Channel getChannel();
/**
* 关闭连接.<br>
*
* @return:
* @exception:
*/
@Override
public abstract void close();
}

@ -0,0 +1,26 @@
package cn.org.hentai.jtt1078.app.websocket;
public class MyException extends Exception {
public MyException() {
super();
}
public MyException(String message) {
super(message);
}
/**
* 用指定的详细信息和原因构造一个新的异常.<br>
*
* @param message
* @param cause
* @return:
*/
public MyException(String message, Throwable cause) {
super(message, cause);
}
}

@ -0,0 +1,26 @@
package cn.org.hentai.jtt1078.app.websocket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> {
private final WebsocketClientHandler handler;
public WebsocketChannelInitializer(WebsocketClientHandler handler) {
this.handler = handler;
}
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(8192));
p.addLast(WebSocketClientCompressionHandler.INSTANCE);
p.addLast(handler);
}
}

@ -0,0 +1,99 @@
package cn.org.hentai.jtt1078.app.websocket;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
public class WebsocketClient extends AbstractWebsocketClient {
private static final Logger log = LoggerFactory.getLogger(WebsocketClient.class);
private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
private final URI uri;
private final int port;
private Bootstrap bootstrap;
private WebsocketClientHandler handler;
private Channel channel;
public WebsocketClient(String url, int connectionTimeout) throws URISyntaxException, MyException {
super(connectionTimeout, "ws");
this.uri = new URI(url);
this.port = getPort();
}
/**
* Extract the specified port
*
* @return the specified port or the default port for the specific scheme
*/
private int getPort() throws MyException {
int port = uri.getPort();
if (port == -1) {
String scheme = uri.getScheme();
if ("wss".equals(scheme)) {
return 443;
} else if ("ws".equals(scheme)) {
return 19800;
} else {
throw new MyException("unknown scheme: " + scheme);
}
}
return port;
}
@Override
protected void doOpen() {
// websocket客户端握手实现的基类
WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
// 业务处理类
handler = new WebsocketClientHandler(webSocketClientHandshaker, this.websocketContext);
// client端,引导client channel启动
bootstrap = new Bootstrap();
// 添加管道 绑定端口 添加作用域等
bootstrap.group(NIO_GROUP).channel(NioSocketChannel.class).handler(new WebsocketChannelInitializer(handler));
}
@Override
protected void doConnect() {
try {
// 启动连接
channel = bootstrap.connect(uri.getHost(), port).sync().channel();
// 等待握手响应
handler.handshakeFuture().sync();
} catch (InterruptedException e) {
log.error("websocket连接发生异常", e);
Thread.currentThread().interrupt();
}
}
@Override
protected Channel getChannel() {
return channel;
}
@Override
public void close() {
if (channel != null) {
channel.close();
}
}
public boolean isOpen() {
return channel.isOpen();
}
}

@ -0,0 +1,37 @@
package cn.org.hentai.jtt1078.app.websocket;
import java.util.concurrent.CountDownLatch;
public class WebsocketContext {
/**
* 计数器(用于监听是否返回结果)
*/
private CountDownLatch countDownLatch;
/**
* 最终结果
*/
private String result;
public WebsocketContext(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}

@ -0,0 +1,28 @@
package cn.org.hentai.jtt1078.controller;
import cn.org.hentai.jtt1078.publisher.PublishManager;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
@RestController
@RequestMapping
public class BusinessController {
/**
* 获取当前推送视频车载机的clientID
*
* @return
*/
@GetMapping("device/currentPush")
public Map<String, String> getCurPushDevice() {
Map<String, String> data = new HashMap<>();
data.put("status", "OK");
data.put("code", "200");
data.put("data", PublishManager.getInstance().getCurPushDevice());
return data;
}
}

@ -62,6 +62,14 @@ public final class PublishManager {
if (chl != null) chl.close();
}
public String getCurPushDevice() {
if (channels.size() == 0) {
return "";
}
Channel chl = channels.get(0);
return chl.tag;
}
public void unsubscribe(String tag, long watcherId) {
Channel chl = channels.get(tag);
if (chl != null) chl.unsubscribe(watcherId);

@ -4,6 +4,8 @@ import cn.org.hentai.jtt1078.codec.MP3Encoder;
import cn.org.hentai.jtt1078.flv.AudioTag;
import cn.org.hentai.jtt1078.flv.FlvAudioTagEncoder;
import cn.org.hentai.jtt1078.flv.FlvEncoder;
import cn.org.hentai.jtt1078.publisher.Channel;
import cn.org.hentai.jtt1078.publisher.PublishManager;
import cn.org.hentai.jtt1078.util.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@ -46,6 +48,9 @@ public class RTMPPublisher extends Thread {
while ((len = stderr.read(buff)) > -1) {
if (debugMode) System.out.print(new String(buff, 0, len));
}
// 若ffmpeg命令执行失败, 视为推流失败, 将通道从缓存中删除
PublishManager.getInstance().close(tag);
logger.info("Process FFMPEG exited...");
} catch (Exception ex) {
logger.error("publish failed", ex);

@ -11,24 +11,19 @@ import java.nio.channels.ByteChannel;
/**
* Created by matrixy on 2020/1/9.
*/
public class ChannelTest implements ByteChannel
{
public class ChannelTest implements ByteChannel {
byte[] temp = new byte[4];
ByteHolder buffer = new ByteHolder(1024);
// 读出,存入dst
@Override
public int read(ByteBuffer dst) throws IOException
{
public int read(ByteBuffer dst) throws IOException {
dst.flip();
int len = Math.min(4, buffer.size());
if (dst.remaining() > len)
{
if (dst.remaining() > len) {
buffer.sliceInto(temp, len);
dst.put(temp, 0, len);
}
else
{
} else {
// 丢掉???
}
dst.flip();
@ -37,8 +32,7 @@ public class ChannelTest implements ByteChannel
// 从src读出,写入进来
@Override
public int write(ByteBuffer src) throws IOException
{
public int write(ByteBuffer src) throws IOException {
int len = -1;
// src.flip();
len = Math.min(4, src.limit());
@ -50,30 +44,26 @@ public class ChannelTest implements ByteChannel
}
@Override
public boolean isOpen()
{
public boolean isOpen() {
return true;
}
@Override
public void close() throws IOException
{
public void close() throws IOException {
}
public byte[] array()
{
public byte[] array() {
return buffer.array();
}
public static void main(String[] args) throws Exception
{
public static void main(String[] args) throws Exception {
ChannelTest chl = new ChannelTest();
ByteBuffer buffer = ByteBuffer.allocate(4);
java.nio.ByteBuffer xx;
System.out.println(buffer.getClass().getName());
for (int i = 0; i < 4096; i++)
buffer.put((byte)'f');
buffer.put((byte) 'f');
/*
buffer.putLong(0x1122334455667788L);
buffer.flip();
@ -85,26 +75,22 @@ public class ChannelTest implements ByteChannel
*/
}
static final class ByteBufferWrapper
{
static final class ByteBufferWrapper {
boolean writeMode;
ByteBuffer buffer;
private ByteBufferWrapper(int size)
{
private ByteBufferWrapper(int size) {
this.buffer = ByteBuffer.allocate(size);
}
// 控制写入,代理过来
public void write()
{
public void write() {
}
// 写出就无所谓了
public static ByteBufferWrapper create(int size)
{
public static ByteBufferWrapper create(int size) {
return new ByteBufferWrapper(size);
}
}

@ -0,0 +1,259 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Command.proto
package cn.org.hentai.jtt1078.test;
public final class Command {
private Command() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistryLite registry) {
}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
registerAllExtensions(
(com.google.protobuf.ExtensionRegistryLite) registry);
}
/**
* <pre>
**
* 指令类型
* </pre>
*
* Protobuf enum {@code CommandType}
*/
public enum CommandType
implements com.google.protobuf.ProtocolMessageEnum {
/**
* <pre>
**
* 验证
* </pre>
*
* <code>AUTH = 1;</code>
*/
AUTH(1),
/**
* <pre>
**
* ping
* </pre>
*
* <code>PING = 2;</code>
*/
PING(2),
/**
* <pre>
**
* pong
* </pre>
*
* <code>PONG = 3;</code>
*/
PONG(3),
/**
* <pre>
**
* 上传数据
* </pre>
*
* <code>UPLOAD_DATA = 4;</code>
*/
UPLOAD_DATA(4),
/**
* <pre>
**
* 推送数据
* </pre>
*
* <code>PUSH_DATA = 5;</code>
*/
PUSH_DATA(5),
/**
* <pre>
**
* 验证返回
* </pre>
*
* <code>AUTH_BACK = 11;</code>
*/
AUTH_BACK(11),
/**
* <code>UPLOAD_DATA_BACK = 14;</code>
*/
UPLOAD_DATA_BACK(14),
/**
* <code>PUSH_DATA_BACK = 15;</code>
*/
PUSH_DATA_BACK(15),
;
/**
* <pre>
**
* 验证
* </pre>
*
* <code>AUTH = 1;</code>
*/
public static final int AUTH_VALUE = 1;
/**
* <pre>
**
* ping
* </pre>
*
* <code>PING = 2;</code>
*/
public static final int PING_VALUE = 2;
/**
* <pre>
**
* pong
* </pre>
*
* <code>PONG = 3;</code>
*/
public static final int PONG_VALUE = 3;
/**
* <pre>
**
* 上传数据
* </pre>
*
* <code>UPLOAD_DATA = 4;</code>
*/
public static final int UPLOAD_DATA_VALUE = 4;
/**
* <pre>
**
* 推送数据
* </pre>
*
* <code>PUSH_DATA = 5;</code>
*/
public static final int PUSH_DATA_VALUE = 5;
/**
* <pre>
**
* 验证返回
* </pre>
*
* <code>AUTH_BACK = 11;</code>
*/
public static final int AUTH_BACK_VALUE = 11;
/**
* <code>UPLOAD_DATA_BACK = 14;</code>
*/
public static final int UPLOAD_DATA_BACK_VALUE = 14;
/**
* <code>PUSH_DATA_BACK = 15;</code>
*/
public static final int PUSH_DATA_BACK_VALUE = 15;
public final int getNumber() {
return value;
}
/**
* @deprecated Use {@link #forNumber(int)} instead.
*/
@Deprecated
public static CommandType valueOf(int value) {
return forNumber(value);
}
public static CommandType forNumber(int value) {
switch (value) {
case 1: return AUTH;
case 2: return PING;
case 3: return PONG;
case 4: return UPLOAD_DATA;
case 5: return PUSH_DATA;
case 11: return AUTH_BACK;
case 14: return UPLOAD_DATA_BACK;
case 15: return PUSH_DATA_BACK;
default: return null;
}
}
public static com.google.protobuf.Internal.EnumLiteMap<CommandType>
internalGetValueMap() {
return internalValueMap;
}
private static final com.google.protobuf.Internal.EnumLiteMap<
CommandType> internalValueMap =
new com.google.protobuf.Internal.EnumLiteMap<CommandType>() {
public CommandType findValueByNumber(int number) {
return CommandType.forNumber(number);
}
};
public final com.google.protobuf.Descriptors.EnumValueDescriptor
getValueDescriptor() {
return getDescriptor().getValues().get(ordinal());
}
public final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptorForType() {
return getDescriptor();
}
public static final com.google.protobuf.Descriptors.EnumDescriptor
getDescriptor() {
return Command.getDescriptor().getEnumTypes().get(0);
}
private static final CommandType[] VALUES = values();
public static CommandType valueOf(
com.google.protobuf.Descriptors.EnumValueDescriptor desc) {
if (desc.getType() != getDescriptor()) {
throw new IllegalArgumentException(
"EnumValueDescriptor is not for this type.");
}
return VALUES[desc.getIndex()];
}
private final int value;
private CommandType(int value) {
this.value = value;
}
// @@protoc_insertion_point(enum_scope:CommandType)
}
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
String[] descriptorData = {
"\n\rCommand.proto\032\rCommand.proto*\204\001\n\013Comma" +
"ndType\022\010\n\004AUTH\020\001\022\010\n\004PING\020\002\022\010\n\004PONG\020\003\022\017\n\013" +
"UPLOAD_DATA\020\004\022\r\n\tPUSH_DATA\020\005\022\r\n\tAUTH_BAC" +
"K\020\013\022\024\n\020UPLOAD_DATA_BACK\020\016\022\022\n\016PUSH_DATA_B" +
"ACK\020\017B$\n\031com.netty.common.protobufB\007Comm" +
"and"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
Command.getDescriptor(),
}, assigner);
Command.getDescriptor();
}
// @@protoc_insertion_point(outer_class_scope)
}

@ -0,0 +1,39 @@
syntax = "proto2";
option java_package="cn.myzf.common.protobuf";
option java_outer_classname = "Command";
import "cn/myzf/common/protobuf/Command.proto";
/**
*
*/
enum CommandType {
/**
*
*/
AUTH = 1;
/**
* ping
*/
PING = 2;
/**
* pong
*/
PONG = 3;
/**
*
*/
UPLOAD_DATA = 4;
/**
*
*/
PUSH_DATA = 5;
/**
*
*/
AUTH_BACK = 11;
UPLOAD_DATA_BACK = 14;
PUSH_DATA_BACK = 15;
}

@ -1,63 +1,109 @@
package cn.org.hentai.jtt1078.test;
import cn.org.hentai.jtt1078.app.VideoServerApp;
import cn.org.hentai.jtt1078.util.Packet;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.timeout.IdleStateHandler;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
/**
* Created by matrixy on 2019/12/18.
*/
public class Connect808Test {
private Bootstrap b = new Bootstrap();
private EventLoopGroup group;
public Connect808Test() {
group = new NioEventLoopGroup();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
private final static String HOST = "127.0.0.1";
private final static int PORT = 19999;
// private final static int PORT = 7611;
private final static int READER_IDLE_TIME_SECONDS = 0;//读操作空闲20秒
private final static int WRITER_IDLE_TIME_SECONDS = 5;//写操作空闲20秒
private final static int ALL_IDLE_TIME_SECONDS = 0;//读写全部空闲40秒
}
});
}
public static void main(String[] args) {
String host = "127.0.0.1";
// int port = 7611;
int port = 19999;
public void connect(String host, int port) {
System.out.println("11111111111 connect " + host + " " + Thread.currentThread());
b.connect(host, port).addListener(new ChannelFutureListener() {
new Thread(new Runnable() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
/*
* 这里就不是主线程了这里是 netty 线程中执行
*/
if (future.isSuccess()) {
System.out.println("2222222222222 connect success " + host + " " + Thread.currentThread());
} else {
System.out.println("333333333333333 connect failed " + host + " " + Thread.currentThread());
// 连接不成功,5秒后重新连接
future.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
System.out.println("4444444444444 reconnect " + host + " " + Thread.currentThread());
connect(host, port);
}
}, 5, TimeUnit.SECONDS);
public void run() {
try {
new Connect808Test().doConnect(new Bootstrap(), new NioEventLoopGroup());
} catch (Exception e) {
e.printStackTrace();
}
}
});
}).start();
}
public void stop() {
if (group != null) {
group.shutdownGracefully();
group = null;
/**
* netty client 连接连接失败5秒后重试连接
*/
public Bootstrap doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
try {
if (bootstrap != null) {
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("idleStateHandler", new IdleStateHandler(READER_IDLE_TIME_SECONDS
, WRITER_IDLE_TIME_SECONDS, ALL_IDLE_TIME_SECONDS, TimeUnit.SECONDS));
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(Message.MessageBase.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast("clientHandler", new LogicClientHandler());
// p.addLast("idleTimeoutHandler", new HeartHandler(NettyClient.this));
}
});
bootstrap.remoteAddress(HOST, PORT);
ChannelFuture f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess()) {
// log.warn("连接服务器失败,5s后重新尝试连接!");
futureListener.channel().eventLoop().schedule(() -> doConnect(new Bootstrap(), eventLoop), 5, TimeUnit.SECONDS);
}
});
f.channel().closeFuture().sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return bootstrap;
}
private void startConsoleThread(Channel channel) {
new Thread(() -> {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.out.print("输入消息发送至服务端 : ");
String hex = "7e0100002e0123456789017fff001f00730000000034000000000000000000000042534a2d47462d30367465737431323301b2e241383838383838157e";
Packet packet = Packet.create(hex.getBytes());
byte[] bytes = "|123,0100,1000;test|".getBytes(StandardCharsets.UTF_8);
String data = ByteBufUtil.hexDump(bytes);
System.out.println(data);
channel.writeAndFlush(bytes);
}
}, 1000, 10000);
}).start();
}
}

@ -0,0 +1,36 @@
package cn.org.hentai.jtt1078.test;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogicClientHandler extends SimpleChannelInboundHandler<Message> {
public Logger log = LoggerFactory.getLogger(this.getClass());
private final static String CLIENTID = "test11111";
// 连接成功后,向server发送消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Message.MessageBase.Builder authMsg = Message.MessageBase.newBuilder();
authMsg.setClientId(CLIENTID);
authMsg.setCmd(Command.CommandType.AUTH);
// authMsg.setCmd(null);
authMsg.setData("This is auth data44444444444444444");
ctx.writeAndFlush(authMsg.build());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.debug("连接断开");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
}
}

@ -0,0 +1,879 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: Message.proto
package cn.org.hentai.jtt1078.test;
public final class Message {
private Message() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistryLite registry) {
}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
registerAllExtensions(
(com.google.protobuf.ExtensionRegistryLite) registry);
}
public interface MessageBaseOrBuilder extends
// @@protoc_insertion_point(interface_extends:MessageBase)
com.google.protobuf.MessageOrBuilder {
/**
* <code>required string clientId = 1;</code>
*/
boolean hasClientId();
/**
* <code>required string clientId = 1;</code>
*/
String getClientId();
/**
* <code>required string clientId = 1;</code>
*/
com.google.protobuf.ByteString
getClientIdBytes();
/**
* <code>required .CommandType cmd = 2;</code>
*/
boolean hasCmd();
/**
* <code>required .CommandType cmd = 2;</code>
*/
Command.CommandType getCmd();
/**
* <code>optional string data = 3;</code>
*/
boolean hasData();
/**
* <code>optional string data = 3;</code>
*/
String getData();
/**
* <code>optional string data = 3;</code>
*/
com.google.protobuf.ByteString
getDataBytes();
}
/**
* Protobuf type {@code MessageBase}
*/
public static final class MessageBase extends
com.google.protobuf.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:MessageBase)
MessageBaseOrBuilder {
// Use MessageBase.newBuilder() to construct.
private MessageBase(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
super(builder);
}
private MessageBase() {
clientId_ = "";
cmd_ = 1;
data_ = "";
}
@Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private MessageBase(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
this();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
com.google.protobuf.ByteString bs = input.readBytes();
bitField0_ |= 0x00000001;
clientId_ = bs;
break;
}
case 16: {
int rawValue = input.readEnum();
Command.CommandType value = Command.CommandType.valueOf(rawValue);
if (value == null) {
unknownFields.mergeVarintField(2, rawValue);
} else {
bitField0_ |= 0x00000002;
cmd_ = rawValue;
}
break;
}
case 26: {
com.google.protobuf.ByteString bs = input.readBytes();
bitField0_ |= 0x00000004;
data_ = bs;
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return Message.internal_static_MessageBase_descriptor;
}
protected FieldAccessorTable
internalGetFieldAccessorTable() {
return Message.internal_static_MessageBase_fieldAccessorTable
.ensureFieldAccessorsInitialized(
Message.MessageBase.class, Message.MessageBase.Builder.class);
}
private int bitField0_;
public static final int CLIENTID_FIELD_NUMBER = 1;
private volatile Object clientId_;
/**
* <code>required string clientId = 1;</code>
*/
public boolean hasClientId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required string clientId = 1;</code>
*/
public String getClientId() {
Object ref = clientId_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
clientId_ = s;
}
return s;
}
}
/**
* <code>required string clientId = 1;</code>
*/
public com.google.protobuf.ByteString
getClientIdBytes() {
Object ref = clientId_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(String) ref);
clientId_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
public static final int CMD_FIELD_NUMBER = 2;
private int cmd_;
/**
* <code>required .CommandType cmd = 2;</code>
*/
public boolean hasCmd() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>required .CommandType cmd = 2;</code>
*/
public Command.CommandType getCmd() {
Command.CommandType result = Command.CommandType.valueOf(cmd_);
return result == null ? Command.CommandType.AUTH : result;
}
public static final int DATA_FIELD_NUMBER = 3;
private volatile Object data_;
/**
* <code>optional string data = 3;</code>
*/
public boolean hasData() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional string data = 3;</code>
*/
public String getData() {
Object ref = data_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
data_ = s;
}
return s;
}
}
/**
* <code>optional string data = 3;</code>
*/
public com.google.protobuf.ByteString
getDataBytes() {
Object ref = data_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(String) ref);
data_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized == 1) return true;
if (isInitialized == 0) return false;
if (!hasClientId()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasCmd()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
com.google.protobuf.GeneratedMessageV3.writeString(output, 1, clientId_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeEnum(2, cmd_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
com.google.protobuf.GeneratedMessageV3.writeString(output, 3, data_);
}
unknownFields.writeTo(output);
}
public int getSerializedSize() {
int size = memoizedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, clientId_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeEnumSize(2, cmd_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, data_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@Override
public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof Message.MessageBase)) {
return super.equals(obj);
}
Message.MessageBase other = (Message.MessageBase) obj;
boolean result = true;
result = result && (hasClientId() == other.hasClientId());
if (hasClientId()) {
result = result && getClientId()
.equals(other.getClientId());
}
result = result && (hasCmd() == other.hasCmd());
if (hasCmd()) {
result = result && cmd_ == other.cmd_;
}
result = result && (hasData() == other.hasData());
if (hasData()) {
result = result && getData()
.equals(other.getData());
}
result = result && unknownFields.equals(other.unknownFields);
return result;
}
@Override
public int hashCode() {
if (memoizedHashCode != 0) {
return memoizedHashCode;
}
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
if (hasClientId()) {
hash = (37 * hash) + CLIENTID_FIELD_NUMBER;
hash = (53 * hash) + getClientId().hashCode();
}
if (hasCmd()) {
hash = (37 * hash) + CMD_FIELD_NUMBER;
hash = (53 * hash) + cmd_;
}
if (hasData()) {
hash = (37 * hash) + DATA_FIELD_NUMBER;
hash = (53 * hash) + getData().hashCode();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
}
public static Message.MessageBase parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static Message.MessageBase parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static Message.MessageBase parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static Message.MessageBase parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static Message.MessageBase parseFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static Message.MessageBase parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
public static Message.MessageBase parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
public static Message.MessageBase parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input, extensionRegistry);
}
public static Message.MessageBase parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static Message.MessageBase parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return com.google.protobuf.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder() {
return DEFAULT_INSTANCE.toBuilder();
}
public static Builder newBuilder(Message.MessageBase prototype) {
return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
}
public Builder toBuilder() {
return this == DEFAULT_INSTANCE
? new Builder() : new Builder().mergeFrom(this);
}
@Override
protected Builder newBuilderForType(
BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code MessageBase}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
// @@protoc_insertion_point(builder_implements:MessageBase)
Message.MessageBaseOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return Message.internal_static_MessageBase_descriptor;
}
protected FieldAccessorTable
internalGetFieldAccessorTable() {
return Message.internal_static_MessageBase_fieldAccessorTable
.ensureFieldAccessorsInitialized(
Message.MessageBase.class, Message.MessageBase.Builder.class);
}
// Construct using Message.MessageBase.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessageV3
.alwaysUseFieldBuilders) {
}
}
public Builder clear() {
super.clear();
clientId_ = "";
bitField0_ = (bitField0_ & ~0x00000001);
cmd_ = 1;
bitField0_ = (bitField0_ & ~0x00000002);
data_ = "";
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return Message.internal_static_MessageBase_descriptor;
}
public Message.MessageBase getDefaultInstanceForType() {
return Message.MessageBase.getDefaultInstance();
}
public Message.MessageBase build() {
Message.MessageBase result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public Message.MessageBase buildPartial() {
Message.MessageBase result = new Message.MessageBase(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.clientId_ = clientId_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.cmd_ = cmd_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.data_ = data_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder clone() {
return (Builder) super.clone();
}
public Builder setField(
com.google.protobuf.Descriptors.FieldDescriptor field,
Object value) {
return (Builder) super.setField(field, value);
}
public Builder clearField(
com.google.protobuf.Descriptors.FieldDescriptor field) {
return (Builder) super.clearField(field);
}
public Builder clearOneof(
com.google.protobuf.Descriptors.OneofDescriptor oneof) {
return (Builder) super.clearOneof(oneof);
}
public Builder setRepeatedField(
com.google.protobuf.Descriptors.FieldDescriptor field,
int index, Object value) {
return (Builder) super.setRepeatedField(field, index, value);
}
public Builder addRepeatedField(
com.google.protobuf.Descriptors.FieldDescriptor field,
Object value) {
return (Builder) super.addRepeatedField(field, value);
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof Message.MessageBase) {
return mergeFrom((Message.MessageBase)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(Message.MessageBase other) {
if (other == Message.MessageBase.getDefaultInstance()) return this;
if (other.hasClientId()) {
bitField0_ |= 0x00000001;
clientId_ = other.clientId_;
onChanged();
}
if (other.hasCmd()) {
setCmd(other.getCmd());
}
if (other.hasData()) {
bitField0_ |= 0x00000004;
data_ = other.data_;
onChanged();
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
}
public final boolean isInitialized() {
if (!hasClientId()) {
return false;
}
if (!hasCmd()) {
return false;
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Message.MessageBase parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (Message.MessageBase) e.getUnfinishedMessage();
throw e.unwrapIOException();
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
private Object clientId_ = "";
/**
* <code>required string clientId = 1;</code>
*/
public boolean hasClientId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required string clientId = 1;</code>
*/
public String getClientId() {
Object ref = clientId_;
if (!(ref instanceof String)) {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
clientId_ = s;
}
return s;
} else {
return (String) ref;
}
}
/**
* <code>required string clientId = 1;</code>
*/
public com.google.protobuf.ByteString
getClientIdBytes() {
Object ref = clientId_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(String) ref);
clientId_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
/**
* <code>required string clientId = 1;</code>
*/
public Builder setClientId(
String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
clientId_ = value;
onChanged();
return this;
}
/**
* <code>required string clientId = 1;</code>
*/
public Builder clearClientId() {
bitField0_ = (bitField0_ & ~0x00000001);
clientId_ = getDefaultInstance().getClientId();
onChanged();
return this;
}
/**
* <code>required string clientId = 1;</code>
*/
public Builder setClientIdBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
clientId_ = value;
onChanged();
return this;
}
private int cmd_ = 1;
/**
* <code>required .CommandType cmd = 2;</code>
*/
public boolean hasCmd() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>required .CommandType cmd = 2;</code>
*/
public Command.CommandType getCmd() {
Command.CommandType result = Command.CommandType.valueOf(cmd_);
return result == null ? Command.CommandType.AUTH : result;
}
/**
* <code>required .CommandType cmd = 2;</code>
*/
public Builder setCmd(Command.CommandType value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
cmd_ = value.getNumber();
onChanged();
return this;
}
/**
* <code>required .CommandType cmd = 2;</code>
*/
public Builder clearCmd() {
bitField0_ = (bitField0_ & ~0x00000002);
cmd_ = 1;
onChanged();
return this;
}
private Object data_ = "";
/**
* <code>optional string data = 3;</code>
*/
public boolean hasData() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
* <code>optional string data = 3;</code>
*/
public String getData() {
Object ref = data_;
if (!(ref instanceof String)) {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
data_ = s;
}
return s;
} else {
return (String) ref;
}
}
/**
* <code>optional string data = 3;</code>
*/
public com.google.protobuf.ByteString
getDataBytes() {
Object ref = data_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(String) ref);
data_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
/**
* <code>optional string data = 3;</code>
*/
public Builder setData(
String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000004;
data_ = value;
onChanged();
return this;
}
/**
* <code>optional string data = 3;</code>
*/
public Builder clearData() {
bitField0_ = (bitField0_ & ~0x00000004);
data_ = getDefaultInstance().getData();
onChanged();
return this;
}
/**
* <code>optional string data = 3;</code>
*/
public Builder setDataBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000004;
data_ = value;
onChanged();
return this;
}
public final Builder setUnknownFields(
final com.google.protobuf.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
}
public final Builder mergeUnknownFields(
final com.google.protobuf.UnknownFieldSet unknownFields) {
return super.mergeUnknownFields(unknownFields);
}
// @@protoc_insertion_point(builder_scope:MessageBase)
}
// @@protoc_insertion_point(class_scope:MessageBase)
private static final Message.MessageBase DEFAULT_INSTANCE;
static {
DEFAULT_INSTANCE = new Message.MessageBase();
}
public static Message.MessageBase getDefaultInstance() {
return DEFAULT_INSTANCE;
}
@Deprecated public static final com.google.protobuf.Parser<MessageBase>
PARSER = new com.google.protobuf.AbstractParser<MessageBase>() {
public MessageBase parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new MessageBase(input, extensionRegistry);
}
};
public static com.google.protobuf.Parser<MessageBase> parser() {
return PARSER;
}
@Override
public com.google.protobuf.Parser<MessageBase> getParserForType() {
return PARSER;
}
public Message.MessageBase getDefaultInstanceForType() {
return DEFAULT_INSTANCE;
}
}
private static final com.google.protobuf.Descriptors.Descriptor
internal_static_MessageBase_descriptor;
private static final
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
internal_static_MessageBase_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
String[] descriptorData = {
"\n\rMessage.proto\032\rCommand.proto\"H\n\013Messag" +
"eBase\022\020\n\010clientId\030\001 \002(\t\022\031\n\003cmd\030\002 \002(\0162\014.C" +
"ommandType\022\014\n\004data\030\003 \001(\tB$\n\031com.netty.co" +
"mmon.protobufB\007Message"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
Command.getDescriptor(),
}, assigner);
internal_static_MessageBase_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_MessageBase_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_MessageBase_descriptor,
new String[] { "ClientId", "Cmd", "Data", });
Command.getDescriptor();
}
// @@protoc_insertion_point(outer_class_scope)
}

@ -0,0 +1,11 @@
syntax = "proto2";
option java_package="cn.myzf.common.protobuf";
option java_outer_classname = "Message";
import "cn/myzf/common/protobuf/Command.proto";
message MessageBase {
required string clientId = 1;
required CommandType cmd = 2;
optional string data = 3;
}

@ -0,0 +1,59 @@
package cn.org.hentai.jtt1078.test;
import cn.org.hentai.jtt1078.util.Packet;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
/**
* Created by matrixy on 2019/12/18.
*/
public class NettyServerTest {
public static void main(String[] args) {
// Server配置
//boss loop
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//worker loop
EventLoopGroup workerGroup = new NioEventLoopGroup();
// final CheerUpServerHandler serverHandler = new CheerUpServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// tcp/ip协议listen函数中的backlog参数,等待连接池的大小
.option(ChannelOption.SO_BACKLOG, 100)
//日志处理器
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
//初始化channel,添加handler
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//日志处理器
p.addLast(new LoggingHandler(LogLevel.INFO));
// p.addLast(serverHandler);
}
});
// 启动服务器
ChannelFuture f = b.bind(18348).sync();
// 等待channel关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

@ -2,6 +2,9 @@ server.port = 1078
server.http.port = 3333
server.backlog = 1024
jtt808server.host = 127.0.0.1
jtt808server.port = 7611
# ffmpeg可执行文件路径,可以留空
#ffmpeg.path = C:/Users/Administrator/Desktop/ffmpeg-6.0-essentials_build/bin/ffmpeg.exe
ffmpeg.path = C:/Program Files/ffmpeg/bin/ffmpeg.exe

Loading…
Cancel
Save