diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java index 25e188e..df55c7c 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java @@ -12,6 +12,8 @@ import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.LocalDate; +import java.time.LocalTime; import java.util.Timer; import java.util.TimerTask; @@ -80,15 +82,32 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler sendMsgTimer.schedule(new TimerTask() { @Override public void run() { - String clientId = PublishManager.getInstance().getCurPushDevice(); + String tag = PublishManager.getInstance().getCurPushDevice(); try { - WebsocketClient.getWsInstance().write(clientId); + WebsocketClient.getWsInstance().write(tag); + + // 每分钟的0s, 打印当前推流通道 + LocalTime localTime = LocalTime.now(); + if (localTime.getSecond() == 0) { + log.info(BusinessConstant.LOGGER_PREFIX + " : jt808当前推流通道 : tag = {}", tag); + } } catch (MyException e) { e.printStackTrace(); } } }, 10, 1000); + +// printTimer = new Timer(); +// log.info(BusinessConstant.LOGGER_PREFIX + " : jt808服务websocket连接成功, 启动打印当前通道定时任务"); +// sendMsgTimer.schedule(new TimerTask() { +// @Override +// public void run() { +// String tag = PublishManager.getInstance().getCurPushDevice(); +// +// log.info(BusinessConstant.LOGGER_PREFIX + " : jt808当前推流通道 : tag = {}", tag); +// } +// }, 10, 60000); } /** diff --git a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java index 2ca631e..fcc4448 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java @@ -74,10 +74,14 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { } } + /** + * 连接断开触发 + * @param ctx + */ @Override public void channelInactive(ChannelHandlerContext ctx) { try { - logger.info(BusinessConstant.LOGGER_PREFIX + " : channelInactive : tag = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接断开触发 channelInactive : tag = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); super.channelInactive(ctx); release(ctx.channel()); } catch (Exception e) { @@ -85,9 +89,15 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { } } + /** + * 连接异常触发 + * @param ctx + * @param cause + * @throws Exception + */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.info(BusinessConstant.LOGGER_PREFIX + " : exceptionCaught : channel = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接异常触发 exceptionCaught : channel = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); // super.exceptionCaught(ctx, cause); cause.printStackTrace(); @@ -97,7 +107,7 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - logger.info(BusinessConstant.LOGGER_PREFIX + " : userEventTriggered : channel = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 连接超时 userEventTriggered : channel = {} ", ctx.channel() == null ? "null" : ctx.channel().id()); if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) { IdleStateEvent event = (IdleStateEvent) evt; @@ -109,9 +119,16 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { } } + /** + * 释放通道 + * + * @param channel + */ private void release(io.netty.channel.Channel channel) { logger.info(BusinessConstant.LOGGER_PREFIX + " : release : channel = {} ", channel.id()); + SessionManager.printMapping(); String tag = SessionManager.get(channel, "tag"); + logger.info(BusinessConstant.LOGGER_PREFIX + " : release : tag = {} ", tag); if (tag != null) { logger.info("close netty channel: {}", tag); PublishManager.getInstance().close(tag); diff --git a/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java b/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java index fa717b0..988354d 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java @@ -1,31 +1,36 @@ package cn.org.hentai.jtt1078.server; +import cn.org.hentai.jtt1078.util.BusinessConstant; import io.netty.channel.Channel; import org.apache.commons.collections.map.HashedMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; -public final class SessionManager -{ +public final class SessionManager { + static Logger logger = LoggerFactory.getLogger(SessionManager.class); + private static final Map mappings = new HashedMap(); - public static void init() - { + public static void init() { // ... } - public static T get(Channel channel, String key) - { + public static T get(Channel channel, String key) { return (T) mappings.get(channel.id().asLongText() + key); } - public static void set(Channel channel, String key, Object value) - { + public static void set(Channel channel, String key, Object value) { mappings.put(channel.id().asLongText() + key, value); } - public static boolean contains(Channel channel, String key) - { + public static boolean contains(Channel channel, String key) { return mappings.containsKey(channel.id().asLongText() + key); } + + public static void printMapping() { + logger.info(BusinessConstant.LOGGER_PREFIX + " : SessionManager : mapping keys = {} ", mappings.keySet().toString()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : SessionManager : mapping values = {} ", mappings.values().toString()); + } } \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java b/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java index 831e483..21483c8 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java @@ -11,39 +11,41 @@ import java.io.FileOutputStream; /** * Created by matrixy on 2019/12/21. */ -public class AudioTest -{ - public static void main(String[] args) throws Exception - { - int len = -1; - byte[] block = new byte[1024]; - FileInputStream fis = new FileInputStream("e:\\test\\streaming.hex"); - Jtt1078Decoder decoder = new Jtt1078Decoder(); - G711Codec codec = new G711Codec(); - FileOutputStream fos = new FileOutputStream("e:\\test\\fuckfuckfuck1111.pcm"); - while ((len = fis.read(block)) > -1) - { - decoder.write(block, 0, len); - while (true) - { - Packet p = decoder.decode(); - if (p == null) break; - - int lengthOffset = 28; - int dataType = (p.seek(15).nextByte() >> 4) & 0x0f; - // 透传数据类型:0100,没有后面的时间以及Last I Frame Interval和Last Frame Interval字段 - if (dataType == 0x04) lengthOffset = 28 - 8 - 2 - 2; - else if (dataType == 0x03) lengthOffset = 28 - 4; - - if (dataType == 0x03) - { - byte[] pcmData = codec.toPCM(p.seek(lengthOffset + 2).nextBytes()); - fos.write(pcmData); - fos.flush(); - } - } - } - fos.close(); - fis.close(); +public class AudioTest { + public static void main(String[] args) throws Exception { + String s1 = new String(""); + String s2 = new String(""); + System.out.println(s1 == s2); + System.out.println(s1.equals(s2)); +// int len = -1; +// byte[] block = new byte[1024]; +// FileInputStream fis = new FileInputStream("e:\\test\\streaming.hex"); +// Jtt1078Decoder decoder = new Jtt1078Decoder(); +// G711Codec codec = new G711Codec(); +// FileOutputStream fos = new FileOutputStream("e:\\test\\fuckfuckfuck1111.pcm"); +// while ((len = fis.read(block)) > -1) +// { +// decoder.write(block, 0, len); +// while (true) +// { +// Packet p = decoder.decode(); +// if (p == null) break; +// +// int lengthOffset = 28; +// int dataType = (p.seek(15).nextByte() >> 4) & 0x0f; +// // 透传数据类型:0100,没有后面的时间以及Last I Frame Interval和Last Frame Interval字段 +// if (dataType == 0x04) lengthOffset = 28 - 8 - 2 - 2; +// else if (dataType == 0x03) lengthOffset = 28 - 4; +// +// if (dataType == 0x03) +// { +// byte[] pcmData = codec.toPCM(p.seek(lengthOffset + 2).nextBytes()); +// fos.write(pcmData); +// fos.flush(); +// } +// } +// } +// fos.close(); +// fis.close(); } }