diff --git a/jtt808-server/src/main/java/org/yzh/web/controller/JT1078Controller.java b/jtt808-server/src/main/java/org/yzh/web/controller/JT1078Controller.java index 2b812da..02bf18c 100644 --- a/jtt808-server/src/main/java/org/yzh/web/controller/JT1078Controller.java +++ b/jtt808-server/src/main/java/org/yzh/web/controller/JT1078Controller.java @@ -65,7 +65,6 @@ public class JT1078Controller { }); } - log.warn(BusinessConstant.LOGGER_PREFIX + " : T9101 - step1 : 当前推流设备 = {} - {} ", BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel()); // 先调用停止推流方法, 然后指定设备推流 if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice())) { diff --git a/jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java b/jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java index 157701a..9d45a2e 100644 --- a/jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java +++ b/jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java @@ -6,9 +6,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.yzh.commons.constant.BusinessConstant; +import org.yzh.commons.model.APICodes; import org.yzh.commons.model.APIException; import org.yzh.commons.model.APIResult; import org.yzh.protocol.basics.JTMessage; +import org.yzh.protocol.t808.T0001; import reactor.core.publisher.Mono; import java.time.Duration; @@ -96,6 +98,33 @@ public class MessageManager { return resultMono; } + public APIResult requestRV2(JTMessage request) { + // 修改后代码, 更新日志打印 + Session session = sessionManager.get(request.getClientId()); + if (session == null) { + T0001 t0001 = new T0001(); + APIResult result = new APIResult<>(t0001); + result.setCode(APICodes.OperationFailed.getCode()); + result.setMsg("指定设备正在推流"); + return result; + } + + log.warn(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 参数 = {}", request.toString()); + Mono> resultMono = session.request(request, T0001.class) + .map(message -> APIResult.ok(message)) + .timeout(Duration.ofSeconds(10), TIMEOUT_RESULT) + .onErrorResume(e -> { + log.warn(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 结果 = 消息发送失败. msg : {}", e); + return SENDFAIL_RESULT; + }); + + // block方法调用>1次后, 会出现重复发送命令的情况, 使用时需注意 + APIResult result = resultMono.block(); + log.warn(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 结果 = {}", result.toString()); + + return result; + } + public Mono request(String sessionId, JTMessage request, Class responseClass, long timeout) { return request(sessionId, request, responseClass).timeout(Duration.ofMillis(timeout)); } diff --git a/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java b/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java index e6549db..0b0d511 100644 --- a/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java +++ b/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java @@ -2,7 +2,6 @@ package org.yzh.web.service; import io.github.yezhihao.netmc.session.Session; import io.github.yezhihao.netmc.session.SessionManager; -import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +31,7 @@ public class SendCommandService { /** * 向设备发送推流命令的线程 */ - private Thread sendCommandToDeviceThread; + private Thread thread; public void sendToDevice(String clientId) { if (!"40155591817".equals(clientId) @@ -66,9 +65,9 @@ public class SendCommandService { * 重启自动发送推流命令 */ public void restartSendToDevice() { - if (sendCommandToDeviceThread != null && sendCommandToDeviceThread.isAlive()) { + if (thread != null && thread.isAlive()) { log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 停止原线程, 重新开启线程 "); - sendCommandToDeviceThread.interrupt(); + thread.interrupt(); } sendToDevice(); @@ -87,7 +86,7 @@ public class SendCommandService { log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 开始执行 "); - if (sendCommandToDeviceThread != null && sendCommandToDeviceThread.isAlive()) { + if (thread != null && thread.isAlive()) { log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 已有线程执行推流, 直接返回 "); return; } @@ -95,7 +94,7 @@ public class SendCommandService { noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING); log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 启动线程, 开始执行发送推流命令 "); - sendCommandToDeviceThread = new Thread(new Runnable() { + thread = new Thread(new Runnable() { @Override public void run() { for (Session session : sessionManager.all()) { @@ -113,7 +112,7 @@ public class SendCommandService { } }); - sendCommandToDeviceThread.start(); + thread.start(); } /** @@ -158,8 +157,7 @@ public class SendCommandService { request.setStreamType(0); request.setTcpPort(4001); request.setUdpPort(0); - Mono> resultMono = messageManager.requestR(request, T0001.class); - APIResult resultObj = resultMono.block(); + APIResult resultObj = messageManager.requestRV2(request); boolean sendResult = resultObj.isSuccess(); // 若命令发送失败, 返回false