From 719f87c487683173937c8ba0ab867b7b7cf5e269 Mon Sep 17 00:00:00 2001 From: liuqingkun Date: Mon, 8 May 2023 19:11:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0websocket=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jtt808-server/pom.xml | 5 ++ .../io/github/yezhihao/netmc/NettyConfig.java | 1 + .../netmc/websocket/WebsocketServer.java | 50 ++++++++++++++ .../websocket/WsChannelInboundHandler.java | 66 +++++++++++++++++++ .../netmc/websocket/WsChannelInitializer.java | 59 +++++++++++++++++ .../java/org/yzh/web/config/JTConfig.java | 3 + .../websocket/ChannelInitializerTest.java | 7 ++ 7 files changed, 191 insertions(+) create mode 100644 jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WebsocketServer.java create mode 100644 jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java create mode 100644 jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInitializer.java create mode 100644 jtt808-server/src/test/java/io/github/yezhihao/netmc/websocket/ChannelInitializerTest.java diff --git a/jtt808-server/pom.xml b/jtt808-server/pom.xml index 8bf382b..83d72cd 100644 --- a/jtt808-server/pom.xml +++ b/jtt808-server/pom.xml @@ -170,6 +170,11 @@ 5.3.25 compile + + io.netty + netty-all + 4.1.89.Final + diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/NettyConfig.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/NettyConfig.java index 0ae564a..eb7297d 100644 --- a/jtt808-server/src/main/java/io/github/yezhihao/netmc/NettyConfig.java +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/NettyConfig.java @@ -7,6 +7,7 @@ import io.github.yezhihao.netmc.codec.MessageEncoder; import io.github.yezhihao.netmc.core.HandlerInterceptor; import io.github.yezhihao.netmc.core.HandlerMapping; import io.github.yezhihao.netmc.session.SessionManager; +import io.github.yezhihao.netmc.websocket.WebsocketServer; import io.netty.util.NettyRuntime; import io.netty.util.internal.ObjectUtil; diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WebsocketServer.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WebsocketServer.java new file mode 100644 index 0000000..f669ef8 --- /dev/null +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WebsocketServer.java @@ -0,0 +1,50 @@ +package io.github.yezhihao.netmc.websocket; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * websocket 服务 + */ +@Component +public class WebsocketServer { + + private static final Logger log = LoggerFactory.getLogger(WebsocketServer.class); + /** + * netty端口 + */ + private static int port = 19800; + + private static class SingleWsServer { + static final WebsocketServer instance = new WebsocketServer(); + } + + public static WebsocketServer getInstance() { + return SingleWsServer.instance; + } + + private EventLoopGroup mainGroup; + private EventLoopGroup subGroup; + private ServerBootstrap server; + private ChannelFuture future; + + public WebsocketServer() { + mainGroup = new NioEventLoopGroup(); + subGroup = new NioEventLoopGroup(); + server = new ServerBootstrap(); + server.group(mainGroup, subGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new WsChannelInitializer()); + } + + public void start() { + this.future = server.bind(port); + log.info("netty websocket server 启动完毕... port = " + port); + } +} \ No newline at end of file diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java new file mode 100644 index 0000000..f8739d8 --- /dev/null +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java @@ -0,0 +1,66 @@ +package io.github.yezhihao.netmc.websocket; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.util.concurrent.GlobalEventExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 处理消息的handler + */ +public class WsChannelInboundHandler extends SimpleChannelInboundHandler { + + private static final Logger log = LoggerFactory.getLogger(WsChannelInboundHandler.class); + + /** + * 用于记录和管理所有客户端的channle + */ + public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + /** + * 从channel缓冲区读数据 + */ + @Override + protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { + // 获得 channel + Channel currentChannel = ctx.channel(); + // 获取客户端传输过来的消息 + String content = msg.text(); + + log.info(" 群消息发送... content = " + content); + } + + + /** + * 当客户端连接服务端之后(打开连接) + * 获取客户端的channle,并且放到ChannelGroup中去进行管理 + */ + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + users.add(ctx.channel()); + log.info(" netty 获得连接..... "); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + String channelId = ctx.channel().id().asShortText(); + // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel + users.remove(ctx.channel()); + log.info("客户端被移除,channelId为:" + channelId); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除 + ctx.channel().close(); + users.remove(ctx.channel()); + log.info(" netty 异常了...... "); + } + +} diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInitializer.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInitializer.java new file mode 100644 index 0000000..1ada157 --- /dev/null +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInitializer.java @@ -0,0 +1,59 @@ +package io.github.yezhihao.netmc.websocket; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 管道初始化 + */ +public class WsChannelInitializer extends ChannelInitializer { + + private static final Logger log = LoggerFactory.getLogger(WsChannelInitializer.class); + + @Override + protected void initChannel(Channel channel) throws Exception { + log.info(" 管道初始化...... "); + ChannelPipeline pipeline = channel.pipeline(); + // websocket 基于http协议,所以要有http编解码器 + pipeline.addLast("HttpServerCodec", new HttpServerCodec()); + + // 对写大数据流的支持 + pipeline.addLast(new ChunkedWriteHandler()); + + // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse + // 几乎在netty中的编程,都会使用到此hanler + pipeline.addLast(new HttpObjectAggregator(1024 * 64)); + + + // 增加心跳支持 start + // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开 + // 如果是读空闲或者写空闲,不处理 + pipeline.addLast(new IdleStateHandler(8, 10, 12)); + // 自定义的空闲状态检测 + pipeline.addLast(new WsChannelInboundHandler()); + + // 以下是支持httpWebsocket + /** + * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws + * 本handler会帮你处理一些繁重的复杂的事 + * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳 + * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同 + */ + pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); + + // 自定义的wshandler + pipeline.addLast(new WsChannelInboundHandler()); + + // 自定义 http + pipeline.addLast(new WsChannelInboundHandler()); + } +} + diff --git a/jtt808-server/src/main/java/org/yzh/web/config/JTConfig.java b/jtt808-server/src/main/java/org/yzh/web/config/JTConfig.java index 7e190dc..4da6fd2 100644 --- a/jtt808-server/src/main/java/org/yzh/web/config/JTConfig.java +++ b/jtt808-server/src/main/java/org/yzh/web/config/JTConfig.java @@ -6,6 +6,7 @@ import io.github.yezhihao.netmc.codec.Delimiter; import io.github.yezhihao.netmc.codec.LengthField; import io.github.yezhihao.netmc.core.HandlerMapping; import io.github.yezhihao.netmc.session.SessionManager; +import io.github.yezhihao.netmc.websocket.WebsocketServer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -34,6 +35,8 @@ public class JTConfig { @ConditionalOnProperty(value = "jt-server.jt808.port.tcp") @Bean(initMethod = "start", destroyMethod = "stop") public Server jt808TCPServer(@Value("${jt-server.jt808.port.tcp}") int port) { + WebsocketServer.getInstance().start(); + return NettyConfig.custom() .setIdleStateTime(180, 0, 0) .setPort(port) diff --git a/jtt808-server/src/test/java/io/github/yezhihao/netmc/websocket/ChannelInitializerTest.java b/jtt808-server/src/test/java/io/github/yezhihao/netmc/websocket/ChannelInitializerTest.java new file mode 100644 index 0000000..c49bd3c --- /dev/null +++ b/jtt808-server/src/test/java/io/github/yezhihao/netmc/websocket/ChannelInitializerTest.java @@ -0,0 +1,7 @@ +package io.github.yezhihao.netmc.websocket; + +import static org.junit.jupiter.api.Assertions.*; + +class ChannelInitializerTest { + +} \ No newline at end of file