diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java index f6e2df3..25e188e 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java @@ -1,6 +1,7 @@ package cn.org.hentai.jtt1078.app.websocket; import cn.org.hentai.jtt1078.publisher.PublishManager; +import cn.org.hentai.jtt1078.util.BusinessConstant; import io.netty.channel.*; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; @@ -67,7 +68,6 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler public void channelActive(ChannelHandlerContext ctx) { channel = ctx.channel(); webSocketClientHandshaker.handshake(channel); - log.info("建立连接"); // 服务连接后, 启动定时发送数据的任务 if (reconnTimer != null) { @@ -76,7 +76,7 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler } sendMsgTimer = new Timer(); - log.info(">>>> start send msg to 808."); + log.info(BusinessConstant.LOGGER_PREFIX + " : jt808服务websocket连接成功, 开始发送当前推流通道"); sendMsgTimer.schedule(new TimerTask() { @Override public void run() { @@ -98,7 +98,7 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler */ @Override public void channelInactive(ChannelHandlerContext ctx) { - log.info("连接断开"); + log.info(BusinessConstant.LOGGER_PREFIX + " : jt808服务websocket连接断开"); // 掉线后, 启动定时任务, 尝试重连, 延迟1s, 间隔5s if (sendMsgTimer != null) { @@ -106,6 +106,7 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler sendMsgTimer.cancel(); } + log.info(BusinessConstant.LOGGER_PREFIX + " : jt808服务websocket开始重连任务"); reconnect(); } @@ -118,7 +119,7 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler WebsocketClient wsClient = WebsocketClient.getWsInstance(); wsClient.connect(); } catch (MyException e) { - log.info("reconnect failed! msg : " + e.getMessage()); + log.info(BusinessConstant.LOGGER_PREFIX + " : jt808服务websocket重连失败 : msg = ", e.getMessage()); } } }, 10, 5000); diff --git a/src/main/java/cn/org/hentai/jtt1078/http/NettyHttpServerHandler.java b/src/main/java/cn/org/hentai/jtt1078/http/NettyHttpServerHandler.java index e6618f2..b3ace41 100644 --- a/src/main/java/cn/org/hentai/jtt1078/http/NettyHttpServerHandler.java +++ b/src/main/java/cn/org/hentai/jtt1078/http/NettyHttpServerHandler.java @@ -50,6 +50,7 @@ public class NettyHttpServerHandler extends ChannelInboundHandlerAdapter { ctx.writeAndFlush(resp.getBytes()).await(); + logger.info(BusinessConstant.LOGGER_PREFIX + " : NettyHttpServerHandler : 订阅视频数据 "); // 订阅视频数据 long wid = PublishManager.getInstance().subscribe(tag, Media.Type.Video, ctx).getId(); setSession(ctx, new Session().set("subscriber-id", wid).set("tag", tag)); diff --git a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java index 289325f..60a75eb 100644 --- a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java @@ -1,13 +1,20 @@ package cn.org.hentai.jtt1078.publisher; +import cn.org.hentai.jtt1078.app.websocket.MyException; +import cn.org.hentai.jtt1078.app.websocket.WebsocketClient; import cn.org.hentai.jtt1078.entity.Media; import cn.org.hentai.jtt1078.subscriber.Subscriber; +import cn.org.hentai.jtt1078.util.BusinessConstant; import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** * Created by houcheng on 2019-12-11. @@ -21,8 +28,11 @@ public final class PublishManager { } public Subscriber subscribe(String tag, Media.Type type, ChannelHandlerContext ctx) { + logger.info(BusinessConstant.LOGGER_PREFIX + " : PublishManager : subscribe "); + Channel chl = channels.get(tag); if (chl == null) { + logger.info(BusinessConstant.LOGGER_PREFIX + " : PublishManager : subscribe : 创建通道 {} ", chl.tag); chl = new Channel(tag); channels.put(tag, chl); } @@ -47,8 +57,7 @@ public final class PublishManager { } public Channel open(String tag) { - logger.info(">>>> channel : " + tag + " | open : " + System.currentTimeMillis()); - logger.info(">>>> all channel number. channels.values().size() = " + channels.values().size()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 通道开启 : {} ", tag); Channel chl = channels.get(tag); // 开启一个新的推流通道, 释放原来的通道 @@ -56,22 +65,32 @@ public final class PublishManager { channel.rtmpPublisher.close(); close(channel.tag); }); - - logger.info(">>>> after close, all channel number. channels.values().size() = " + channels.values().size()); + // 有新设备开启通道时, 清空下channels缓存 + channels.clear(); if (chl == null) { chl = new Channel(tag); channels.put(tag, chl); + + printCurChannels(); } if (chl.isPublishing()) throw new RuntimeException("channel already publishing"); return chl; } public void close(String tag) { + logger.info(BusinessConstant.LOGGER_PREFIX + " : 通道关闭 : {} ", tag); Channel chl = channels.remove(tag); if (chl != null) chl.close(); + + printCurChannels(); } + /** + * 取当前推流通道 + * + * @return + */ public String getCurPushDevice() { if (channels.size() == 0) { return ""; @@ -92,6 +111,21 @@ public final class PublishManager { } } + /** + * 打印当前所有通道 + */ + public void printCurChannels() { + List tags = new ArrayList<>(); + for (Channel value : channels.values()) { + tags.add(value.tag); + } + + logger.info(BusinessConstant.LOGGER_PREFIX + " : 当前通道列表 = {}", tags.toString()); + if (tags.size() > 1) { + logger.info(BusinessConstant.LOGGER_PREFIX + " : 当前通道列表数量错误, 应为1, 实际 = {}", tags.size()); + } + } + public void unsubscribe(String tag, long watcherId) { Channel chl = channels.get(tag); if (chl != null) chl.unsubscribe(watcherId); diff --git a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java index 56a3f1e..2ca631e 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java @@ -5,6 +5,7 @@ import cn.org.hentai.jtt1078.entity.MediaEncoding; import cn.org.hentai.jtt1078.publisher.Channel; import cn.org.hentai.jtt1078.publisher.PublishManager; import cn.org.hentai.jtt1078.entity.Audio; +import cn.org.hentai.jtt1078.util.BusinessConstant; import cn.org.hentai.jtt1078.util.Packet; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -34,11 +35,12 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { String sim = packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD() + packet.nextBCD(); int channel = packet.nextByte() & 0xff; String tag = sim + "-" + channel; + logger.debug(BusinessConstant.LOGGER_PREFIX + " : channelRead from tag : {} ", tag); if (SessionManager.contains(nettyChannel, "tag") == false) { Channel chl = PublishManager.getInstance().open(tag); SessionManager.set(nettyChannel, "tag", tag); - logger.info("start publishing: {} -> {}-{}", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 开始发布视频 : {} -> {} ", Long.toHexString(chl.hashCode() & 0xffffffffL), tag); } Integer sequence = SessionManager.get(nettyChannel, "video-sequence"); @@ -75,6 +77,7 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { @Override public void channelInactive(ChannelHandlerContext ctx) { try { + logger.info(BusinessConstant.LOGGER_PREFIX + " : channelInactive : tag = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); super.channelInactive(ctx); release(ctx.channel()); } catch (Exception e) { @@ -84,6 +87,8 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + logger.info(BusinessConstant.LOGGER_PREFIX + " : exceptionCaught : channel = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); + // super.exceptionCaught(ctx, cause); cause.printStackTrace(); release(ctx.channel()); @@ -92,6 +97,8 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + logger.info(BusinessConstant.LOGGER_PREFIX + " : userEventTriggered : channel = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); + if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { @@ -103,6 +110,7 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { } private void release(io.netty.channel.Channel channel) { + logger.info(BusinessConstant.LOGGER_PREFIX + " : release : channel = {} ", channel.id()); String tag = SessionManager.get(channel, "tag"); if (tag != null) { logger.info("close netty channel: {}", tag); diff --git a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java index 9ccde1b..6b7c4ce 100644 --- a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java +++ b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java @@ -32,19 +32,22 @@ public class RTMPPublisher extends Thread { tag, rtmpUrl ); - logger.info("Execute: {}", cmd); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令 : {} ", cmd); process = Runtime.getRuntime().exec(cmd); stderr = process.getErrorStream(); - logger.error("****** process result : " + stderr.toString()); + while ((len = stderr.read(buff)) > -1) { if (debugMode) System.out.print(new String(buff, 0, len)); } + logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令失败, 关闭通道[{}] : Process FFMPEG exited... ", tag); // 若ffmpeg命令执行失败, 视为推流失败, 将通道从缓存中删除 PublishManager.getInstance().close(tag); - logger.info("Process FFMPEG exited..."); } catch (Exception ex) { - logger.error("publish failed", ex); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布失败 : {} ", ex.getMessage()); + + logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布失败, 关闭通道[{}]", tag); + PublishManager.getInstance().close(tag); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/util/BusinessConstant.java b/src/main/java/cn/org/hentai/jtt1078/util/BusinessConstant.java new file mode 100644 index 0000000..1c8e2da --- /dev/null +++ b/src/main/java/cn/org/hentai/jtt1078/util/BusinessConstant.java @@ -0,0 +1,5 @@ +package cn.org.hentai.jtt1078.util; + +public interface BusinessConstant { + String LOGGER_PREFIX = " --- custom log --- "; +} diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index 14c1eed..e2b13e9 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -1,23 +1,32 @@ platform.name=hentai system.name=video-server log4j.rootLogger=ERROR, Console, stdout, file - #̨ log4j.appender.stdout.Target=System.out -log4j.appender.stdout.Threshold=DEBUG +log4j.appender.stdout.Threshold=INFO log4j.appender.Console=org.apache.log4j.ConsoleAppender log4j.appender.Console.layout=org.apache.log4j.PatternLayout -log4j.appender.Console.layout.ConversionPattern=[${platform.name}] %d{yyyy-MM-dd HH:mm:ss,SSS} [%-5p] [${system.name}] - %c{1} - %m%n -log4j.logger.cn.org.hentai=DEBUG - +#log4j.appender.Console.layout.ConversionPattern=[${platform.name}] %d{yyyy-MM-dd HH:mm:ss,SSS} [%-5p] [${system.name}] - %c{1} - %m%n +log4j.appender.Console.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %m%n +log4j.logger.cn.org.hentai=INFO #ļ log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=./applogs/jtt1078-video-server/video-server.log log4j.appender.file.Encoding=UTF-8 log4j.appender.file.name=fileLogDemo -log4j.appender.file.Threshold=DEBUG +log4j.appender.file.Threshold=INFO log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=[${platform.name}] %d{yyyy-MM-dd HH:mm:ss,SSS} [%-5p] [${system.name}] - %c{1} - %m%n +#log4j.appender.file.layout.ConversionPattern=[${platform.name}] %d{yyyy-MM-dd HH:mm:ss,SSS} [%-5p] [${system.name}] - %c{1} - %m%n +log4j.appender.file.layout.ConversionPattern=[%p] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %m%n log4j.appender.file.append=true +log4j.appender.DailyRollingFile = org.apache.log4j.DailyRollingFileAppender +#log4j.appender.RollingFile.MaxFileSize=1KB +#log4j.appender.RollingFile.MaxBackupIndex=3 +log4j.appender.DailyRollingFile.layout = org.apache.log4j.PatternLayout +log4j.appender.DailyRollingFile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n + # ÿһ־ļ -org.apache.log4j.DailyRollingFileAppender=true +#org.apache.log4j.DailyRollingFileAppender=true + + +