diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java index 26d31db..7f6f75e 100644 --- a/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java @@ -10,6 +10,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; +import io.netty.util.internal.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yzh.commons.util.BusinessCacheUtil; @@ -70,9 +71,11 @@ public class TCPMessageAdapter extends ChannelInboundHandlerAdapter { log.warn(">>>>> Disconnected clientId = {} , address = {}", clientId, client(ctx)); - // 若掉线设备为当前推流设备, 调用推流方法, 发送推流 - if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice()) && SendCommandUtil.getService() != null) { - SendCommandUtil.getService().sendToDevice(); + // 若掉线设备为当前推流设备, 或无推流设备, 但当前有在线设备, 调用推流方法, 发送推流 + if ((clientId.equals(BusinessCacheUtil.getCurrentPushDevice()) + || (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) && sessionManager.all().size() > 0)) + && SendCommandUtil.getService() != null) { + SendCommandUtil.getService().sendToDevice(null, null); } } diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java index 559aeee..eb3c8ab 100644 --- a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java @@ -86,13 +86,14 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler> T9101(@RequestBody T9101 request) { + return messageManager.requestR(request, T0001.class); + } + + @Operation(summary = "9102 音视频实时传输控制") + @PostMapping("9102") + public Mono> T9102(@RequestBody T9102 request) { + Mono> resultMono = messageManager.requestR(request, T0001.class); + return resultMono; + } + + /** + * 自定义推流方法, 先停止原来的视频刘 + * + * @param request + * @return + */ + @Operation(summary = "9101 实时音视频传输请求-自定义") + @PostMapping("9101-Custom") + public Mono> T9101Custom(@RequestBody T9101 request) { log.warn(BusinessConstant.LOGGER_PREFIX + " : 9101 实时音视频传输请求 开始执行."); Mono> resultMono = null; @@ -93,9 +113,15 @@ public class JT1078Controller { return resultMono; } - @Operation(summary = "9102 音视频实时传输控制") - @PostMapping("9102") - public Mono> T9102(@RequestBody T9102 request) { + /** + * 自定义推流方法, 若停止推流设备正在推流, 触发自动推流方法 + * + * @param request + * @return + */ + @Operation(summary = "9102 音视频实时传输控制-自定义") + @PostMapping("9102-Custom") + public Mono> T9102Custom(@RequestBody T9102 request) { log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step1 : 停止推流 "); Mono> resultMono = messageManager.requestR(request, T0001.class); @@ -103,24 +129,11 @@ public class JT1078Controller { log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step2 : 当前推流设备 = {} - {} " + BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel()); if (request.getClientId().equals(BusinessCacheUtil.getCurrentPushDevice())) { log.warn(BusinessConstant.LOGGER_PREFIX + " : T9102 - step3 : 调用推流方法 "); - sendCommandService.sendToDevice(); + sendCommandService.sendToDevice(null, null); } return resultMono; } - @Operation(summary = "9101 实时音视频传输请求") - @PostMapping("9101-v2") - public Mono> T9101V2(@RequestBody T9101 request) { - return messageManager.requestR(request, T0001.class); - } - - @Operation(summary = "9102 音视频实时传输控制") - @PostMapping("9102-v2") - public Mono> T9102V2(@RequestBody T9102 request) { - Mono> resultMono = messageManager.requestR(request, T0001.class); - return resultMono; - } - @Operation(summary = "9201 平台下发远程录像回放请求") @PostMapping("9201") public Mono> T9201(@RequestBody T9201 request) { diff --git a/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java b/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java index 924bb80..5773a25 100644 --- a/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java +++ b/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java @@ -74,14 +74,25 @@ public class JT808Endpoint { device.setPlateNo(message.getPlateNo()); session.setAttribute(SessionKey.Device, device); - // 若无正在推流的设备, 调用推流方法 SendCommandUtil.setService(sendCommandService); - if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) { - log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId()); - sendCommandService.sendToDevice(message.getClientId(), null); - } else { - log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice()); - } + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(15000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // 若无正在推流的设备, 调用推流方法 + if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) { + log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId()); + sendCommandService.sendToDevice(null, null); + } else { + log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice()); + } + } + }).start(); T8100 result = new T8100(); result.setResponseSerialNo(message.getSerialNo()); @@ -102,14 +113,25 @@ public class JT808Endpoint { device.setPlateNo(token[1]); session.setAttribute(SessionKey.Device, device); - // 若无正在推流的设备, 调用推流方法 SendCommandUtil.setService(sendCommandService); - if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) { - log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId()); - sendCommandService.sendToDevice(message.getClientId(), null); - } else { - log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice()); - } + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(15000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // 若无正在推流的设备, 调用推流方法 + if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) { + log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId()); + sendCommandService.sendToDevice(null, null); + } else { + log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice()); + } + } + }).start(); T0001 result = new T0001(); result.setResponseSerialNo(message.getSerialNo()); diff --git a/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java b/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java index 0b0d511..44e37fe 100644 --- a/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java +++ b/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java @@ -70,40 +70,51 @@ public class SendCommandService { thread.interrupt(); } - sendToDevice(); + sendToDevice(null, null); } /** * 管理当前所有在线设备, 自动发送推流命令, 直到成功一个或全部失败, 通知慢直播平台当前推流车辆及推流状态 */ - public void sendToDevice() { + public void sendToDevice(String clientId, Integer channelNo) { if (sessionManager.all() == null || sessionManager.all().size() == 0) { // 当前无在线设备, 直接返回, 通知给慢直播设备离线 - log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 当前无在线设备, 直接返回, 通知给慢直播设备离线 "); + log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 当前无在线设备, 直接返回, 通知给慢直播设备离线 "); noticeCarStatus(null, null, BusinessConstant.LIVE_STATUS_OFFLINE); return; } - log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 开始执行 "); - if (thread != null && thread.isAlive()) { - log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 已有线程执行推流, 直接返回 "); + log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 已有线程执行推流, 直接返回 "); return; } noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING); - - log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 启动线程, 开始执行发送推流命令 "); + log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 启动线程, 开始执行发送推流命令 "); thread = new Thread(new Runnable() { @Override public void run() { - for (Session session : sessionManager.all()) { - log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 调用推流方法开始 "); - boolean result = sendCommand(session.getClientId(), BusinessConstant.DEFAULT_CHANNEL_NO); - - // 只要有一个设备发送推流命令成功, 直接返回, 不继续向其他设备发送推流命令 - if (result == true) { - return; + if (!StringUtil.isNullOrEmpty(clientId)) { + if (sessionManager.all().contains(clientId)) { + log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 指定设备推流 : clientId = {}", clientId); + boolean result = sendCommand(clientId, BusinessConstant.DEFAULT_CHANNEL_NO); + + // 只要有一个设备发送推流命令成功, 直接返回, 不继续向其他设备发送推流命令 + if (result == true) { + return; + } + } else { + log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 指定设备推流 : clientId = {}, 设备不在线, 推流失败", clientId); + } + } else { + for (Session session : sessionManager.all()) { + log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 遍历设备推流 : clientId = {}", session.getClientId()); + boolean result = sendCommand(session.getClientId(), BusinessConstant.DEFAULT_CHANNEL_NO); + + // 只要有一个设备发送推流命令成功, 直接返回, 不继续向其他设备发送推流命令 + if (result == true) { + return; + } } } @@ -115,36 +126,36 @@ public class SendCommandService { thread.start(); } - /** - * 指定车载机和通道发送推流命令, 若不传channelNo, 默认为9, 通知慢直播平台当前推流车辆及推流状态 - * - * @param clientId - * @param channelNo - */ - public boolean sendToDevice(String clientId, Integer channelNo) { - log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice(clientId, channelNo) {} - {} : 开始执行 ", clientId, channelNo); - channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; - - if (StringUtil.isNullOrEmpty(clientId)) { - return false; - } - - // 判断该设备是否为在线设备 - Session session = sessionManager.get(clientId); - if (session == null) { - // 设备未在线, 直接返回 - return false; - } - - noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING); - if (!sendCommand(clientId, channelNo)) { - // 若推流命令发送失败, 设备设置为离线 - noticeCarStatus(null, channelNo, BusinessConstant.LIVE_STATUS_OFFLINE); - } - return true; - } - - private boolean sendCommand(String clientId, Integer channelNo) { +// /** +// * 指定车载机和通道发送推流命令, 若不传channelNo, 默认为9, 通知慢直播平台当前推流车辆及推流状态 +// * +// * @param clientId +// * @param channelNo +// */ +// public boolean sendToDevice(String clientId, Integer channelNo) { +// log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice(clientId, channelNo) {} - {} : 开始执行 ", clientId, channelNo); +// channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; +// +// if (StringUtil.isNullOrEmpty(clientId)) { +// return false; +// } +// +// // 判断该设备是否为在线设备 +// Session session = sessionManager.get(clientId); +// if (session == null) { +// // 设备未在线, 直接返回 +// return false; +// } +// +// noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING); +// if (!sendCommand(clientId, channelNo)) { +// // 若推流命令发送失败, 设备设置为离线 +// noticeCarStatus(null, channelNo, BusinessConstant.LIVE_STATUS_OFFLINE); +// } +// return true; +// } + + private synchronized boolean sendCommand(String clientId, Integer channelNo) { channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; try {