diff --git a/commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java b/commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java index 35d8d66..80a4a45 100644 --- a/commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java +++ b/commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java @@ -6,4 +6,6 @@ public interface BusinessConstant { String LIVE_STATUS_ONLINE = "online"; String LIVE_STATUS_WAITING = "waiting"; String LIVE_STATUS_OFFLINE = "offline"; + + String LIVE_SERVER_UPDATE_PUSH_CAR = "http://42.192.165.208:8002/blade-business/common-api/set-push-car-and-stat"; } diff --git a/commons/src/main/java/org/yzh/commons/model/APIResult.java b/commons/src/main/java/org/yzh/commons/model/APIResult.java index 24662fb..b60633d 100644 --- a/commons/src/main/java/org/yzh/commons/model/APIResult.java +++ b/commons/src/main/java/org/yzh/commons/model/APIResult.java @@ -152,4 +152,14 @@ public class APIResult { throw new UnsupportedOperationException(); } } + + @Override + public String toString() { + return "APIResult{" + + "code=" + code + + ", msg='" + msg + '\'' + + ", detailMsg='" + detailMsg + '\'' + + ", data=" + data + + '}'; + } } \ No newline at end of file diff --git a/commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java b/commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java index d84039c..1f7ba65 100644 --- a/commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java +++ b/commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap; public class BusinessCacheUtil { private static Map businessCache = new ConcurrentHashMap<>(); private static String CACHE_KEY_CURRENT_PUSH_DEVICE = "current-push-device"; + private static String CACHE_KEY_CURRENT_PUSH_CHANNEL = "current-push-channel"; private static String CACHE_KEY_1078_PUSH_DEVICE = "1078-push-device"; private static String CACHE_KEY_PUSH_STATUS = "push-status"; @@ -31,6 +32,26 @@ public class BusinessCacheUtil { businessCache.put(CACHE_KEY_CURRENT_PUSH_DEVICE, deviceId); } + /** + * 获取当前推流车辆的clientID + * + * @return + */ + public static int getCurrentPushChannel() { + if (businessCache.containsKey(CACHE_KEY_CURRENT_PUSH_CHANNEL)) { + return Integer.parseInt(businessCache.get(CACHE_KEY_CURRENT_PUSH_CHANNEL)); + } + + return BusinessConstant.DEFAULT_CHANNEL_NO; + } + + public static void setCurrentPushChannel(Integer channel) { + if (channel == null) { + channel = BusinessConstant.DEFAULT_CHANNEL_NO; + } + businessCache.put(CACHE_KEY_CURRENT_PUSH_CHANNEL, channel.toString()); + } + /** * 获取当前推流车辆的clientID * diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java index ccad7de..63032ee 100644 --- a/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java @@ -12,6 +12,7 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.yzh.commons.util.BusinessCacheUtil; import org.yzh.web.util.SendCommandUtil; import java.io.IOException; @@ -67,13 +68,12 @@ public class TCPMessageAdapter extends ChannelInboundHandlerAdapter { session.invalidate(); } - // 若掉线设备为当前推流设备, 调用推流方法, 发送推流 -// if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice())) { -// SendCommandUtil.getService().sendToDevice(); -// } - SendCommandUtil.getService().sendToDevice(); + log.info(">>>>> Disconnected clientId = {} , address = {}", clientId, client(ctx)); - log.info(">>>>> Disconnected{}", client(ctx)); + // 若掉线设备为当前推流设备, 调用推流方法, 发送推流 + if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice()) && SendCommandUtil.getService() != null) { + SendCommandUtil.getService().sendToDevice(); + } } @Override diff --git a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java index c9f0a14..80fbe27 100644 --- a/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java +++ b/jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java @@ -9,9 +9,8 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.yzh.commons.constant.BusinessConstant; import org.yzh.commons.util.BusinessCacheUtil; -import org.yzh.web.service.SendCommandService; import org.yzh.web.util.SendCommandUtil; import java.time.LocalDateTime; @@ -37,9 +36,23 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler> T9003(@RequestBody JTMessage request) { @@ -31,15 +44,45 @@ public class JT1078Controller { @Operation(summary = "9101 实时音视频传输请求") @PostMapping("9101") public Mono> T9101(@RequestBody T9101 request) { - return messageManager.requestR(request, T0001.class); + Mono> resultMono = null; + + // 先调用停止推流方法, 然后指定设备推流 + if (BusinessCacheUtil.getCurrentPushDevice() != null) { + T9102 t9102 = new T9102(); + t9102.setChannelNo(BusinessCacheUtil.getCurrentPushChannel()); + t9102.setClientId(BusinessCacheUtil.getCurrentPushDevice()); + t9102.setCloseType(0); + t9102.setCommand(0); + t9102.setStreamType(0); + resultMono = messageManager.requestR(t9102, T0001.class); + } + + SendCommandUtil.setService(sendCommandService); + sendCommandService.sendToDevice(request.getClientId(), null); + return resultMono; } @Operation(summary = "9102 音视频实时传输控制") @PostMapping("9102") public Mono> T9102(@RequestBody T9102 request) { + Mono> resultMono = messageManager.requestR(request, T0001.class); + sendCommandService.sendToDevice(); + return resultMono; + } + + @Operation(summary = "9101 实时音视频传输请求") + @PostMapping("9101-v2") + public Mono> T9101V2(@RequestBody T9101 request) { return messageManager.requestR(request, T0001.class); } + @Operation(summary = "9102 音视频实时传输控制") + @PostMapping("9102-v2") + public Mono> T9102V2(@RequestBody T9102 request) { + Mono> resultMono = messageManager.requestR(request, T0001.class); + return resultMono; + } + @Operation(summary = "9201 平台下发远程录像回放请求") @PostMapping("9201") public Mono> T9201(@RequestBody T9201 request) { diff --git a/jtt808-server/src/main/java/org/yzh/web/controller/OtherController.java b/jtt808-server/src/main/java/org/yzh/web/controller/OtherController.java index 21a7a72..e0fc82d 100644 --- a/jtt808-server/src/main/java/org/yzh/web/controller/OtherController.java +++ b/jtt808-server/src/main/java/org/yzh/web/controller/OtherController.java @@ -141,6 +141,7 @@ public class OtherController { public APIResult> getCurPushDevice() { Map data = new HashMap<>(); data.put("deviceId", BusinessCacheUtil.getCurrentPushDevice()); + data.put("channelNo", BusinessCacheUtil.getCurrentPushChannel() + ""); data.put("pushStatus", BusinessCacheUtil.getPushStatus()); APIResult result = new APIResult(APICodes.Success.getCode()); diff --git a/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java b/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java index c8e1b88..92b8044 100644 --- a/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java +++ b/jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java @@ -97,6 +97,12 @@ public class JT808Endpoint { device.setPlateNo(token[1]); session.setAttribute(SessionKey.Device, device); + // 若无正在推流的设备, 调用推流方法 + SendCommandUtil.setService(sendCommandService); + if (BusinessCacheUtil.getCurrentPushDevice() == null) { + sendCommandService.sendToDevice(message.getClientId(), null); + } + T0001 result = new T0001(); result.setResponseSerialNo(message.getSerialNo()); result.setResponseMessageId(message.getMessageId()); diff --git a/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java b/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java index 813b11c..85c02e1 100644 --- a/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java +++ b/jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java @@ -59,9 +59,7 @@ public class SendCommandService { * 自动发送推流命令 */ public void sendToDevice() { - BusinessCacheUtil.setCurrentPushDevice(null); - BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_WAITING); - noticeCarStatus(); + noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING); Thread thread = new Thread(new Runnable() { @Override @@ -74,9 +72,7 @@ public class SendCommandService { } } - BusinessCacheUtil.setCurrentPushDevice(null); - BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_OFFLINE); - noticeCarStatus(); + noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_OFFLINE); } }); @@ -90,9 +86,7 @@ public class SendCommandService { * @param channelNo */ public boolean sendToDevice(String clientId, Integer channelNo) { - BusinessCacheUtil.setCurrentPushDevice(clientId); - BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_WAITING); - noticeCarStatus(); + noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING); // 判断该设备是否为在线设备 Session session = sessionManager.get(clientId); @@ -103,9 +97,7 @@ public class SendCommandService { if (!sendCommand(clientId, channelNo)) { // 若推流命令发送失败, 设备设置为离线 - BusinessCacheUtil.setCurrentPushDevice(null); - BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_OFFLINE); - noticeCarStatus(); + noticeCarStatus(null, channelNo, BusinessConstant.LIVE_STATUS_OFFLINE); } return true; } @@ -113,7 +105,6 @@ public class SendCommandService { private boolean sendCommand(String clientId, Integer channelNo) { channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; - log.info("*********** 自定义发送实时音视频传输请求 start"); try { // step1: 向车载机发送命令 T9101 request = new T9101(); @@ -124,10 +115,12 @@ public class SendCommandService { request.setStreamType(0); request.setTcpPort(4001); request.setUdpPort(0); + log.info("*** 发送实时音视频传输请求 start. request params : " + request.toString()); Mono> resultMono = messageManager.requestR(request, T0001.class); - log.info("*********** 自定义发送实时音视频传输请求 t0102 : " + resultMono.block().isSuccess()); + APIResult resultObj = resultMono.block(); + log.info("*** 发送实时音视频传输请求 result : " + resultObj.toString()); - boolean sendResult = resultMono.block().isSuccess(); + boolean sendResult = resultObj.isSuccess(); // 若命令发送失败, 返回false if (!sendResult) { return false; @@ -138,15 +131,13 @@ public class SendCommandService { // step3: 调用3次, 失败后等待1s, 继续调用3次. 若仍然失败, 视为命令发送失败, 返回false int times = 0; - while (times < 2) { + while (times < 3) { // 调用jtt1078流媒体接口 String clientId1078 = BusinessCacheUtil.get1078PushDevice(); // 判断返回的推流设备是否为发送命令的设备, 如果是, 返回true, 否则继续下次循环 if (clientId.equals(clientId1078)) { - BusinessCacheUtil.setCurrentPushDevice(clientId); - BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_ONLINE); - noticeCarStatus(); + noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_ONLINE); return true; } @@ -159,15 +150,20 @@ public class SendCommandService { return false; } + public void noticeCarStatus(String clientId, Integer channelNo, String status) { + channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo; + + // 更新推流车辆和推流状态 + BusinessCacheUtil.setCurrentPushDevice(clientId); + BusinessCacheUtil.setCurrentPushChannel(channelNo); + BusinessCacheUtil.setPushStatus(status); - public void noticeCarStatus() { - String clientId = BusinessCacheUtil.getCurrentPushDevice(); - String status = BusinessCacheUtil.getPushStatus(); - // 设备注册后, 调用接口通知给web服务器 - String url = "http://127.0.0.1:8002/blade-business/common-api/set-car-stat"; + // 调用接口通知给慢直播服务器 + String url = BusinessConstant.LIVE_SERVER_UPDATE_PUSH_CAR; Map params = new HashMap<>(); params.put("clientId", clientId); params.put("carStat", status); + log.info(">>> notice to poc-live. url = " + url + " | params = " + params.toString()); HttpClientUtils.doGet(url, params); } } diff --git a/jtt808-server/src/main/resources/log4j2.xml b/jtt808-server/src/main/resources/log4j2.xml index b262aeb..25f04e7 100644 --- a/jtt808-server/src/main/resources/log4j2.xml +++ b/jtt808-server/src/main/resources/log4j2.xml @@ -28,7 +28,7 @@ - +