添加主动掉线日志

master
liuqingkun 3 years ago
parent ee0220a0eb
commit c824a82428
  1. 3
      src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClient.java
  2. 6
      src/main/java/cn/org/hentai/jtt1078/app/websocket/WebsocketClientHandler.java
  3. 28
      src/main/java/cn/org/hentai/jtt1078/publisher/PublishManager.java
  4. 2
      src/main/java/cn/org/hentai/jtt1078/server/Jtt1078Handler.java
  5. 9
      src/main/java/cn/org/hentai/jtt1078/subscriber/RTMPPublisher.java

@ -35,7 +35,8 @@ public class WebsocketClient extends AbstractWebsocketClient {
public static WebsocketClient getWsInstance() { public static WebsocketClient getWsInstance() {
if (wsInstance == null) { if (wsInstance == null) {
try { try {
wsInstance = new WebsocketClient("ws://localhost:19800/ws", 20); wsInstance = new WebsocketClient("ws://42.192.165.208:19800/ws", 20);
// wsInstance = new WebsocketClient("ws://127.0.0.1:19800/ws", 20);
} catch (URISyntaxException | MyException e) { } catch (URISyntaxException | MyException e) {
log.error("发生异常,原因:{}", e.getMessage(), e); log.error("发生异常,原因:{}", e.getMessage(), e);
} }

@ -11,7 +11,6 @@ import io.netty.util.CharsetUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.xml.soap.Text;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
@ -77,11 +76,12 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler<Object>
} }
sendMsgTimer = new Timer(); sendMsgTimer = new Timer();
log.info(">>>> start send msg to 808.");
sendMsgTimer.schedule(new TimerTask() { sendMsgTimer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
String clientId = PublishManager.getInstance().getCurPushDevice(); String clientId = PublishManager.getInstance().getCurPushDevice();
log.info("send msg to 808. clientId : " + clientId);
try { try {
WebsocketClient.getWsInstance().write(clientId); WebsocketClient.getWsInstance().write(clientId);
} catch (MyException e) { } catch (MyException e) {
@ -117,7 +117,6 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler<Object>
try { try {
WebsocketClient wsClient = WebsocketClient.getWsInstance(); WebsocketClient wsClient = WebsocketClient.getWsInstance();
wsClient.connect(); wsClient.connect();
log.info("reconnect success!");
} catch (MyException e) { } catch (MyException e) {
log.info("reconnect failed! msg : " + e.getMessage()); log.info("reconnect failed! msg : " + e.getMessage());
} }
@ -143,7 +142,6 @@ public class WebsocketClientHandler extends SimpleChannelInboundHandler<Object>
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
} else if (msg instanceof TextWebSocketFrame) { } else if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame response = (TextWebSocketFrame) msg; TextWebSocketFrame response = (TextWebSocketFrame) msg;
log.info("receive msg : " + response.text());
} }
this.handleWebSocketFrame(msg); this.handleWebSocketFrame(msg);
} }

@ -6,6 +6,7 @@ import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -46,9 +47,18 @@ public final class PublishManager {
} }
public Channel open(String tag) { public Channel open(String tag) {
logger.info("****** channel open : " + System.currentTimeMillis()); logger.info(">>>> channel : " + tag + " | open : " + System.currentTimeMillis());
logger.info(">>>> all channel number. channels.values().size() = " + channels.values().size());
Channel chl = channels.get(tag); Channel chl = channels.get(tag);
// 开启一个新的推流通道, 释放原来的通道
channels.values().forEach(channel -> {
channel.rtmpPublisher.close();
close(channel.tag);
});
logger.info(">>>> after close, all channel number. channels.values().size() = " + channels.values().size());
if (chl == null) { if (chl == null) {
chl = new Channel(tag); chl = new Channel(tag);
channels.put(tag, chl); channels.put(tag, chl);
@ -66,8 +76,20 @@ public final class PublishManager {
if (channels.size() == 0) { if (channels.size() == 0) {
return ""; return "";
} }
Channel chl = channels.get(0);
return chl.tag; Channel chl = null;
for (Channel value : channels.values()) {
chl = value;
if (chl != null) {
break;
}
}
if (chl != null) {
return chl.tag;
} else {
return "";
}
} }
public void unsubscribe(String tag, long watcherId) { public void unsubscribe(String tag, long watcherId) {

@ -36,8 +36,6 @@ public class Jtt1078Handler extends SimpleChannelInboundHandler<Packet> {
String tag = sim + "-" + channel; String tag = sim + "-" + channel;
if (SessionManager.contains(nettyChannel, "tag") == false) { if (SessionManager.contains(nettyChannel, "tag") == false) {
PublishManager.getInstance().close(tag);
logger.info("******** close channel, tag : " + tag);
Channel chl = PublishManager.getInstance().open(tag); Channel chl = PublishManager.getInstance().open(tag);
SessionManager.set(nettyChannel, "tag", tag); SessionManager.set(nettyChannel, "tag", tag);
logger.info("start publishing: {} -> {}-{}", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel); logger.info("start publishing: {} -> {}-{}", Long.toHexString(chl.hashCode() & 0xffffffffL), sim, channel);

@ -1,20 +1,11 @@
package cn.org.hentai.jtt1078.subscriber; package cn.org.hentai.jtt1078.subscriber;
import cn.org.hentai.jtt1078.codec.MP3Encoder;
import cn.org.hentai.jtt1078.flv.AudioTag;
import cn.org.hentai.jtt1078.flv.FlvAudioTagEncoder;
import cn.org.hentai.jtt1078.flv.FlvEncoder;
import cn.org.hentai.jtt1078.publisher.Channel;
import cn.org.hentai.jtt1078.publisher.PublishManager; import cn.org.hentai.jtt1078.publisher.PublishManager;
import cn.org.hentai.jtt1078.util.*; import cn.org.hentai.jtt1078.util.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedList;
public class RTMPPublisher extends Thread { public class RTMPPublisher extends Thread {
static Logger logger = LoggerFactory.getLogger(RTMPPublisher.class); static Logger logger = LoggerFactory.getLogger(RTMPPublisher.class);

Loading…
Cancel
Save