From e307dc9c0cc0c5963deedfa8db35acd5013310bf Mon Sep 17 00:00:00 2001 From: liuqingkun Date: Wed, 17 May 2023 16:18:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../websocket/AbstractWebsocketClient.java | 3 +- .../app/websocket/WebsocketClientHandler.java | 21 +++---- .../jtt1078/publisher/PublishManager.java | 2 +- .../hentai/jtt1078/server/Jtt1078Handler.java | 1 + .../jtt1078/subscriber/RTMPPublisher.java | 14 +++-- .../cn/org/hentai/jtt1078/test/AudioTest.java | 61 ++++++++----------- .../hentai/jtt1078/util/BusinessConstant.java | 9 ++- src/main/resources/app.properties | 2 +- 8 files changed, 57 insertions(+), 56 deletions(-) diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/AbstractWebsocketClient.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/AbstractWebsocketClient.java index bf0a85a..a8ddde7 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/websocket/AbstractWebsocketClient.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/AbstractWebsocketClient.java @@ -1,5 +1,6 @@ package cn.org.hentai.jtt1078.app.websocket; +import cn.org.hentai.jtt1078.util.BusinessConstant; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.apache.commons.lang.StringUtils; @@ -54,7 +55,7 @@ public abstract class AbstractWebsocketClient implements Closeable { doOpen(); doConnect(); } catch (Exception e) { - throw new MyException("连接没有成功打开,原因是:{}" + e.getMessage(), e); + log.error(BusinessConstant.LOGGER_PREFIX + " : wesocket连接没有成功打开,原因是:{}", e.getMessage(), e); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java index df55c7c..93388ea 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java @@ -9,13 +9,18 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.CharsetUtil; +import net.sf.json.JSON; +import net.sf.json.JSONObject; +import net.sf.json.JSONString; +import net.sf.json.util.JSONUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.LocalDate; import java.time.LocalTime; +import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; public class WebsocketClientHandler extends SimpleChannelInboundHandler { @@ -74,6 +79,7 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler // 服务连接后, 启动定时发送数据的任务 if (reconnTimer != null) { // 先关闭重连的定时任务 + log.info(BusinessConstant.LOGGER_PREFIX + " : jt808服务websocket连接成功, 关闭重连的定时任务"); reconnTimer.cancel(); } @@ -85,7 +91,7 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler String tag = PublishManager.getInstance().getCurPushDevice(); try { - WebsocketClient.getWsInstance().write(tag); + WebsocketClient.getWsInstance().write(BusinessConstant.WEBSOCKET_MSG_SOURCE_VALUE + ":" + tag); // 每分钟的0s, 打印当前推流通道 LocalTime localTime = LocalTime.now(); @@ -97,17 +103,6 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler } } }, 10, 1000); - -// printTimer = new Timer(); -// log.info(BusinessConstant.LOGGER_PREFIX + " : jt808服务websocket连接成功, 启动打印当前通道定时任务"); -// sendMsgTimer.schedule(new TimerTask() { -// @Override -// public void run() { -// String tag = PublishManager.getInstance().getCurPushDevice(); -// -// log.info(BusinessConstant.LOGGER_PREFIX + " : jt808当前推流通道 : tag = {}", tag); -// } -// }, 10, 60000); } /** diff --git a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java index 60a75eb..5fa5f23 100644 --- a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java @@ -62,7 +62,7 @@ public final class PublishManager { // 开启一个新的推流通道, 释放原来的通道 channels.values().forEach(channel -> { - channel.rtmpPublisher.close(); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 通道开启 : 调用close方法释放原来的通道"); close(channel.tag); }); // 有新设备开启通道时, 清空下channels缓存 diff --git a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java index fcc4448..b1e77e6 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java @@ -131,6 +131,7 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { logger.info(BusinessConstant.LOGGER_PREFIX + " : release : tag = {} ", tag); if (tag != null) { logger.info("close netty channel: {}", tag); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 关闭通道 : 调用close方法释放原来的通道"); PublishManager.getInstance().close(tag); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java index 6b7c4ce..9f434b7 100644 --- a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java +++ b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java @@ -36,17 +36,21 @@ public class RTMPPublisher extends Thread { process = Runtime.getRuntime().exec(cmd); stderr = process.getErrorStream(); + String errMsg = ""; while ((len = stderr.read(buff)) > -1) { - if (debugMode) System.out.print(new String(buff, 0, len)); + String str = new String(buff, 0, len); + errMsg += str; + if (debugMode) System.out.print(str); } - logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令失败, 关闭通道[{}] : Process FFMPEG exited... ", tag); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令失败, 关闭通道[{}] : Process FFMPEG exited... : error msg = {} ", tag, errMsg); // 若ffmpeg命令执行失败, 视为推流失败, 将通道从缓存中删除 + logger.info(BusinessConstant.LOGGER_PREFIX + " : ffmpeg命令执行失败 : 调用close方法释放原来的通道"); PublishManager.getInstance().close(tag); } catch (Exception ex) { - logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布失败 : {} ", ex.getMessage()); - - logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布失败, 关闭通道[{}]", tag); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布异常 : {} ", ex.getMessage()); + logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布异常, 关闭通道[{}]", tag); + logger.info(BusinessConstant.LOGGER_PREFIX + " : ffmpeg命令异常 : 调用close方法释放原来的通道"); PublishManager.getInstance().close(tag); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java b/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java index 21483c8..48e6dae 100644 --- a/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java +++ b/src/main/java/cn/org/hentai/jtt1078/test/AudioTest.java @@ -13,39 +13,32 @@ import java.io.FileOutputStream; */ public class AudioTest { public static void main(String[] args) throws Exception { - String s1 = new String(""); - String s2 = new String(""); - System.out.println(s1 == s2); - System.out.println(s1.equals(s2)); -// int len = -1; -// byte[] block = new byte[1024]; -// FileInputStream fis = new FileInputStream("e:\\test\\streaming.hex"); -// Jtt1078Decoder decoder = new Jtt1078Decoder(); -// G711Codec codec = new G711Codec(); -// FileOutputStream fos = new FileOutputStream("e:\\test\\fuckfuckfuck1111.pcm"); -// while ((len = fis.read(block)) > -1) -// { -// decoder.write(block, 0, len); -// while (true) -// { -// Packet p = decoder.decode(); -// if (p == null) break; -// -// int lengthOffset = 28; -// int dataType = (p.seek(15).nextByte() >> 4) & 0x0f; -// // 透传数据类型:0100,没有后面的时间以及Last I Frame Interval和Last Frame Interval字段 -// if (dataType == 0x04) lengthOffset = 28 - 8 - 2 - 2; -// else if (dataType == 0x03) lengthOffset = 28 - 4; -// -// if (dataType == 0x03) -// { -// byte[] pcmData = codec.toPCM(p.seek(lengthOffset + 2).nextBytes()); -// fos.write(pcmData); -// fos.flush(); -// } -// } -// } -// fos.close(); -// fis.close(); + int len = -1; + byte[] block = new byte[1024]; + FileInputStream fis = new FileInputStream("e:\\test\\streaming.hex"); + Jtt1078Decoder decoder = new Jtt1078Decoder(); + G711Codec codec = new G711Codec(); + FileOutputStream fos = new FileOutputStream("e:\\test\\fuckfuckfuck1111.pcm"); + while ((len = fis.read(block)) > -1) { + decoder.write(block, 0, len); + while (true) { + Packet p = decoder.decode(); + if (p == null) break; + + int lengthOffset = 28; + int dataType = (p.seek(15).nextByte() >> 4) & 0x0f; + // 透传数据类型:0100,没有后面的时间以及Last I Frame Interval和Last Frame Interval字段 + if (dataType == 0x04) lengthOffset = 28 - 8 - 2 - 2; + else if (dataType == 0x03) lengthOffset = 28 - 4; + + if (dataType == 0x03) { + byte[] pcmData = codec.toPCM(p.seek(lengthOffset + 2).nextBytes()); + fos.write(pcmData); + fos.flush(); + } + } + } + fos.close(); + fis.close(); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/util/BusinessConstant.java b/src/main/java/cn/org/hentai/jtt1078/util/BusinessConstant.java index 1c8e2da..9525291 100644 --- a/src/main/java/cn/org/hentai/jtt1078/util/BusinessConstant.java +++ b/src/main/java/cn/org/hentai/jtt1078/util/BusinessConstant.java @@ -1,5 +1,12 @@ package cn.org.hentai.jtt1078.util; +import java.util.Map; + public interface BusinessConstant { - String LOGGER_PREFIX = " --- custom log --- "; + String LOGGER_PREFIX = " --- custom log --- "; + + /** + * 发送给jt808的信息类型 + */ + String WEBSOCKET_MSG_SOURCE_VALUE = "jtt1078"; } diff --git a/src/main/resources/app.properties b/src/main/resources/app.properties index a559c15..6ef4ed1 100644 --- a/src/main/resources/app.properties +++ b/src/main/resources/app.properties @@ -20,4 +20,4 @@ rtmp.url = rtmp://127.0.0.1:1935/live #rtmp.url = rtmp://111583.livepush.myqcloud.com/trtc_1400439699/live_2042508984163242532?txSecret=78986d90bfaec1030442f0f1135646c9&txTime=642696EF # 设置为on时,控制台将输出ffmpeg的输出 -debug.mode = off \ No newline at end of file +debug.mode = on \ No newline at end of file