From dd4017833b0cfb9769cfcce992a9b44919152d5a Mon Sep 17 00:00:00 2001 From: liuqingkun Date: Wed, 31 May 2023 18:07:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96,=20?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/hentai/jtt1078/publisher/Channel.java | 4 +-- .../jtt1078/publisher/PublishManager.java | 6 ++++- .../hentai/jtt1078/server/Jtt1078Handler.java | 6 ++--- .../hentai/jtt1078/server/SessionManager.java | 3 +++ .../jtt1078/subscriber/RTMPPublisher.java | 26 +++++++++++++++++-- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java b/src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java index cf72976..e529ef2 100644 --- a/src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java +++ b/src/main/java/cn/org/hentai/jtt1078/publisher/Channel.java @@ -106,13 +106,13 @@ public class Channel { } } - public void close() { + public void close(Boolean ffmpegCloseFlag) { for (Iterator itr = subscribers.iterator(); itr.hasNext(); ) { Subscriber subscriber = itr.next(); subscriber.close(); itr.remove(); } - if (rtmpPublisher != null) rtmpPublisher.close(); + if (rtmpPublisher != null) rtmpPublisher.close(ffmpegCloseFlag); } private byte[] readNalu() { diff --git a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java index df959bf..a1b9eec 100644 --- a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java @@ -1,6 +1,7 @@ package cn.org.hentai.jtt1078.publisher; import cn.org.hentai.jtt1078.entity.Media; +import cn.org.hentai.jtt1078.server.SessionManager; import cn.org.hentai.jtt1078.subscriber.Subscriber; import cn.org.hentai.jtt1078.util.BusinessConstant; import io.netty.channel.ChannelHandlerContext; @@ -17,6 +18,7 @@ import java.util.concurrent.ConcurrentHashMap; public final class PublishManager { static Logger logger = LoggerFactory.getLogger(PublishManager.class); ConcurrentHashMap channels; + ConcurrentHashMap ffmpegFlag; private PublishManager() { channels = new ConcurrentHashMap(); @@ -62,6 +64,8 @@ public final class PublishManager { // 有新设备开启通道时, 清空下channels缓存 channels.clear(); + // 同时需要将SessionManager中对应缓存删掉 + SessionManager.clear(); Channel chl = new Channel(tag); channels.put(tag, chl); @@ -75,7 +79,7 @@ public final class PublishManager { public void close(String tag) { logger.info(BusinessConstant.LOGGER_PREFIX + " : close方法 : 通道关闭 : {} ", tag); Channel chl = channels.remove(tag); - if (chl != null) chl.close(); + if (chl != null) chl.close(Boolean.TRUE); } /** diff --git a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java index 4694c3e..6c70fec 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java @@ -129,14 +129,12 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { * @param channel */ private void release(io.netty.channel.Channel channel) { - SessionManager.printMapping(); - String tag = SessionManager.get(channel, "tag"); if (tag != null) { + SessionManager.printMapping(); logger.info(BusinessConstant.LOGGER_PREFIX + " : release : channelId = {} : tag = {}, 调用close方法释放通道 ", channel.id(), tag); PublishManager.getInstance().close(tag); + SessionManager.remove(channel); } - - SessionManager.remove(channel); } } diff --git a/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java b/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java index 972db47..0c4e0ca 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/SessionManager.java @@ -42,4 +42,7 @@ public final class SessionManager { mappings.remove(channel.id().asLongText() + "video-sequence"); } } + public static void clear() { + mappings.clear(); + } } \ No newline at end of file diff --git a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java index 9ad814e..73ce326 100644 --- a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java +++ b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java @@ -1,6 +1,7 @@ package cn.org.hentai.jtt1078.subscriber; import cn.org.hentai.jtt1078.publisher.PublishManager; +import cn.org.hentai.jtt1078.server.SessionManager; import cn.org.hentai.jtt1078.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,6 +12,8 @@ public class RTMPPublisher extends Thread { static Logger logger = LoggerFactory.getLogger(RTMPPublisher.class); String tag = null; + // 主动退出标志 + boolean closeFlag = false; Process process = null; public RTMPPublisher(String tag) { @@ -34,6 +37,14 @@ public class RTMPPublisher extends Thread { ); logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令 : {} ", cmd); process = Runtime.getRuntime().exec(cmd); + + if (!closeFlag) { + // 若ffmpeg主动退出标志为否, 重新执行一次ffmpeg命令 + logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令失败 - 进行重试 : {} ", cmd); + closeFlag = Boolean.FALSE; + process = Runtime.getRuntime().exec(cmd); + } + stderr = process.getErrorStream(); String errMsg = ""; @@ -44,12 +55,14 @@ public class RTMPPublisher extends Thread { } logger.info(BusinessConstant.LOGGER_PREFIX + " : 执行ffmpeg命令失败, 关闭通道[{}], 并调用close方法释放原来的通道 : Process FFMPEG exited... : error msg = {} ", tag, errMsg); - // 若ffmpeg命令执行失败, 视为推流失败, 将通道从缓存中删除 - PublishManager.getInstance().close(tag); } catch (Exception ex) { logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布异常 : {} ", ex.getMessage()); logger.info(BusinessConstant.LOGGER_PREFIX + " : 使用ffmpeg发布异常, 关闭通道[{}], 并调用close方法释放原来的通道", tag); + } finally { + // 若ffmpeg命令执行失败, 视为推流失败, 将通道从缓存中删除 PublishManager.getInstance().close(tag); + // 同时需要将SessionManager中对应缓存删掉 + SessionManager.clear(); } } @@ -60,4 +73,13 @@ public class RTMPPublisher extends Thread { } catch (Exception e) { } } + + public void close(Boolean flag) { + try { + closeFlag = flag; + logger.info(BusinessConstant.LOGGER_PREFIX + " : ffmpeg close : ffmpeg线程关闭. tag = {}, closeFlag = {}", tag, flag); + if (process != null) process.destroyForcibly(); + } catch (Exception e) { + } + } } \ No newline at end of file