增加日志

master
liuqingkun 3 years ago
parent fd39453532
commit 537a440756
  1. 2
      commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java
  2. 19
      jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java
  3. 40
      jtt808-server/src/main/java/org/yzh/web/controller/JT1078Controller.java
  4. 11
      jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java
  5. 23
      jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java
  6. 68
      jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java

@ -1,6 +1,8 @@
package org.yzh.commons.constant;
public interface BusinessConstant {
String LOGGER_PREFIX = " --- custom log --- ";
Integer DEFAULT_CHANNEL_NO = 9;
String LIVE_STATUS_ONLINE = "online";

@ -36,18 +36,22 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
Channel currentChannel = ctx.channel();
// 获取客户端传输过来的消息
String content = msg.text();
log.debug("<<< receive content : " + content);
if (content.contains("-")) {
// 由于1078传过来的信息为(0+clientId+'-'+channelNo), 故此处需要解析
// 接收jt1078通过websocket传输的数据
String clientId = content.split("-")[0].substring(1);
String channelNo = content.split("-")[1];
BusinessCacheUtil.set1078PushDevice(clientId);
String curPush = BusinessCacheUtil.getCurrentPushDevice();
int curPushChannel = BusinessCacheUtil.getCurrentPushChannel();
log.debug(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 1078推流设备 = {} - {} ", clientId, channelNo);
if (!clientId.equals(curPush)) {
log.info("<<<<<<<< receive client id :" + clientId + " - " + channelNo);
if (SendCommandUtil.getService() != null) {
log.info(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 原推流设备 = {} - {} ", curPush, curPushChannel);
log.info(BusinessConstant.LOGGER_PREFIX + " : websocket接收 : 更新为1078正在推流设备 = {} - {} ", clientId, channelNo);
SendCommandUtil.getService().noticeCarStatus(clientId, Integer.parseInt(channelNo), BusinessConstant.LIVE_STATUS_ONLINE);
}
}
@ -64,7 +68,15 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
users.add(ctx.channel());
log.info("netty 获得连接..... ");
log.info(BusinessConstant.LOGGER_PREFIX + " : websocket连接 : 客户端上线 : channelId = {} ", ctx.channel().id().asShortText());
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " qw12312"));
BusinessCacheUtil.set1078PushDevice("");
if (SendCommandUtil.getService() != null) {
// 1078上线后, 停止原推流线程, 重新启动线程发送推流命令, 避免
SendCommandUtil.getService().restartSendToDevice();
}
}
@Override
@ -72,11 +84,12 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
String channelId = ctx.channel().id().asShortText();
// 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
users.remove(ctx.channel());
log.info(BusinessConstant.LOGGER_PREFIX + " : websocket连接 : 客户端掉线 : channelId = {} ", channelId);
BusinessCacheUtil.set1078PushDevice("");
if (SendCommandUtil.getService() != null) {
SendCommandUtil.getService().sendToDevice();
}
log.info("客户端被移除,channelId为:" + channelId);
}
@Override

@ -3,6 +3,8 @@ package org.yzh.web.controller;
import io.github.yezhihao.netmc.session.Session;
import io.github.yezhihao.netmc.session.SessionManager;
import io.swagger.v3.oas.annotations.Operation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@ -35,6 +37,8 @@ public class JT1078Controller {
@Autowired
private SendCommandService sendCommandService;
private static final Logger log = LoggerFactory.getLogger(JT1078Controller.class.getSimpleName());
@Operation(summary = "9003 查询终端音视频属性")
@PostMapping("9003")
public Mono<APIResult<T1003>> T9003(@RequestBody JTMessage request) {
@ -44,8 +48,24 @@ public class JT1078Controller {
@Operation(summary = "9101 实时音视频传输请求")
@PostMapping("9101")
public Mono<APIResult<T0001>> T9101(@RequestBody T9101 request) {
log.info(BusinessConstant.LOGGER_PREFIX + " : 9101 实时音视频传输请求 开始执行.");
Mono<APIResult<T0001>> resultMono = null;
APIResult<T0001> result = new APIResult<>();
String clientId = request.getClientId();
int channelNo = request.getChannelNo();
// 若指定设备与当前推流设备相同, 直接返回
if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice()) && channelNo == BusinessCacheUtil.getCurrentPushChannel()) {
log.info(BusinessConstant.LOGGER_PREFIX + " : T9101 {} - {}, 指定设备正在推流.", clientId, channelNo);
result.setMsg("指定设备正在推流");
return Mono.create(sink -> {
sink.success(result);
});
}
log.info(BusinessConstant.LOGGER_PREFIX + " : T9101 - step1 : 当前推流设备 = {} - {} ", BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel());
// 先调用停止推流方法, 然后指定设备推流
if (BusinessCacheUtil.getCurrentPushDevice() != null) {
T9102 t9102 = new T9102();
@ -54,10 +74,21 @@ public class JT1078Controller {
t9102.setCloseType(0);
t9102.setCommand(0);
t9102.setStreamType(0);
log.info(BusinessConstant.LOGGER_PREFIX + " : T9101 - step2 : 停止推流当前设备 : {} - {} ", BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel());
resultMono = messageManager.requestR(t9102, T0001.class);
}
try {
// 添加2s延迟, 避免停止推流与开始推流同时执行
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
SendCommandUtil.setService(sendCommandService);
log.info(BusinessConstant.LOGGER_PREFIX + " : T9101 - step3 : 开始推流 设备 = {} - {} ", request.getClientId(), BusinessConstant.DEFAULT_CHANNEL_NO);
sendCommandService.sendToDevice(request.getClientId(), null);
return resultMono;
}
@ -65,8 +96,15 @@ public class JT1078Controller {
@Operation(summary = "9102 音视频实时传输控制")
@PostMapping("9102")
public Mono<APIResult<T0001>> T9102(@RequestBody T9102 request) {
log.info(BusinessConstant.LOGGER_PREFIX + " : T9102 - step1 : 停止推流 ");
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
sendCommandService.sendToDevice();
// 若停止推流设备为正在推流设备, 触发推流方法
log.info(BusinessConstant.LOGGER_PREFIX + " : T9102 - step2 : 当前推流设备 = {} - {} " + BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel());
if (request.getClientId().equals(BusinessCacheUtil.getCurrentPushDevice())) {
log.info(BusinessConstant.LOGGER_PREFIX + " : T9102 - step3 : 调用推流方法 ");
sendCommandService.sendToDevice();
}
return resultMono;
}

@ -9,6 +9,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.yzh.commons.constant.BusinessConstant;
import org.yzh.commons.util.BusinessCacheUtil;
import org.yzh.protocol.basics.JTMessage;
import org.yzh.protocol.commons.JT808;
@ -74,8 +75,11 @@ public class JT808Endpoint {
// 若无正在推流的设备, 调用推流方法
SendCommandUtil.setService(sendCommandService);
if (BusinessCacheUtil.getCurrentPushDevice() == null) {
if (BusinessCacheUtil.getCurrentPushDevice() == null || BusinessCacheUtil.getCurrentPushDevice() == message.getClientId()) {
log.info(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId());
sendCommandService.sendToDevice(message.getClientId(), null);
} else {
log.info(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice());
}
T8100 result = new T8100();
@ -99,8 +103,11 @@ public class JT808Endpoint {
// 若无正在推流的设备, 调用推流方法
SendCommandUtil.setService(sendCommandService);
if (BusinessCacheUtil.getCurrentPushDevice() == null) {
if (BusinessCacheUtil.getCurrentPushDevice() == null || BusinessCacheUtil.getCurrentPushDevice() == message.getClientId()) {
log.info(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 当前指定推流设备 = {} -> 将要推流设备 = {} ", BusinessCacheUtil.getCurrentPushDevice(), message.getClientId());
sendCommandService.sendToDevice(message.getClientId(), null);
} else {
log.info(BusinessConstant.LOGGER_PREFIX + " : 终端鉴权触发 - 发送推流命令 : 已有推流设备, 当前指定推流设备 = {}", BusinessCacheUtil.getCurrentPushDevice());
}
T0001 result = new T0001();

@ -5,6 +5,7 @@ import io.github.yezhihao.netmc.session.SessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.yzh.commons.constant.BusinessConstant;
import org.yzh.commons.model.APIException;
import org.yzh.commons.model.APIResult;
import org.yzh.protocol.basics.JTMessage;
@ -64,17 +65,35 @@ public class MessageManager {
}
public <T> Mono<APIResult<T>> requestR(JTMessage request, Class<T> responseClass) {
// Session session = sessionManager.get(request.getClientId());
// if (session == null)
// return OFFLINE_RESULT;
//
// return session.request(request, responseClass)
// .map(message -> APIResult.ok(message))
// .timeout(Duration.ofSeconds(10), TIMEOUT_RESULT)
// .onErrorResume(e -> {
// log.warn("消息发送失败", e);
// return SENDFAIL_RESULT;
// });
// 修改后代码, 更新日志打印
Session session = sessionManager.get(request.getClientId());
if (session == null)
return OFFLINE_RESULT;
return session.request(request, responseClass)
log.info(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 参数 = {}", request.toString());
Mono<APIResult<T>> resultMono = session.request(request, responseClass)
.map(message -> APIResult.ok(message))
.timeout(Duration.ofSeconds(10), TIMEOUT_RESULT)
.onErrorResume(e -> {
log.warn("消息发送失败", e);
// log.warn("消息发送失败", e);
log.warn(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 结果 = 消息发送失败. msg : {}", e);
return SENDFAIL_RESULT;
});
log.info(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 结果 = {}", resultMono.block().toString());
return resultMono;
}
public <T> Mono<T> request(String sessionId, JTMessage request, Class<T> responseClass, long timeout) {

@ -2,6 +2,7 @@ package org.yzh.web.service;
import io.github.yezhihao.netmc.session.Session;
import io.github.yezhihao.netmc.session.SessionManager;
import io.netty.util.internal.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -27,6 +28,11 @@ public class SendCommandService {
@Autowired
private SessionManager sessionManager;
/**
* 向设备发送推流命令的线程
*/
private Thread sendCommandToDeviceThread;
public void sendToDevice(String clientId) {
if (!"40155591817".equals(clientId)
&& !"12345678901".equals(clientId)
@ -56,37 +62,65 @@ public class SendCommandService {
}
/**
* 自动发送推流命令
* 重启自动发送推流命令
*/
public void restartSendToDevice() {
if (sendCommandToDeviceThread != null && sendCommandToDeviceThread.isAlive()) {
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 停止原线程, 重新开启线程 ");
sendCommandToDeviceThread.interrupt();
}
sendToDevice();
}
/**
* 管理当前所有在线设备, 自动发送推流命令, 直到成功一个或全部失败, 通知慢直播平台当前推流车辆及推流状态
*/
public void sendToDevice() {
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 开始执行 ");
if (sendCommandToDeviceThread != null && sendCommandToDeviceThread.isAlive()) {
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 已有线程执行推流, 直接返回 ");
return;
}
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING);
Thread thread = new Thread(new Runnable() {
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 启动线程, 开始执行发送推流命令 ");
sendCommandToDeviceThread = new Thread(new Runnable() {
@Override
public void run() {
for (Session session : sessionManager.all()) {
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 调用推流方法开始 ");
boolean result = sendCommand(session.getClientId(), BusinessConstant.DEFAULT_CHANNEL_NO);
// 只要有一个设备发送推流命令成功, 直接返回, 不继续向其他设备发送推流命令
if (result == true) {
break;
return;
}
}
// 所有在线设备发送命令失败后, 通知慢直播平台当前推流的状态
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_OFFLINE);
}
});
thread.start();
sendCommandToDeviceThread.start();
}
/**
* 指定车载机和通道发送推流命令, 若不传channelNo, 默认为9
* 指定车载机和通道发送推流命令, 若不传channelNo, 默认为9, 通知慢直播平台当前推流车辆及推流状态
*
* @param clientId
* @param channelNo
*/
public boolean sendToDevice(String clientId, Integer channelNo) {
noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING);
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice {} - {} : 开始执行 ", clientId, channelNo);
channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo;
if (StringUtil.isNullOrEmpty(clientId)) {
return false;
}
// 判断该设备是否为在线设备
Session session = sessionManager.get(clientId);
@ -95,6 +129,7 @@ public class SendCommandService {
return false;
}
noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING);
if (!sendCommand(clientId, channelNo)) {
// 若推流命令发送失败, 设备设置为离线
noticeCarStatus(null, channelNo, BusinessConstant.LIVE_STATUS_OFFLINE);
@ -115,33 +150,40 @@ public class SendCommandService {
request.setStreamType(0);
request.setTcpPort(4001);
request.setUdpPort(0);
log.info("*** 发送实时音视频传输请求 start. request params : " + request.toString());
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
APIResult<T0001> resultObj = resultMono.block();
log.info("*** 发送实时音视频传输请求 result : " + resultObj.toString());
boolean sendResult = resultObj.isSuccess();
// 若命令发送失败, 返回false
if (!sendResult) {
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 发送命令失败 ");
return false;
}
// step2: 由于延迟原因, 等待1s后调用jtt1078上传到缓存的推流车辆, 确认设备是否已推流
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 获取1078推流设备, 1s延迟开始 ");
Thread.sleep(1000);
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 获取1078推流设备, 1s延迟结束 ");
// step3: 调用3次, 失败后等待1s, 继续调用3次. 若仍然失败, 视为命令发送失败, 返回false
int times = 0;
while (times < 3) {
// 调用jtt1078流媒体接口
String clientId1078 = BusinessCacheUtil.get1078PushDevice();
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 1078推流设备 = {} ", clientId1078);
// 判断返回的推流设备是否为发送命令的设备, 如果是, 返回true, 否则继续下次循环
if (clientId.equals(clientId1078)) {
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 推流设备 = 发送命令的设备, 返回推流成功");
noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_ONLINE);
return true;
}
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 重试1s延迟开始 ");
Thread.sleep(1000);
log.info(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 : 重试1s延迟结束 ");
times++;
}
} catch (InterruptedException e) {
e.printStackTrace();
@ -150,6 +192,14 @@ public class SendCommandService {
return false;
}
/**
* 1. 更新当前推流车辆, 推流通道, 推流状态缓存
* 2. 调用慢直播接口更新推流车辆, 推流状态
*
* @param clientId
* @param channelNo
* @param status
*/
public void noticeCarStatus(String clientId, Integer channelNo, String status) {
channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo;
@ -163,7 +213,7 @@ public class SendCommandService {
Map<String, String> params = new HashMap<>();
params.put("clientId", clientId);
params.put("carStat", status);
log.info(">>> notice to poc-live. url = " + url + " | params = " + params.toString());
log.info(BusinessConstant.LOGGER_PREFIX + " : 调用慢直播服务接口 : 当前推流设备 = {} - {} - {}", clientId, channelNo, status);
HttpClientUtils.doGet(url, params);
}
}

Loading…
Cancel
Save