添加websocket服务端

master
liuqingkun 3 years ago
parent dd472cadb2
commit 719f87c487
  1. 5
      jtt808-server/pom.xml
  2. 1
      jtt808-server/src/main/java/io/github/yezhihao/netmc/NettyConfig.java
  3. 50
      jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WebsocketServer.java
  4. 66
      jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java
  5. 59
      jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInitializer.java
  6. 3
      jtt808-server/src/main/java/org/yzh/web/config/JTConfig.java
  7. 7
      jtt808-server/src/test/java/io/github/yezhihao/netmc/websocket/ChannelInitializerTest.java

@ -170,6 +170,11 @@
<version>5.3.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.89.Final</version>
</dependency>
</dependencies>
<build>

@ -7,6 +7,7 @@ import io.github.yezhihao.netmc.codec.MessageEncoder;
import io.github.yezhihao.netmc.core.HandlerInterceptor;
import io.github.yezhihao.netmc.core.HandlerMapping;
import io.github.yezhihao.netmc.session.SessionManager;
import io.github.yezhihao.netmc.websocket.WebsocketServer;
import io.netty.util.NettyRuntime;
import io.netty.util.internal.ObjectUtil;

@ -0,0 +1,50 @@
package io.github.yezhihao.netmc.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* websocket 服务
*/
@Component
public class WebsocketServer {
private static final Logger log = LoggerFactory.getLogger(WebsocketServer.class);
/**
* netty端口
*/
private static int port = 19800;
private static class SingleWsServer {
static final WebsocketServer instance = new WebsocketServer();
}
public static WebsocketServer getInstance() {
return SingleWsServer.instance;
}
private EventLoopGroup mainGroup;
private EventLoopGroup subGroup;
private ServerBootstrap server;
private ChannelFuture future;
public WebsocketServer() {
mainGroup = new NioEventLoopGroup();
subGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(mainGroup, subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WsChannelInitializer());
}
public void start() {
this.future = server.bind(port);
log.info("netty websocket server 启动完毕... port = " + port);
}
}

@ -0,0 +1,66 @@
package io.github.yezhihao.netmc.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 处理消息的handler
*/
public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(WsChannelInboundHandler.class);
/**
* 用于记录和管理所有客户端的channle
*/
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 从channel缓冲区读数据
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 获得 channel
Channel currentChannel = ctx.channel();
// 获取客户端传输过来的消息
String content = msg.text();
log.info(" 群消息发送... content = " + content);
}
/**
* 当客户端连接服务端之后打开连接
* 获取客户端的channle并且放到ChannelGroup中去进行管理
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
users.add(ctx.channel());
log.info(" netty 获得连接..... ");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asShortText();
// 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
users.remove(ctx.channel());
log.info("客户端被移除,channelId为:" + channelId);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
// 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
ctx.channel().close();
users.remove(ctx.channel());
log.info(" netty 异常了...... ");
}
}

@ -0,0 +1,59 @@
package io.github.yezhihao.netmc.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 管道初始化
*/
public class WsChannelInitializer extends ChannelInitializer {
private static final Logger log = LoggerFactory.getLogger(WsChannelInitializer.class);
@Override
protected void initChannel(Channel channel) throws Exception {
log.info(" 管道初始化...... ");
ChannelPipeline pipeline = channel.pipeline();
// websocket 基于http协议,所以要有http编解码器
pipeline.addLast("HttpServerCodec", new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
// 几乎在netty中的编程,都会使用到此hanler
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
// 增加心跳支持 start
// 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
// 如果是读空闲或者写空闲,不处理
pipeline.addLast(new IdleStateHandler(8, 10, 12));
// 自定义的空闲状态检测
pipeline.addLast(new WsChannelInboundHandler());
// 以下是支持httpWebsocket
/**
* websocket 服务器处理的协议用于指定给客户端连接访问的路由 : /ws
* 本handler会帮你处理一些繁重的复杂的事
* 会帮你处理握手动作 handshakingclose, ping, pong ping + pong = 心跳
* 对于websocket来讲都是以frames进行传输的不同的数据类型对应的frames也不同
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义的wshandler
pipeline.addLast(new WsChannelInboundHandler());
// 自定义 http
pipeline.addLast(new WsChannelInboundHandler());
}
}

@ -6,6 +6,7 @@ import io.github.yezhihao.netmc.codec.Delimiter;
import io.github.yezhihao.netmc.codec.LengthField;
import io.github.yezhihao.netmc.core.HandlerMapping;
import io.github.yezhihao.netmc.session.SessionManager;
import io.github.yezhihao.netmc.websocket.WebsocketServer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@ -34,6 +35,8 @@ public class JTConfig {
@ConditionalOnProperty(value = "jt-server.jt808.port.tcp")
@Bean(initMethod = "start", destroyMethod = "stop")
public Server jt808TCPServer(@Value("${jt-server.jt808.port.tcp}") int port) {
WebsocketServer.getInstance().start();
return NettyConfig.custom()
.setIdleStateTime(180, 0, 0)
.setPort(port)

@ -0,0 +1,7 @@
package io.github.yezhihao.netmc.websocket;
import static org.junit.jupiter.api.Assertions.*;
class ChannelInitializerTest {
}
Loading…
Cancel
Save