From 537a4407560c6f1dee0a90818903ed6e82f8b51f Mon Sep 17 00:00:00 2001 From: liuqingkun Date: Sat, 13 May 2023 22:50:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commons/constant/BusinessConstant.java | 2 + .../websocket/WsChannelInboundHandler.java | 19 +++++- .../yzh/web/controller/JT1078Controller.java | 40 ++++++++++- .../org/yzh/web/endpoint/JT808Endpoint.java | 11 ++- .../org/yzh/web/endpoint/MessageManager.java | 23 ++++++- .../yzh/web/service/SendCommandService.java | 68 ++++++++++++++++--- 6 files changed, 146 insertions(+), 17 deletions(-) diff --git a/commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java b/commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java index 80a4a45..292376f 100644 --- a/commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java +++ b/commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java @@ -1,6 +1,8 @@ package org.yzh.commons.constant; public interface BusinessConstant { + String LOGGER_PREFIX = " --- custom log --- "; + Integer DEFAULT_CHANNEL_NO = 9; String LIVE_STATUS_ONLINE = "online"; diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java index 80fbe27..f8b5d53 100644 --- a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java @@ -36,18 +36,22 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler> T9003(@RequestBody JTMessage request) { @@ -44,8 +48,24 @@ public class JT1078Controller { @Operation(summary = "9101 实时音视频传输请求") @PostMapping("9101") public Mono> T9101(@RequestBody T9101 request) { + log.info(BusinessConstant.LOGGER_PREFIX + " : 9101 实时音视频传输请求 开始执行."); + Mono> resultMono = null; + APIResult result = new APIResult<>(); + + String clientId = request.getClientId(); + int channelNo = request.getChannelNo(); + // 若指定设备与当前推流设备相同, 直接返回 + if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice()) && channelNo == BusinessCacheUtil.getCurrentPushChannel()) { + log.info(BusinessConstant.LOGGER_PREFIX + " : T9101 {} - {}, 指定设备正在推流.", clientId, channelNo); + result.setMsg("指定设备正在推流"); + return Mono.create(sink -> { + sink.success(result); + }); + } + + log.info(BusinessConstant.LOGGER_PREFIX + " : T9101 - step1 : 当前推流设备 = {} - {} ", BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel()); // 先调用停止推流方法, 然后指定设备推流 if (BusinessCacheUtil.getCurrentPushDevice() != null) { T9102 t9102 = new T9102(); @@ -54,10 +74,21 @@ public class JT1078Controller { t9102.setCloseType(0); t9102.setCommand(0); t9102.setStreamType(0); + + log.info(BusinessConstant.LOGGER_PREFIX + " : T9101 - step2 : 停止推流当前设备 : {} - {} ", BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel()); resultMono = messageManager.requestR(t9102, T0001.class); } + try { + // 添加2s延迟, 避免停止推流与开始推流同时执行 + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + SendCommandUtil.setService(sendCommandService); + + log.info(BusinessConstant.LOGGER_PREFIX + " : T9101 - step3 : 开始推流 设备 = {} - {} ", request.getClientId(), BusinessConstant.DEFAULT_CHANNEL_NO); sendCommandService.sendToDevice(request.getClientId(), null); return resultMono; } @@ -65,8 +96,15 @@ public class JT1078Controller { @Operation(summary = "9102 音视频实时传输控制") @PostMapping("9102") public Mono> T9102(@RequestBody T9102 request) { + log.info(BusinessConstant.LOGGER_PREFIX + " : T9102 - step1 : 停止推流 "); Mono> resultMono = messageManager.requestR(request, T0001.class); - sendCommandService.sendToDevice(); + + // 若停止推流设备为正在推流设备, 触发推流方法 + log.info(BusinessConstant.LOGGER_PREFIX + " : T9102 - step2 : 当前推流设备 = {} - {} " + BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel()); + if (request.getClientId().equals(BusinessCacheUtil.getCurrentPushDevice())) { + log.info(BusinessConstant.LOGGER_PREFIX + " : T9102 - step3 : 调用推流方法 "); + sendCommandService.sendToDevice(); + } return resultMono; } diff --git a/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java b/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java index 92b8044..a781fc5 100644 --- a/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java +++ b/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java @@ -9,6 +9,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.yzh.commons.constant.BusinessConstant; import org.yzh.commons.util.BusinessCacheUtil; import org.yzh.protocol.basics.JTMessage; import org.yzh.protocol.commons.JT808; @@ -74,8 +75,11 @@ public class JT808Endpoint { // 若无正在推流的设备, 调用推流方法 SendCommandUtil.setService(sendCommandService); - if (BusinessCacheUtil.getCurrentPushDevice() == null) { + if (BusinessCacheUtil.getCurrentPushDevice() == null || BusinessCacheUtil.getCurrentPushDevice() == message.getClientId()) { + log.info(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId()); sendCommandService.sendToDevice(message.getClientId(), null); + } else { + log.info(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice()); } T8100 result = new T8100(); @@ -99,8 +103,11 @@ public class JT808Endpoint { // 若无正在推流的设备, 调用推流方法 SendCommandUtil.setService(sendCommandService); - if (BusinessCacheUtil.getCurrentPushDevice() == null) { + if (BusinessCacheUtil.getCurrentPushDevice() == null || BusinessCacheUtil.getCurrentPushDevice() == message.getClientId()) { + log.info(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId()); sendCommandService.sendToDevice(message.getClientId(), null); + } else { + log.info(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice()); } T0001 result = new T0001(); diff --git a/jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java b/jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java index 4a5b64a..4092450 100644 --- a/jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java +++ b/jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java @@ -5,6 +5,7 @@ import io.github.yezhihao.netmc.session.SessionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.yzh.commons.constant.BusinessConstant; import org.yzh.commons.model.APIException; import org.yzh.commons.model.APIResult; import org.yzh.protocol.basics.JTMessage; @@ -64,17 +65,35 @@ public class MessageManager { } public Mono> requestR(JTMessage request, Class responseClass) { +// Session session = sessionManager.get(request.getClientId()); +// if (session == null) +// return OFFLINE_RESULT; +// +// return session.request(request, responseClass) +// .map(message -> APIResult.ok(message)) +// .timeout(Duration.ofSeconds(10), TIMEOUT_RESULT) +// .onErrorResume(e -> { +// log.warn("消息发送失败", e); +// return SENDFAIL_RESULT; +// }); + + // 修改后代码, 更新日志打印 Session session = sessionManager.get(request.getClientId()); if (session == null) return OFFLINE_RESULT; - return session.request(request, responseClass) + log.info(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 参数 = {}", request.toString()); + Mono> resultMono = session.request(request, responseClass) .map(message -> APIResult.ok(message)) .timeout(Duration.ofSeconds(10), TIMEOUT_RESULT) .onErrorResume(e -> { - log.warn("消息发送失败", e); +// log.warn("消息发送失败", e); + log.warn(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 结果 = 消息发送失败. msg : {}", e); return SENDFAIL_RESULT; }); + log.info(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 结果 = {}", resultMono.block().toString()); + + return resultMono; } public Mono request(String sessionId, JTMessage request, Class responseClass, long timeout) { diff --git a/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java b/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java index 85c02e1..6586b77 100644 --- a/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java +++ b/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java @@ -2,6 +2,7 @@ package org.yzh.web.service; import io.github.yezhihao.netmc.session.Session; import io.github.yezhihao.netmc.session.SessionManager; +import io.netty.util.internal.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -27,6 +28,11 @@ public class SendCommandService { @Autowired private SessionManager sessionManager; + /** + * 向设备发送推流命令的线程 + */ + private Thread sendCommandToDeviceThread; + public void sendToDevice(String clientId) { if (!"40155591817".equals(clientId) && !"12345678901".equals(clientId) @@ -56,37 +62,65 @@ public class SendCommandService { } /** - * 自动发送推流命令 + * 重启自动发送推流命令 + */ + public void restartSendToDevice() { + if (sendCommandToDeviceThread != null && sendCommandToDeviceThread.isAlive()) { + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 停止原线程, 重新开启线程 "); + sendCommandToDeviceThread.interrupt(); + } + + sendToDevice(); + } + + /** + * 管理当前所有在线设备, 自动发送推流命令, 直到成功一个或全部失败, 通知慢直播平台当前推流车辆及推流状态 */ public void sendToDevice() { + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 开始执行 "); + + if (sendCommandToDeviceThread != null && sendCommandToDeviceThread.isAlive()) { + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 已有线程执行推流, 直接返回 "); + return; + } + noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING); - Thread thread = new Thread(new Runnable() { + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 启动线程, 开始执行发送推流命令 "); + sendCommandToDeviceThread = new Thread(new Runnable() { @Override public void run() { for (Session session : sessionManager.all()) { + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 调用推流方法开始 "); boolean result = sendCommand(session.getClientId(), BusinessConstant.DEFAULT_CHANNEL_NO); + // 只要有一个设备发送推流命令成功, 直接返回, 不继续向其他设备发送推流命令 if (result == true) { - break; + return; } } + // 所有在线设备发送命令失败后, 通知慢直播平台当前推流的状态 noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_OFFLINE); } }); - thread.start(); + sendCommandToDeviceThread.start(); } /** - * 指定车载机和通道发送推流命令, 若不传channelNo, 默认为9 + * 指定车载机和通道发送推流命令, 若不传channelNo, 默认为9, 通知慢直播平台当前推流车辆及推流状态 * * @param clientId * @param channelNo */ public boolean sendToDevice(String clientId, Integer channelNo) { - noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING); + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice {} - {} : 开始执行 ", clientId, channelNo); + channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; + + if (StringUtil.isNullOrEmpty(clientId)) { + return false; + } // 判断该设备是否为在线设备 Session session = sessionManager.get(clientId); @@ -95,6 +129,7 @@ public class SendCommandService { return false; } + noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING); if (!sendCommand(clientId, channelNo)) { // 若推流命令发送失败, 设备设置为离线 noticeCarStatus(null, channelNo, BusinessConstant.LIVE_STATUS_OFFLINE); @@ -115,33 +150,40 @@ public class SendCommandService { request.setStreamType(0); request.setTcpPort(4001); request.setUdpPort(0); - log.info("*** 发送实时音视频传输请求 start. request params : " + request.toString()); Mono> resultMono = messageManager.requestR(request, T0001.class); APIResult resultObj = resultMono.block(); - log.info("*** 发送实时音视频传输请求 result : " + resultObj.toString()); boolean sendResult = resultObj.isSuccess(); // 若命令发送失败, 返回false if (!sendResult) { + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 发送命令失败 "); return false; } // step2: 由于延迟原因, 等待1s后调用jtt1078上传到缓存的推流车辆, 确认设备是否已推流 + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 获取1078推流设备, 1s延迟开始 "); Thread.sleep(1000); + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 获取1078推流设备, 1s延迟结束 "); // step3: 调用3次, 失败后等待1s, 继续调用3次. 若仍然失败, 视为命令发送失败, 返回false int times = 0; while (times < 3) { // 调用jtt1078流媒体接口 String clientId1078 = BusinessCacheUtil.get1078PushDevice(); + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 1078推流设备 = {} ", clientId1078); // 判断返回的推流设备是否为发送命令的设备, 如果是, 返回true, 否则继续下次循环 if (clientId.equals(clientId1078)) { + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 推流设备 = 发送命令的设备, 返回推流成功"); noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_ONLINE); return true; } + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 重试1s延迟开始 "); Thread.sleep(1000); + log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 重试1s延迟结束 "); + + times++; } } catch (InterruptedException e) { e.printStackTrace(); @@ -150,6 +192,14 @@ public class SendCommandService { return false; } + /** + * 1. 更新当前推流车辆, 推流通道, 推流状态缓存 + * 2. 调用慢直播接口更新推流车辆, 推流状态 + * + * @param clientId + * @param channelNo + * @param status + */ public void noticeCarStatus(String clientId, Integer channelNo, String status) { channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; @@ -163,7 +213,7 @@ public class SendCommandService { Map params = new HashMap<>(); params.put("clientId", clientId); params.put("carStat", status); - log.info(">>> notice to poc-live. url = " + url + " | params = " + params.toString()); + log.info(BusinessConstant.LOGGER_PREFIX + " : 调用慢直播服务接口 : 当前推流设备 = {} - {} - {}", clientId, channelNo, status); HttpClientUtils.doGet(url, params); } }