diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java index b2f10d6..7d86c07 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java @@ -35,7 +35,8 @@ public class WebsocketClient extends AbstractWebsocketClient { public static WebsocketClient getWsInstance() { if (wsInstance == null) { try { - wsInstance = new WebsocketClient("ws://localhost:19800/ws", 20); + wsInstance = new WebsocketClient("ws://42.192.165.208:19800/ws", 20); +// wsInstance = new WebsocketClient("ws://127.0.0.1:19800/ws", 20); } catch (URISyntaxException | MyException e) { log.error("发生异常,原因:{}", e.getMessage(), e); } diff --git a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java index 003ab80..f6e2df3 100644 --- a/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java +++ b/src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java @@ -11,7 +11,6 @@ import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.xml.soap.Text; import java.util.Timer; import java.util.TimerTask; @@ -77,11 +76,12 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler } sendMsgTimer = new Timer(); + log.info(">>>> start send msg to 808."); sendMsgTimer.schedule(new TimerTask() { @Override public void run() { String clientId = PublishManager.getInstance().getCurPushDevice(); - log.info("send msg to 808. clientId : " + clientId); + try { WebsocketClient.getWsInstance().write(clientId); } catch (MyException e) { @@ -117,7 +117,6 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler try { WebsocketClient wsClient = WebsocketClient.getWsInstance(); wsClient.connect(); - log.info("reconnect success!"); } catch (MyException e) { log.info("reconnect failed! msg : " + e.getMessage()); } @@ -143,7 +142,6 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); } else if (msg instanceof TextWebSocketFrame) { TextWebSocketFrame response = (TextWebSocketFrame) msg; - log.info("receive msg : " + response.text()); } this.handleWebSocketFrame(msg); } diff --git a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java index 90f8499..289325f 100644 --- a/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java +++ b/src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java @@ -6,6 +6,7 @@ import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** @@ -46,9 +47,18 @@ public final class PublishManager { } public Channel open(String tag) { - logger.info("****** channel open : " + System.currentTimeMillis()); + logger.info(">>>> channel : " + tag + " | open : " + System.currentTimeMillis()); + logger.info(">>>> all channel number. channels.values().size() = " + channels.values().size()); Channel chl = channels.get(tag); + // 开启一个新的推流通道, 释放原来的通道 + channels.values().forEach(channel -> { + channel.rtmpPublisher.close(); + close(channel.tag); + }); + + logger.info(">>>> after close, all channel number. channels.values().size() = " + channels.values().size()); + if (chl == null) { chl = new Channel(tag); channels.put(tag, chl); @@ -66,8 +76,20 @@ public final class PublishManager { if (channels.size() == 0) { return ""; } - Channel chl = channels.get(0); - return chl.tag; + + Channel chl = null; + for (Channel value : channels.values()) { + chl = value; + if (chl != null) { + break; + } + } + + if (chl != null) { + return chl.tag; + } else { + return ""; + } } public void unsubscribe(String tag, long watcherId) { diff --git a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java index 5ecd319..56a3f1e 100644 --- a/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java +++ b/src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java @@ -36,8 +36,6 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler { String tag = sim + "-" + channel; if (SessionManager.contains(nettyChannel, "tag") == false) { - PublishManager.getInstance().close(tag); - logger.info("******** close channel, tag : " + tag); Channel chl = PublishManager.getInstance().open(tag); SessionManager.set(nettyChannel, "tag", tag); logger.info("start publishing: {} -> {}-{}", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel); diff --git a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java index bbde0d1..9ccde1b 100644 --- a/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java +++ b/src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java @@ -1,20 +1,11 @@ package cn.org.hentai.jtt1078.subscriber; -import cn.org.hentai.jtt1078.codec.MP3Encoder; -import cn.org.hentai.jtt1078.flv.AudioTag; -import cn.org.hentai.jtt1078.flv.FlvAudioTagEncoder; -import cn.org.hentai.jtt1078.flv.FlvEncoder; -import cn.org.hentai.jtt1078.publisher.Channel; import cn.org.hentai.jtt1078.publisher.PublishManager; import cn.org.hentai.jtt1078.util.*; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; -import java.io.OutputStream; -import java.util.LinkedList; public class RTMPPublisher extends Thread { static Logger logger = LoggerFactory.getLogger(RTMPPublisher.class);