From aba72996bd39f920fe787c689d221bccc6f9d61e Mon Sep 17 00:00:00 2001 From: liuqingkun Date: Fri, 26 May 2023 21:01:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hentai/jtt1078/app/VideoServerApp.java | 15 +++++-- .../app/websocket/WebsocketClient.java | 4 +- .../app/websocket/WebsocketClientHandler.java | 2 +- .../jtt1078/http/NettyHttpServerHandler.java | 10 +++-- .../jtt1078/publisher/PublishManager.java | 34 ++++++-------- .../hentai/jtt1078/server/Jtt1078Handler.java | 20 ++++----- .../hentai/jtt1078/server/SessionManager.java | 13 ++++-- .../jtt1078/subscriber/RTMPPublisher.java | 7 ++- src/main/resources/log4j.properties | 44 ++++++------------- 9 files changed, 70 insertions(+), 79 deletions(-) diff --git a/src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java b/src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java index e302d38..949c729 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/VideoServerApp.java @@ -17,13 +17,14 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.misc.Signal; import sun.misc.SignalHandler; import java.net.InetAddress; -import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; /** * Created by matrixy on 2019/4/9. @@ -47,8 +48,14 @@ public class VideoServerApp { } }); - WebsocketClient wsClient = WebsocketClient.getWsInstance(); - wsClient.connect(); + new Thread(() -> { + WebsocketClient wsClient = WebsocketClient.getWsInstance(); + try { + wsClient.connect(); + } catch (MyException e) { + e.printStackTrace(); + } + }).start(); videoServer.start(); httpServer.start(); @@ -71,7 +78,7 @@ public class VideoServerApp { @Override protected void initChannel(final SocketChannel channel) throws Exception { ChannelPipeline p = channel.pipeline(); - // p.addLast(new IdleStateHandler(10,0,0, TimeUnit.SECONDS)); + p.addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)); p.addLast(new Jtt1078MessageDecoder()); // p.addLast(new Jtt808MessageEncoder()); // p.addLast(new JTT808Handler()); diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java index 7d86c07..9e5ac2f 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java @@ -4,6 +4,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.base64.Base64Encoder; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; @@ -13,6 +14,7 @@ import org.slf4j.LoggerFactory; import java.net.URI; import java.net.URISyntaxException; +import java.util.Base64; public class WebsocketClient extends AbstractWebsocketClient { @@ -36,7 +38,7 @@ public class WebsocketClient extends AbstractWebsocketClient { if (wsInstance == null) { try { wsInstance = new WebsocketClient("ws://42.192.165.208:19800/ws", 20); -// wsInstance = new WebsocketClient("ws://127.0.0.1:19800/ws", 20); +// wsInstance = new WebsocketClient("ws://127.0.0.1:19800/ws?" + Base64.getEncoder().encode("jtt1078".getBytes()), 20); } catch (URISyntaxException | MyException e) { log.error("发生异常,原因:{}", e.getMessage(), e); } diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java index 93388ea..27781f1 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java @@ -96,7 +96,7 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler // 每分钟的0s, 打印当前推流通道 LocalTime localTime = LocalTime.now(); if (localTime.getSecond() == 0) { - log.info(BusinessConstant.LOGGER_PREFIX + " : jt808当前推流通道 : tag = {}", tag); + log.info(BusinessConstant.LOGGER_PREFIX + " : websocket : jt808当前推流通道 : tag = {}", tag); } } catch (MyException e) { e.printStackTrace(); diff --git a/src/main/java/cn/org/hentai/jtt1078/http/NettyHttpServerHandler.java b/src/main/java/cn/org/hentai/jtt1078/http/NettyHttpServerHandler.java index b3ace41..3ec2a72 100644 --- a/src/main/java/cn/org/hentai/jtt1078/http/NettyHttpServerHandler.java +++ b/src/main/java/cn/org/hentai/jtt1078/http/NettyHttpServerHandler.java @@ -3,6 +3,7 @@ package cn.org.hentai.jtt1078.http; import cn.org.hentai.jtt1078.entity.Media; import cn.org.hentai.jtt1078.publisher.PublishManager; import cn.org.hentai.jtt1078.server.Session; +import cn.org.hentai.jtt1078.subscriber.Subscriber; import cn.org.hentai.jtt1078.util.*; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -50,10 +51,12 @@ public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter { ctx.writeAndFlush(resp.getBytes()).await(); - logger.info(BusinessConstant.LOGGER_PREFIX + " : NettyHttpServerHandler : 订阅视频数据 "); // 订阅视频数据 - long wid = PublishManager.getInstance().subscribe(tag, Media.Type.Video, ctx).getId(); - setSession(ctx, new Session().set("subscriber-id", wid).set("tag", tag)); + Subscriber subscriber = PublishManager.getInstance().subscribe(tag, Media.Type.Video, ctx); + if (subscriber != null) { + long wid = subscriber.getId(); + setSession(ctx, new Session().set("subscriber-id", wid).set("tag", tag)); + } } else if (uri.equals("/test/multimedia")) { responseHTMLFile("/multimedia.html", ctx); } else { @@ -66,6 +69,7 @@ public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter { ctx.flush(); } } catch (Exception e) { + e.printStackTrace(); logger.error("******************* " + e.getMessage()); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java index fd0cf78..df959bf 100644 --- a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java @@ -1,7 +1,5 @@ package cn.org.hentai.jtt1078.publisher; -import cn.org.hentai.jtt1078.app.websocket.MyException; -import cn.org.hentai.jtt1078.app.websocket.WebsocketClient; import cn.org.hentai.jtt1078.entity.Media; import cn.org.hentai.jtt1078.subscriber.Subscriber; import cn.org.hentai.jtt1078.util.BusinessConstant; @@ -11,10 +9,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; /** * Created by houcheng on 2019-12-11. @@ -28,13 +23,15 @@ public final class PublishManager { } public Subscriber subscribe(String tag, Media.Type type, ChannelHandlerContext ctx) { - logger.info(BusinessConstant.LOGGER_PREFIX + " : PublishManager : subscribe "); + logger.info(BusinessConstant.LOGGER_PREFIX + " : PublishManager : 订阅视频 tag = {}", tag); Channel chl = channels.get(tag); if (chl == null) { - logger.info(BusinessConstant.LOGGER_PREFIX + " : PublishManager : subscribe : 创建通道 {} ", chl.tag); - chl = new Channel(tag); - channels.put(tag, chl); + logger.info(BusinessConstant.LOGGER_PREFIX + " : PublishManager : 订阅视频失败 : 通道 {} 不存在", tag); + return null; +// logger.info(BusinessConstant.LOGGER_PREFIX + " : PublishManager : subscribe : 创建通道 {} ", tag); +// chl = new Channel(tag); +// channels.put(tag, chl); } Subscriber subscriber = null; if (type.equals(Media.Type.Video)) subscriber = chl.subscribe(ctx); @@ -58,31 +55,27 @@ public final class PublishManager { public Channel open(String tag) { // 开启一个新的推流通道, 先释放原来的通道 + logger.info(BusinessConstant.LOGGER_PREFIX + " : 通道开启 : tag = {} : 先调用close方法释放原来的通道", tag); channels.values().forEach(channel -> { - logger.info(BusinessConstant.LOGGER_PREFIX + " : 通道开启 : 调用close方法释放原来的通道"); close(channel.tag); }); + // 有新设备开启通道时, 清空下channels缓存 channels.clear(); - logger.info(BusinessConstant.LOGGER_PREFIX + " : 通道开启 : {} ", tag); - Channel chl = channels.get(tag); - if (chl == null) { - chl = new Channel(tag); - channels.put(tag, chl); + Channel chl = new Channel(tag); + channels.put(tag, chl); + + printCurChannels(); - printCurChannels(); - } if (chl.isPublishing()) throw new RuntimeException("channel already publishing"); return chl; } public void close(String tag) { - logger.info(BusinessConstant.LOGGER_PREFIX + " : 通道关闭 : {} ", tag); + logger.info(BusinessConstant.LOGGER_PREFIX + " : close方法 : 通道关闭 : {} ", tag); Channel chl = channels.remove(tag); if (chl != null) chl.close(); - - printCurChannels(); } /** @@ -120,6 +113,7 @@ public final class PublishManager { } logger.info(BusinessConstant.LOGGER_PREFIX + " : 当前通道列表 = {}", tags.toString()); + if (tags.size() > 1) { logger.info(BusinessConstant.LOGGER_PREFIX + " : 当前通道列表数量错误, 应为1, 实际 = {}", tags.size()); } diff --git a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java index 6ba542e..a118830 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java @@ -38,9 +38,10 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { logger.debug(BusinessConstant.LOGGER_PREFIX + " : channelRead from tag = {} : data = {} ", tag, packet.data.toString()); if (SessionManager.contains(nettyChannel, "tag") == false) { + logger.info(BusinessConstant.LOGGER_PREFIX + " : 新建会话 : channelId = {} ", nettyChannel.id().asLongText()); Channel chl = PublishManager.getInstance().open(tag); SessionManager.set(nettyChannel, "tag", tag); - logger.info(BusinessConstant.LOGGER_PREFIX + " : 开始发布视频 : {} -> {} ", Long.toHexString(chl.hashCode() & 0xffffffffL), tag); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 开始发布视频 : {} ", tag); } Integer sequence = SessionManager.get(nettyChannel, "video-sequence"); @@ -82,7 +83,7 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { @Override public void channelInactive(ChannelHandlerContext ctx) { try { - logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接断开触发 channelInactive : tag = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接断开触发 channelInactive : channelId = {} ", ctx.channel() == null ? "null" : ctx.channel().id().asLongText()); super.channelInactive(ctx); release(ctx.channel()); } catch (Exception e) { @@ -99,9 +100,8 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接异常触发 exceptionCaught : channel = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接异常触发 exceptionCaught : channelId = {} ", ctx.channel() == null ? "null" : ctx.channel().id().asLongText()); - // super.exceptionCaught(ctx, cause); cause.printStackTrace(); release(ctx.channel()); ctx.close(); @@ -109,13 +109,11 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接超时 userEventTriggered : channel = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); - if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { String tag = SessionManager.get(ctx.channel(), "tag"); - logger.info("read timeout: {}", tag); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接超时 userEventTriggered : channelId = {} : tag = {}", ctx.channel() == null ? "null" : ctx.channel().id().asLongText(), tag); release(ctx.channel()); } } @@ -127,16 +125,14 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { * @param channel */ private void release(io.netty.channel.Channel channel) { - logger.info(BusinessConstant.LOGGER_PREFIX + " : release : channel = {} ", channel.id()); SessionManager.printMapping(); + String tag = SessionManager.get(channel, "tag"); - logger.info(BusinessConstant.LOGGER_PREFIX + " : release : tag = {} ", tag); if (tag != null) { - logger.info("close netty channel: {}", tag); - logger.info(BusinessConstant.LOGGER_PREFIX + " : 关闭通道 : 调用close方法释放原来的通道"); + logger.info(BusinessConstant.LOGGER_PREFIX + " : release : channelId = {} : tag = {}, 调用close方法释放通道 ", channel.id(), tag); PublishManager.getInstance().close(tag); } - SessionManager.clearMapping(); + SessionManager.remove(channel); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java b/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java index 3c25c28..972db47 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java @@ -30,11 +30,16 @@ public final class SessionManager { } public static void printMapping() { - logger.info(BusinessConstant.LOGGER_PREFIX + " : SessionManager : mapping keys = {} ", mappings.keySet().toString()); - logger.info(BusinessConstant.LOGGER_PREFIX + " : SessionManager : mapping values = {} ", mappings.values().toString()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : SessionManager : mapping = {} ", mappings.toString()); } - public static void clearMapping() { - mappings.clear(); + public static void remove(Channel channel) { + if (mappings.containsKey(channel.id().asLongText() + "tag")) { + mappings.remove(channel.id().asLongText() + "tag"); + } + + if (mappings.containsKey(channel.id().asLongText() + "video-sequence")) { + mappings.remove(channel.id().asLongText() + "video-sequence"); + } } } \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java index 40413ed..9ad814e 100644 --- a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java +++ b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java @@ -43,20 +43,19 @@ public class RTMPPublisher extends Thread { if (debugMode) System.out.print(str); } - logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令失败, 关闭通道[{}] : Process FFMPEG exited... : error msg = {} ", tag, errMsg); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令失败, 关闭通道[{}], 并调用close方法释放原来的通道 : Process FFMPEG exited... : error msg = {} ", tag, errMsg); // 若ffmpeg命令执行失败, 视为推流失败, 将通道从缓存中删除 - logger.info(BusinessConstant.LOGGER_PREFIX + " : ffmpeg命令执行失败 : 调用close方法释放原来的通道"); PublishManager.getInstance().close(tag); } catch (Exception ex) { logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布异常 : {} ", ex.getMessage()); - logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布异常, 关闭通道[{}]", tag); - logger.info(BusinessConstant.LOGGER_PREFIX + " : ffmpeg命令异常 : 调用close方法释放原来的通道"); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布异常, 关闭通道[{}], 并调用close方法释放原来的通道", tag); PublishManager.getInstance().close(tag); } } public void close() { try { + logger.info(BusinessConstant.LOGGER_PREFIX + " : ffmpeg close : ffmpeg线程关闭. tag = {}", tag); if (process != null) process.destroyForcibly(); } catch (Exception e) { } diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index e2b13e9..e9c69ef 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -1,32 +1,16 @@ -platform.name=hentai -system.name=video-server -log4j.rootLogger=ERROR, Console, stdout, file -#̨ -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.Threshold=INFO -log4j.appender.Console=org.apache.log4j.ConsoleAppender -log4j.appender.Console.layout=org.apache.log4j.PatternLayout -#log4j.appender.Console.layout.ConversionPattern=[${platform.name}] %d{yyyy-MM-dd HH:mm:ss,SSS} [%-5p] [${system.name}] - %c{1} - %m%n -log4j.appender.Console.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %m%n -log4j.logger.cn.org.hentai=INFO -#ļ -log4j.appender.file=org.apache.log4j.FileAppender +log4j.rootLogger=INFO,stdout,file +#̨ +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{2}[%L] | %m%n + +#־ļ +log4j.appender.file=org.apache.log4j.DailyRollingFileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{2}[%L] | %m%n log4j.appender.file.File=./applogs/jtt1078-video-server/video-server.log -log4j.appender.file.Encoding=UTF-8 -log4j.appender.file.name=fileLogDemo +log4j.appender.file.DatePattern='_'yyyy-MM-dd'.log' log4j.appender.file.Threshold=INFO -log4j.appender.file.layout=org.apache.log4j.PatternLayout -#log4j.appender.file.layout.ConversionPattern=[${platform.name}] %d{yyyy-MM-dd HH:mm:ss,SSS} [%-5p] [${system.name}] - %c{1} - %m%n -log4j.appender.file.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %m%n -log4j.appender.file.append=true -log4j.appender.DailyRollingFile = org.apache.log4j.DailyRollingFileAppender -#log4j.appender.RollingFile.MaxFileSize=1KB -#log4j.appender.RollingFile.MaxBackupIndex=3 -log4j.appender.DailyRollingFile.layout = org.apache.log4j.PatternLayout -log4j.appender.DailyRollingFile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n - -# ÿһ־ļ -#org.apache.log4j.DailyRollingFileAppender=true - - - +log4j.appender.file.Append=true +log4j.appender.file.Encoding=UTF-8