代码优化, 日志更新

master
liuqingkun 3 years ago
parent fe5c63f706
commit 20bd8e9383
  1. 9
      jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java
  2. 7
      jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java
  3. 47
      jtt808-server/src/main/java/org/yzh/web/controller/JT1078Controller.java
  4. 30
      jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java
  5. 89
      jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java

@ -10,6 +10,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.netty.util.internal.StringUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.yzh.commons.util.BusinessCacheUtil; import org.yzh.commons.util.BusinessCacheUtil;
@ -70,9 +71,11 @@ public class TCPMessageAdapter extends ChannelInboundHandlerAdapter {
log.warn(">>>>> Disconnected clientId = {} , address = {}", clientId, client(ctx)); log.warn(">>>>> Disconnected clientId = {} , address = {}", clientId, client(ctx));
// 若掉线设备为当前推流设备, 调用推流方法, 发送推流 // 若掉线设备为当前推流设备, 或无推流设备, 但当前有在线设备, 调用推流方法, 发送推流
if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice()) && SendCommandUtil.getService() != null) { if ((clientId.equals(BusinessCacheUtil.getCurrentPushDevice())
SendCommandUtil.getService().sendToDevice(); || (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) && sessionManager.all().size() > 0))
&& SendCommandUtil.getService() != null) {
SendCommandUtil.getService().sendToDevice(null, null);
} }
} }

@ -86,13 +86,14 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
} }
} }
// 若接收到1078推送的当前推流设备为空, 触发自动推流方法
// int emptyTimes = BusinessCacheUtil.getPushEmptyTimes(); // int emptyTimes = BusinessCacheUtil.getPushEmptyTimes();
// // 若持续30s收到的消息都为空, 调用推流方法 // // 若持续30s收到的消息都为空, 调用推流方法
// if (emptyTimes == 30) { // if (emptyTimes == 30) {
// log.warn(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 持续30s收到的消息都为空, 调用推流方法 "); // log.warn(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 持续30s收到的消息都为空, 调用推流方法 ");
// if (SendCommandUtil.getService() != null) { if (SendCommandUtil.getService() != null) {
// SendCommandUtil.getService().sendToDevice(); SendCommandUtil.getService().sendToDevice(null, null);
// } }
// emptyTimes = 0; // emptyTimes = 0;
// } else { // } else {
// emptyTimes++; // emptyTimes++;

@ -46,9 +46,29 @@ public class JT1078Controller {
return messageManager.requestR(request.messageId(JT1078.查询终端音视频属性), T1003.class); return messageManager.requestR(request.messageId(JT1078.查询终端音视频属性), T1003.class);
} }
@Operation(summary = "9101 实时音视频传输请求") @Operation(summary = "9101 实时音视频传输请求")
@PostMapping("9101") @PostMapping("9101")
public Mono<APIResult<T0001>> T9101(@RequestBody T9101 request) { public Mono<APIResult<T0001>> T9101(@RequestBody T9101 request) {
return messageManager.requestR(request, T0001.class);
}
@Operation(summary = "9102 音视频实时传输控制")
@PostMapping("9102")
public Mono<APIResult<T0001>> T9102(@RequestBody T9102 request) {
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
return resultMono;
}
/**
* 自定义推流方法, 先停止原来的视频刘
*
* @param request
* @return
*/
@Operation(summary = "9101 实时音视频传输请求-自定义")
@PostMapping("9101-Custom")
public Mono<APIResult<T0001>> T9101Custom(@RequestBody T9101 request) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 9101 实时音视频传输请求 开始执行."); log.warn(BusinessConstant.LOGGER_PREFIX + " : 9101 实时音视频传输请求 开始执行.");
Mono<APIResult<T0001>> resultMono = null; Mono<APIResult<T0001>> resultMono = null;
@ -93,9 +113,15 @@ public class JT1078Controller {
return resultMono; return resultMono;
} }
@Operation(summary = "9102 音视频实时传输控制") /**
@PostMapping("9102") * 自定义推流方法, 若停止推流设备正在推流, 触发自动推流方法
public Mono<APIResult<T0001>> T9102(@RequestBody T9102 request) { *
* @param request
* @return
*/
@Operation(summary = "9102 音视频实时传输控制-自定义")
@PostMapping("9102-Custom")
public Mono<APIResult<T0001>> T9102Custom(@RequestBody T9102 request) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step1 : 停止推流 "); log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step1 : 停止推流 ");
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class); Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
@ -103,21 +129,8 @@ public class JT1078Controller {
log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step2 : 当前推流设备 = {} - {} " + BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel()); log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step2 : 当前推流设备 = {} - {} " + BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel());
if (request.getClientId().equals(BusinessCacheUtil.getCurrentPushDevice())) { if (request.getClientId().equals(BusinessCacheUtil.getCurrentPushDevice())) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step3 : 调用推流方法 "); log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step3 : 调用推流方法 ");
sendCommandService.sendToDevice(); sendCommandService.sendToDevice(null, null);
}
return resultMono;
} }
@Operation(summary = "9101 实时音视频传输请求")
@PostMapping("9101-v2")
public Mono<APIResult<T0001>> T9101V2(@RequestBody T9101 request) {
return messageManager.requestR(request, T0001.class);
}
@Operation(summary = "9102 音视频实时传输控制")
@PostMapping("9102-v2")
public Mono<APIResult<T0001>> T9102V2(@RequestBody T9102 request) {
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
return resultMono; return resultMono;
} }

@ -74,14 +74,25 @@ public class JT808Endpoint {
device.setPlateNo(message.getPlateNo()); device.setPlateNo(message.getPlateNo());
session.setAttribute(SessionKey.Device, device); session.setAttribute(SessionKey.Device, device);
// 若无正在推流的设备, 调用推流方法
SendCommandUtil.setService(sendCommandService); SendCommandUtil.setService(sendCommandService);
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 若无正在推流的设备, 调用推流方法
if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) { if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId()); log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId());
sendCommandService.sendToDevice(message.getClientId(), null); sendCommandService.sendToDevice(null, null);
} else { } else {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice()); log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice());
} }
}
}).start();
T8100 result = new T8100(); T8100 result = new T8100();
result.setResponseSerialNo(message.getSerialNo()); result.setResponseSerialNo(message.getSerialNo());
@ -102,14 +113,25 @@ public class JT808Endpoint {
device.setPlateNo(token[1]); device.setPlateNo(token[1]);
session.setAttribute(SessionKey.Device, device); session.setAttribute(SessionKey.Device, device);
// 若无正在推流的设备, 调用推流方法
SendCommandUtil.setService(sendCommandService); SendCommandUtil.setService(sendCommandService);
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 若无正在推流的设备, 调用推流方法
if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) { if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId()); log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId());
sendCommandService.sendToDevice(message.getClientId(), null); sendCommandService.sendToDevice(null, null);
} else { } else {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice()); log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice());
} }
}
}).start();
T0001 result = new T0001(); T0001 result = new T0001();
result.setResponseSerialNo(message.getSerialNo()); result.setResponseSerialNo(message.getSerialNo());

@ -70,35 +70,45 @@ public class SendCommandService {
thread.interrupt(); thread.interrupt();
} }
sendToDevice(); sendToDevice(null, null);
} }
/** /**
* 管理当前所有在线设备, 自动发送推流命令, 直到成功一个或全部失败, 通知慢直播平台当前推流车辆及推流状态 * 管理当前所有在线设备, 自动发送推流命令, 直到成功一个或全部失败, 通知慢直播平台当前推流车辆及推流状态
*/ */
public void sendToDevice() { public void sendToDevice(String clientId, Integer channelNo) {
if (sessionManager.all() == null || sessionManager.all().size() == 0) { if (sessionManager.all() == null || sessionManager.all().size() == 0) {
// 当前无在线设备, 直接返回, 通知给慢直播设备离线 // 当前无在线设备, 直接返回, 通知给慢直播设备离线
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 当前无在线设备, 直接返回, 通知给慢直播设备离线 "); log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 当前无在线设备, 直接返回, 通知给慢直播设备离线 ");
noticeCarStatus(null, null, BusinessConstant.LIVE_STATUS_OFFLINE); noticeCarStatus(null, null, BusinessConstant.LIVE_STATUS_OFFLINE);
return; return;
} }
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 开始执行 ");
if (thread != null && thread.isAlive()) { if (thread != null && thread.isAlive()) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 已有线程执行推流, 直接返回 "); log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 已有线程执行推流, 直接返回 ");
return; return;
} }
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING); noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING);
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 启动线程, 开始执行发送推流命令 ");
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 启动线程, 开始执行发送推流命令 ");
thread = new Thread(new Runnable() { thread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
if (!StringUtil.isNullOrEmpty(clientId)) {
if (sessionManager.all().contains(clientId)) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 指定设备推流 : clientId = {}", clientId);
boolean result = sendCommand(clientId, BusinessConstant.DEFAULT_CHANNEL_NO);
// 只要有一个设备发送推流命令成功, 直接返回, 不继续向其他设备发送推流命令
if (result == true) {
return;
}
} else {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 指定设备推流 : clientId = {}, 设备不在线, 推流失败", clientId);
}
} else {
for (Session session : sessionManager.all()) { for (Session session : sessionManager.all()) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 调用推流方法开始 "); log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 遍历设备推流 : clientId = {}", session.getClientId());
boolean result = sendCommand(session.getClientId(), BusinessConstant.DEFAULT_CHANNEL_NO); boolean result = sendCommand(session.getClientId(), BusinessConstant.DEFAULT_CHANNEL_NO);
// 只要有一个设备发送推流命令成功, 直接返回, 不继续向其他设备发送推流命令 // 只要有一个设备发送推流命令成功, 直接返回, 不继续向其他设备发送推流命令
@ -106,6 +116,7 @@ public class SendCommandService {
return; return;
} }
} }
}
// 所有在线设备发送命令失败后, 通知慢直播平台当前推流的状态 // 所有在线设备发送命令失败后, 通知慢直播平台当前推流的状态
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_OFFLINE); noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_OFFLINE);
@ -115,36 +126,36 @@ public class SendCommandService {
thread.start(); thread.start();
} }
/** // /**
* 指定车载机和通道发送推流命令, 若不传channelNo, 默认为9, 通知慢直播平台当前推流车辆及推流状态 // * 指定车载机和通道发送推流命令, 若不传channelNo, 默认为9, 通知慢直播平台当前推流车辆及推流状态
* // *
* @param clientId // * @param clientId
* @param channelNo // * @param channelNo
*/ // */
public boolean sendToDevice(String clientId, Integer channelNo) { // public boolean sendToDevice(String clientId, Integer channelNo) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice(clientId, channelNo) {} - {} : 开始执行 ", clientId, channelNo); // log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice(clientId, channelNo) {} - {} : 开始执行 ", clientId, channelNo);
channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; // channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo;
//
if (StringUtil.isNullOrEmpty(clientId)) { // if (StringUtil.isNullOrEmpty(clientId)) {
return false; // return false;
} // }
//
// 判断该设备是否为在线设备 // // 判断该设备是否为在线设备
Session session = sessionManager.get(clientId); // Session session = sessionManager.get(clientId);
if (session == null) { // if (session == null) {
// 设备未在线, 直接返回 // // 设备未在线, 直接返回
return false; // return false;
} // }
//
noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING); // noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING);
if (!sendCommand(clientId, channelNo)) { // if (!sendCommand(clientId, channelNo)) {
// 若推流命令发送失败, 设备设置为离线 // // 若推流命令发送失败, 设备设置为离线
noticeCarStatus(null, channelNo, BusinessConstant.LIVE_STATUS_OFFLINE); // noticeCarStatus(null, channelNo, BusinessConstant.LIVE_STATUS_OFFLINE);
} // }
return true; // return true;
} // }
private boolean sendCommand(String clientId, Integer channelNo) { private synchronized boolean sendCommand(String clientId, Integer channelNo) {
channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo;
try { try {

Loading…
Cancel
Save