代码优化, 日志更新

master
liuqingkun 3 years ago
parent bf12bf1aad
commit 0acf396df5
  1. 50
      jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java
  2. 40
      jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java
  3. 10
      jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java

@ -26,9 +26,6 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
private static final Logger log = LoggerFactory.getLogger(WsChannelInboundHandler.class);
@Autowired
private SessionManager sessionManager;
/**
* 用于记录和管理所有客户端的channle
*/
@ -50,18 +47,17 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
return;
}
// 每分钟的0s, 打印当前推流通道
LocalTime localTime = LocalTime.now();
if (localTime.getSecond() == 0) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : jt1078推送信息 : currentChannel = {}:{}:{}, content = {}",
currentChannel.id(), currentChannel.localAddress(), currentChannel.remoteAddress(), content);
}
content = content.replace("jtt1078:", "");
String curPush = BusinessCacheUtil.getCurrentPushDevice();
int curPushChannel = BusinessCacheUtil.getCurrentPushChannel();
// 每分钟的0s, 打印当前推流通道
LocalTime localTime = LocalTime.now();
if (localTime.getSecond() == 0) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : jt808当前推流设备 = {}-{}: 1078推流设备信息 = {} ", curPush, curPushChannel, content);
}
if (content.contains("-")) {
// 由于1078传过来的信息为(0+clientId+'-'+channelNo), 故此处需要解析
// 接收jt1078通过websocket传输的数据
@ -71,38 +67,35 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
BusinessCacheUtil.set1078PushDevice(clientId);
// 重置1078推送为空的次数
// BusinessCacheUtil.setPushEmptyTimes(0);
log.debug(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 1078推流设备 = {} - {} ", clientId, channelNo);
BusinessCacheUtil.setPushEmptyTimes(0);
// 如果1078推送的正在推流的设备 != 808记录的当前推流的设备, 调用接口通知慢直播服务, 当前推流车辆及推流状态信息
if (!clientId.equals(curPush)) {
if (SendCommandUtil.getService() != null) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 原推流设备 = {} - {} : 更新为1078正在推流设备 = {} - {} ", curPush, curPushChannel, clientId, channelNo);
SendCommandUtil.getService().noticeCarStatus(clientId, Integer.parseInt(channelNo), BusinessConstant.LIVE_STATUS_ONLINE);
}
}
} else {
// 若接收到1078推送的当前推流设备为空, 更新808的缓存
// 若808记录的当前推流设备不为空, 更新808的缓存
if (!StringUtil.isNullOrEmpty(curPush)) {
if (SendCommandUtil.getService() != null) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 原推流设备 = {} - {} : 更新为1078正在推流设备 = null ", curPush, curPushChannel);
SendCommandUtil.getService().noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_OFFLINE);
}
}
// 若接收到1078推送的当前推流设备为空, 触发自动推流方法
// int emptyTimes = BusinessCacheUtil.getPushEmptyTimes();
// // 若持续30s收到的消息都为空, 调用推流方法
// if (emptyTimes == 30) {
int emptyTimes = BusinessCacheUtil.getPushEmptyTimes();
// 若持续30s收到的消息都为空, 调用推流方法
if (emptyTimes == 30) {
// log.warn(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 持续30s收到的消息都为空, 调用推流方法 ");
if (SendCommandUtil.getService() != null && sessionManager != null && sessionManager.all() != null && sessionManager.all().size() > 0) {
SendCommandUtil.getService().sendToDevice(null, null);
if (SendCommandUtil.getService() != null) {
SendCommandUtil.getService().sendToDevice(null, null);
}
emptyTimes = 0;
} else {
emptyTimes++;
}
// emptyTimes = 0;
// } else {
// emptyTimes++;
// }
// BusinessCacheUtil.setPushEmptyTimes(emptyTimes);
BusinessCacheUtil.setPushEmptyTimes(emptyTimes);
}
currentChannel.writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " : " + content));
@ -119,13 +112,8 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
if (!users.contains(nettyChannel)) {
users.add(nettyChannel);
log.warn(BusinessConstant.LOGGER_PREFIX + " : websocket连接 : 客户端上线 : channelId = {} ", nettyChannel.id().asLongText());
BusinessCacheUtil.set1078PushDevice("");
if (SendCommandUtil.getService() != null) {
// 1078上线后, 停止原推流线程, 重新启动线程发送推流命令
SendCommandUtil.getService().restartSendToDevice();
}
}
BusinessCacheUtil.set1078PushDevice("");
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " qw12312"));
}

@ -75,14 +75,14 @@ public class JT808Endpoint {
session.setAttribute(SessionKey.Device, device);
SendCommandUtil.setService(sendCommandService);
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// new Thread(new Runnable() {
// @Override
// public void run() {
// try {
// Thread.sleep(15000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// 若无正在推流的设备, 调用推流方法
if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) {
@ -91,8 +91,8 @@ public class JT808Endpoint {
} else {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端注册触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice());
}
}
}).start();
// }
// }).start();
T8100 result = new T8100();
result.setResponseSerialNo(message.getSerialNo());
@ -114,14 +114,14 @@ public class JT808Endpoint {
session.setAttribute(SessionKey.Device, device);
SendCommandUtil.setService(sendCommandService);
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// new Thread(new Runnable() {
// @Override
// public void run() {
// try {
// Thread.sleep(15000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// 若无正在推流的设备, 调用推流方法
if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice()) || BusinessCacheUtil.getCurrentPushDevice().equals(message.getClientId())) {
@ -130,8 +130,8 @@ public class JT808Endpoint {
} else {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice());
}
}
}).start();
// }
// }).start();
T0001 result = new T0001();
result.setResponseSerialNo(message.getSerialNo());

@ -17,6 +17,7 @@ import org.yzh.web.endpoint.MessageManager;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.function.Function;
@Service
public class SendCommandService {
@ -90,10 +91,10 @@ public class SendCommandService {
}
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING);
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 启动线程, 开始执行发送推流命令 ");
thread = new Thread(new Runnable() {
@Override
public void run() {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 线程启动, 发送推流命令 - 开始");
if (!StringUtil.isNullOrEmpty(clientId)) {
if (sessionManager.all().contains(clientId)) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 指定设备推流 : clientId = {}", clientId);
@ -120,6 +121,7 @@ public class SendCommandService {
// 所有在线设备发送命令失败后, 通知慢直播平台当前推流的状态
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_OFFLINE);
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自动推流方法 : 线程结束, 发送推流命令 - 完成");
}
});
@ -214,6 +216,12 @@ public class SendCommandService {
* @param status
*/
public void noticeCarStatus(String clientId, Integer channelNo, String status) {
// 若本次推送的状态和车辆与缓存中的一致, 不继续执行了, 直接返回, 避免重复调用
if ((BusinessCacheUtil.getPushStatus().equals(status) || (StringUtil.isNullOrEmpty(clientId) && StringUtil.isNullOrEmpty(BusinessCacheUtil.getPushStatus())))
&& BusinessCacheUtil.getCurrentPushDevice().equals(clientId)) {
return;
}
// 若1078通知在线, 但此时无在线设备, 不再更新状态
if (BusinessConstant.LIVE_STATUS_ONLINE.equals(status) && (sessionManager.all() == null || sessionManager.all().size() == 0)) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : noticeCarStatus(更新车辆状态缓存) : 当前无在线设备, 将设备强制置为离线, 推送给慢直播服务");

Loading…
Cancel
Save