From 9cb8c4832e9c98e6cf30e2649ef7ab9613b6e670 Mon Sep 17 00:00:00 2001 From: liuqingkun Date: Tue, 9 May 2023 19:10:31 +0800 Subject: [PATCH] =?UTF-8?q?1.=20websocket=E9=80=9A=E8=AE=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yzh/commons/util/BusinessCacheUtil.java | 21 +++++++++++++++ .../netmc/handler/TCPMessageAdapter.java | 14 ++++------ .../websocket/WsChannelInboundHandler.java | 16 ++++++++--- .../org/yzh/web/endpoint/JT808Endpoint.java | 2 ++ .../yzh/web/service/SendCommandService.java | 27 +++++++++---------- .../org/yzh/web/util/SendCommandUtil.java | 12 ++++++--- 6 files changed, 62 insertions(+), 30 deletions(-) diff --git a/commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java b/commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java index 33b45bb..d84039c 100644 --- a/commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java +++ b/commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap; public class BusinessCacheUtil { private static Map businessCache = new ConcurrentHashMap<>(); private static String CACHE_KEY_CURRENT_PUSH_DEVICE = "current-push-device"; + private static String CACHE_KEY_1078_PUSH_DEVICE = "1078-push-device"; private static String CACHE_KEY_PUSH_STATUS = "push-status"; /** @@ -24,9 +25,29 @@ public class BusinessCacheUtil { } public static void setCurrentPushDevice(String deviceId) { + if (deviceId == null) { + deviceId = ""; + } businessCache.put(CACHE_KEY_CURRENT_PUSH_DEVICE, deviceId); } + /** + * 获取当前推流车辆的clientID + * + * @return + */ + public static String get1078PushDevice() { + if (businessCache.containsKey(CACHE_KEY_1078_PUSH_DEVICE)) { + return businessCache.get(CACHE_KEY_1078_PUSH_DEVICE); + } + + return null; + } + + public static void set1078PushDevice(String deviceId) { + businessCache.put(CACHE_KEY_1078_PUSH_DEVICE, deviceId); + } + /** * 获取推送状态, 默认为离线 * diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java index 23a7975..ccad7de 100644 --- a/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java @@ -12,9 +12,7 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.yzh.commons.util.BusinessCacheUtil; -import org.yzh.web.service.SendCommandService; +import org.yzh.web.util.SendCommandUtil; import java.io.IOException; @@ -33,9 +31,6 @@ public class TCPMessageAdapter extends ChannelInboundHandlerAdapter { private final SessionManager sessionManager; - @Autowired - private static SendCommandService sendCommandService; - public TCPMessageAdapter(SessionManager sessionManager) { this.sessionManager = sessionManager; } @@ -73,9 +68,10 @@ public class TCPMessageAdapter extends ChannelInboundHandlerAdapter { } // 若掉线设备为当前推流设备, 调用推流方法, 发送推流 - if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice())) { - sendCommandService.sendToDevice(); - } +// if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice())) { +// SendCommandUtil.getService().sendToDevice(); +// } + SendCommandUtil.getService().sendToDevice(); log.info(">>>>> Disconnected{}", client(ctx)); } diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java index f8739d8..c9f0a14 100644 --- a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java @@ -9,6 +9,12 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.yzh.commons.util.BusinessCacheUtil; +import org.yzh.web.service.SendCommandService; +import org.yzh.web.util.SendCommandUtil; + +import java.time.LocalDateTime; /** * 处理消息的handler @@ -31,8 +37,9 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler> resultMono = messageManager.requestR(request, T0001.class); - log.info("*********** 自定义发送实时音视频传输请求 t0102 : " + resultMono.block().toString()); + log.info("*********** 自定义发送实时音视频传输请求 t0102 : " + resultMono.block().isSuccess()); - String sendResult = "false"; + boolean sendResult = resultMono.block().isSuccess(); // 若命令发送失败, 返回false - if (Boolean.FALSE.toString().equals(sendResult)) { + if (!sendResult) { return false; } - // step2: 由于延迟原因, 等待1s后调用jtt1078流媒体接口, 确认设备是否已推流 + // step2: 由于延迟原因, 等待1s后调用jtt1078上传到缓存的推流车辆, 确认设备是否已推流 Thread.sleep(1000); // step3: 调用3次, 失败后等待1s, 继续调用3次. 若仍然失败, 视为命令发送失败, 返回false int times = 0; while (times < 2) { - for (int i = 0; i < 3; i++) { - // 调用jtt1078流媒体接口 - - // 判断返回的推流设备是否为发送命令的设备, 如果是, 返回true, 否则继续下次循环 - if (Boolean.TRUE.toString().equals(sendResult)) { - BusinessCacheUtil.setCurrentPushDevice(clientId); - BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_ONLINE); - noticeCarStatus(); - return true; - } + // 调用jtt1078流媒体接口 + String clientId1078 = BusinessCacheUtil.get1078PushDevice(); + + // 判断返回的推流设备是否为发送命令的设备, 如果是, 返回true, 否则继续下次循环 + if (clientId.equals(clientId1078)) { + BusinessCacheUtil.setCurrentPushDevice(clientId); + BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_ONLINE); + noticeCarStatus(); + return true; } Thread.sleep(1000); diff --git a/jtt808-server/src/main/java/org/yzh/web/util/SendCommandUtil.java b/jtt808-server/src/main/java/org/yzh/web/util/SendCommandUtil.java index abd5696..3900c59 100644 --- a/jtt808-server/src/main/java/org/yzh/web/util/SendCommandUtil.java +++ b/jtt808-server/src/main/java/org/yzh/web/util/SendCommandUtil.java @@ -1,14 +1,18 @@ package org.yzh.web.util; -import org.springframework.beans.factory.annotation.Autowired; import org.yzh.web.service.SendCommandService; public class SendCommandUtil { private static SendCommandService sendCommandService; - @Autowired - public void setService(SendCommandService sendCommandService) { - SendCommandUtil.sendCommandService = sendCommandService; + public static SendCommandService getService() { + return sendCommandService; + } + + public static void setService(SendCommandService sendCommandService) { + if (SendCommandUtil.sendCommandService == null) { + SendCommandUtil.sendCommandService = sendCommandService; + } } }