1. websocket通讯

master
liuqingkun 3 years ago
parent 719f87c487
commit 9cb8c4832e
  1. 21
      commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java
  2. 14
      jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java
  3. 16
      jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java
  4. 2
      jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java
  5. 13
      jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java
  6. 10
      jtt808-server/src/main/java/org/yzh/web/util/SendCommandUtil.java

@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class BusinessCacheUtil { public class BusinessCacheUtil {
private static Map<String, String> businessCache = new ConcurrentHashMap<>(); private static Map<String, String> businessCache = new ConcurrentHashMap<>();
private static String CACHE_KEY_CURRENT_PUSH_DEVICE = "current-push-device"; private static String CACHE_KEY_CURRENT_PUSH_DEVICE = "current-push-device";
private static String CACHE_KEY_1078_PUSH_DEVICE = "1078-push-device";
private static String CACHE_KEY_PUSH_STATUS = "push-status"; private static String CACHE_KEY_PUSH_STATUS = "push-status";
/** /**
@ -24,9 +25,29 @@ public class BusinessCacheUtil {
} }
public static void setCurrentPushDevice(String deviceId) { public static void setCurrentPushDevice(String deviceId) {
if (deviceId == null) {
deviceId = "";
}
businessCache.put(CACHE_KEY_CURRENT_PUSH_DEVICE, deviceId); businessCache.put(CACHE_KEY_CURRENT_PUSH_DEVICE, deviceId);
} }
/**
* 获取当前推流车辆的clientID
*
* @return
*/
public static String get1078PushDevice() {
if (businessCache.containsKey(CACHE_KEY_1078_PUSH_DEVICE)) {
return businessCache.get(CACHE_KEY_1078_PUSH_DEVICE);
}
return null;
}
public static void set1078PushDevice(String deviceId) {
businessCache.put(CACHE_KEY_1078_PUSH_DEVICE, deviceId);
}
/** /**
* 获取推送状态, 默认为离线 * 获取推送状态, 默认为离线
* *

@ -12,9 +12,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.yzh.web.util.SendCommandUtil;
import org.yzh.commons.util.BusinessCacheUtil;
import org.yzh.web.service.SendCommandService;
import java.io.IOException; import java.io.IOException;
@ -33,9 +31,6 @@ public class TCPMessageAdapter extends ChannelInboundHandlerAdapter {
private final SessionManager sessionManager; private final SessionManager sessionManager;
@Autowired
private static SendCommandService sendCommandService;
public TCPMessageAdapter(SessionManager sessionManager) { public TCPMessageAdapter(SessionManager sessionManager) {
this.sessionManager = sessionManager; this.sessionManager = sessionManager;
} }
@ -73,9 +68,10 @@ public class TCPMessageAdapter extends ChannelInboundHandlerAdapter {
} }
// 若掉线设备为当前推流设备, 调用推流方法, 发送推流 // 若掉线设备为当前推流设备, 调用推流方法, 发送推流
if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice())) { // if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice())) {
sendCommandService.sendToDevice(); // SendCommandUtil.getService().sendToDevice();
} // }
SendCommandUtil.getService().sendToDevice();
log.info(">>>>> Disconnected{}", client(ctx)); log.info(">>>>> Disconnected{}", client(ctx));
} }

@ -9,6 +9,12 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.yzh.commons.util.BusinessCacheUtil;
import org.yzh.web.service.SendCommandService;
import org.yzh.web.util.SendCommandUtil;
import java.time.LocalDateTime;
/** /**
* 处理消息的handler * 处理消息的handler
@ -31,8 +37,9 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
Channel currentChannel = ctx.channel(); Channel currentChannel = ctx.channel();
// 获取客户端传输过来的消息 // 获取客户端传输过来的消息
String content = msg.text(); String content = msg.text();
log.debug("receive content : " + content);
log.info(" 群消息发送... content = " + content); BusinessCacheUtil.set1078PushDevice(content);
currentChannel.writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + content));
} }
@ -43,7 +50,8 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
users.add(ctx.channel()); users.add(ctx.channel());
log.info(" netty 获得连接..... "); log.info("netty 获得连接..... ");
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " qw12312"));
} }
@Override @Override
@ -51,6 +59,8 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
String channelId = ctx.channel().id().asShortText(); String channelId = ctx.channel().id().asShortText();
// 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
users.remove(ctx.channel()); users.remove(ctx.channel());
BusinessCacheUtil.set1078PushDevice("");
SendCommandUtil.getService().sendToDevice();
log.info("客户端被移除,channelId为:" + channelId); log.info("客户端被移除,channelId为:" + channelId);
} }

@ -17,6 +17,7 @@ import org.yzh.web.model.entity.DeviceDO;
import org.yzh.web.model.enums.SessionKey; import org.yzh.web.model.enums.SessionKey;
import org.yzh.web.service.FileService; import org.yzh.web.service.FileService;
import org.yzh.web.service.SendCommandService; import org.yzh.web.service.SendCommandService;
import org.yzh.web.util.SendCommandUtil;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@ -72,6 +73,7 @@ public class JT808Endpoint {
session.setAttribute(SessionKey.Device, device); session.setAttribute(SessionKey.Device, device);
// 若无正在推流的设备, 调用推流方法 // 若无正在推流的设备, 调用推流方法
SendCommandUtil.setService(sendCommandService);
if (BusinessCacheUtil.getCurrentPushDevice() == null) { if (BusinessCacheUtil.getCurrentPushDevice() == null) {
sendCommandService.sendToDevice(message.getClientId(), null); sendCommandService.sendToDevice(message.getClientId(), null);
} }

@ -125,31 +125,30 @@ public class SendCommandService {
request.setTcpPort(4001); request.setTcpPort(4001);
request.setUdpPort(0); request.setUdpPort(0);
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class); Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
log.info("*********** 自定义发送实时音视频传输请求 t0102 : " + resultMono.block().toString()); log.info("*********** 自定义发送实时音视频传输请求 t0102 : " + resultMono.block().isSuccess());
String sendResult = "false"; boolean sendResult = resultMono.block().isSuccess();
// 若命令发送失败, 返回false // 若命令发送失败, 返回false
if (Boolean.FALSE.toString().equals(sendResult)) { if (!sendResult) {
return false; return false;
} }
// step2: 由于延迟原因, 等待1s后调用jtt1078流媒体接口, 确认设备是否已推流 // step2: 由于延迟原因, 等待1s后调用jtt1078上传到缓存的推流车辆, 确认设备是否已推流
Thread.sleep(1000); Thread.sleep(1000);
// step3: 调用3次, 失败后等待1s, 继续调用3次. 若仍然失败, 视为命令发送失败, 返回false // step3: 调用3次, 失败后等待1s, 继续调用3次. 若仍然失败, 视为命令发送失败, 返回false
int times = 0; int times = 0;
while (times < 2) { while (times < 2) {
for (int i = 0; i < 3; i++) {
// 调用jtt1078流媒体接口 // 调用jtt1078流媒体接口
String clientId1078 = BusinessCacheUtil.get1078PushDevice();
// 判断返回的推流设备是否为发送命令的设备, 如果是, 返回true, 否则继续下次循环 // 判断返回的推流设备是否为发送命令的设备, 如果是, 返回true, 否则继续下次循环
if (Boolean.TRUE.toString().equals(sendResult)) { if (clientId.equals(clientId1078)) {
BusinessCacheUtil.setCurrentPushDevice(clientId); BusinessCacheUtil.setCurrentPushDevice(clientId);
BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_ONLINE); BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_ONLINE);
noticeCarStatus(); noticeCarStatus();
return true; return true;
} }
}
Thread.sleep(1000); Thread.sleep(1000);
} }

@ -1,14 +1,18 @@
package org.yzh.web.util; package org.yzh.web.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.yzh.web.service.SendCommandService; import org.yzh.web.service.SendCommandService;
public class SendCommandUtil { public class SendCommandUtil {
private static SendCommandService sendCommandService; private static SendCommandService sendCommandService;
@Autowired public static SendCommandService getService() {
public void setService(SendCommandService sendCommandService) { return sendCommandService;
}
public static void setService(SendCommandService sendCommandService) {
if (SendCommandUtil.sendCommandService == null) {
SendCommandUtil.sendCommandService = sendCommandService; SendCommandUtil.sendCommandService = sendCommandService;
} }
}
} }

Loading…
Cancel
Save