调用慢直播接口, 及日志更新

master
liuqingkun 3 years ago
parent 9cb8c4832e
commit fd39453532
  1. 2
      commons/src/main/java/org/yzh/commons/constant/BusinessConstant.java
  2. 10
      commons/src/main/java/org/yzh/commons/model/APIResult.java
  3. 21
      commons/src/main/java/org/yzh/commons/util/BusinessCacheUtil.java
  4. 12
      jtt808-server/src/main/java/io/github/yezhihao/netmc/handler/TCPMessageAdapter.java
  5. 27
      jtt808-server/src/main/java/io/github/yezhihao/netmc/websocket/WsChannelInboundHandler.java
  6. 45
      jtt808-server/src/main/java/org/yzh/web/controller/JT1078Controller.java
  7. 1
      jtt808-server/src/main/java/org/yzh/web/controller/OtherController.java
  8. 6
      jtt808-server/src/main/java/org/yzh/web/endpoint/JT808Endpoint.java
  9. 44
      jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java
  10. 2
      jtt808-server/src/main/resources/log4j2.xml

@ -6,4 +6,6 @@ public interface BusinessConstant {
String LIVE_STATUS_ONLINE = "online";
String LIVE_STATUS_WAITING = "waiting";
String LIVE_STATUS_OFFLINE = "offline";
String LIVE_SERVER_UPDATE_PUSH_CAR = "http://42.192.165.208:8002/blade-business/common-api/set-push-car-and-stat";
}

@ -152,4 +152,14 @@ public class APIResult<T> {
throw new UnsupportedOperationException();
}
}
@Override
public String toString() {
return "APIResult{" +
"code=" + code +
", msg='" + msg + '\'' +
", detailMsg='" + detailMsg + '\'' +
", data=" + data +
'}';
}
}

@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class BusinessCacheUtil {
private static Map<String, String> businessCache = new ConcurrentHashMap<>();
private static String CACHE_KEY_CURRENT_PUSH_DEVICE = "current-push-device";
private static String CACHE_KEY_CURRENT_PUSH_CHANNEL = "current-push-channel";
private static String CACHE_KEY_1078_PUSH_DEVICE = "1078-push-device";
private static String CACHE_KEY_PUSH_STATUS = "push-status";
@ -31,6 +32,26 @@ public class BusinessCacheUtil {
businessCache.put(CACHE_KEY_CURRENT_PUSH_DEVICE, deviceId);
}
/**
* 获取当前推流车辆的clientID
*
* @return
*/
public static int getCurrentPushChannel() {
if (businessCache.containsKey(CACHE_KEY_CURRENT_PUSH_CHANNEL)) {
return Integer.parseInt(businessCache.get(CACHE_KEY_CURRENT_PUSH_CHANNEL));
}
return BusinessConstant.DEFAULT_CHANNEL_NO;
}
public static void setCurrentPushChannel(Integer channel) {
if (channel == null) {
channel = BusinessConstant.DEFAULT_CHANNEL_NO;
}
businessCache.put(CACHE_KEY_CURRENT_PUSH_CHANNEL, channel.toString());
}
/**
* 获取当前推流车辆的clientID
*

@ -12,6 +12,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yzh.commons.util.BusinessCacheUtil;
import org.yzh.web.util.SendCommandUtil;
import java.io.IOException;
@ -67,13 +68,12 @@ public class TCPMessageAdapter extends ChannelInboundHandlerAdapter {
session.invalidate();
}
// 若掉线设备为当前推流设备, 调用推流方法, 发送推流
// if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice())) {
// SendCommandUtil.getService().sendToDevice();
// }
SendCommandUtil.getService().sendToDevice();
log.info(">>>>> Disconnected clientId = {} , address = {}", clientId, client(ctx));
log.info(">>>>> Disconnected{}", client(ctx));
// 若掉线设备为当前推流设备, 调用推流方法, 发送推流
if (clientId.equals(BusinessCacheUtil.getCurrentPushDevice()) && SendCommandUtil.getService() != null) {
SendCommandUtil.getService().sendToDevice();
}
}
@Override

@ -9,9 +9,8 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.yzh.commons.constant.BusinessConstant;
import org.yzh.commons.util.BusinessCacheUtil;
import org.yzh.web.service.SendCommandService;
import org.yzh.web.util.SendCommandUtil;
import java.time.LocalDateTime;
@ -37,9 +36,23 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
Channel currentChannel = ctx.channel();
// 获取客户端传输过来的消息
String content = msg.text();
log.debug("receive content : " + content);
BusinessCacheUtil.set1078PushDevice(content);
currentChannel.writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + content));
log.debug("<<< receive content : " + content);
if (content.contains("-")) {
// 由于1078传过来的信息为(0+clientId+'-'+channelNo), 故此处需要解析
String clientId = content.split("-")[0].substring(1);
String channelNo = content.split("-")[1];
BusinessCacheUtil.set1078PushDevice(clientId);
String curPush = BusinessCacheUtil.getCurrentPushDevice();
if (!clientId.equals(curPush)) {
log.info("<<<<<<<< receive client id :" + clientId + " - " + channelNo);
if (SendCommandUtil.getService() != null) {
SendCommandUtil.getService().noticeCarStatus(clientId, Integer.parseInt(channelNo), BusinessConstant.LIVE_STATUS_ONLINE);
}
}
}
currentChannel.writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " : " + content));
}
@ -60,7 +73,9 @@ public class WsChannelInboundHandler extends SimpleChannelInboundHandler<TextWeb
// 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
users.remove(ctx.channel());
BusinessCacheUtil.set1078PushDevice("");
SendCommandUtil.getService().sendToDevice();
if (SendCommandUtil.getService() != null) {
SendCommandUtil.getService().sendToDevice();
}
log.info("客户端被移除,channelId为:" + channelId);
}

@ -1,18 +1,25 @@
package org.yzh.web.controller;
import io.github.yezhihao.netmc.session.Session;
import io.github.yezhihao.netmc.session.SessionManager;
import io.swagger.v3.oas.annotations.Operation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.yzh.commons.constant.BusinessConstant;
import org.yzh.commons.model.APIResult;
import org.yzh.commons.util.BusinessCacheUtil;
import org.yzh.protocol.basics.JTMessage;
import org.yzh.protocol.commons.JT1078;
import org.yzh.protocol.jsatl12.T9208;
import org.yzh.protocol.t1078.*;
import org.yzh.protocol.t808.T0001;
import org.yzh.web.endpoint.MessageManager;
import org.yzh.web.service.SendCommandService;
import org.yzh.web.util.SendCommandUtil;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
@RestController
@ -22,6 +29,12 @@ public class JT1078Controller {
@Autowired
private MessageManager messageManager;
@Autowired
private SessionManager sessionManager;
@Autowired
private SendCommandService sendCommandService;
@Operation(summary = "9003 查询终端音视频属性")
@PostMapping("9003")
public Mono<APIResult<T1003>> T9003(@RequestBody JTMessage request) {
@ -31,15 +44,45 @@ public class JT1078Controller {
@Operation(summary = "9101 实时音视频传输请求")
@PostMapping("9101")
public Mono<APIResult<T0001>> T9101(@RequestBody T9101 request) {
return messageManager.requestR(request, T0001.class);
Mono<APIResult<T0001>> resultMono = null;
// 先调用停止推流方法, 然后指定设备推流
if (BusinessCacheUtil.getCurrentPushDevice() != null) {
T9102 t9102 = new T9102();
t9102.setChannelNo(BusinessCacheUtil.getCurrentPushChannel());
t9102.setClientId(BusinessCacheUtil.getCurrentPushDevice());
t9102.setCloseType(0);
t9102.setCommand(0);
t9102.setStreamType(0);
resultMono = messageManager.requestR(t9102, T0001.class);
}
SendCommandUtil.setService(sendCommandService);
sendCommandService.sendToDevice(request.getClientId(), null);
return resultMono;
}
@Operation(summary = "9102 音视频实时传输控制")
@PostMapping("9102")
public Mono<APIResult<T0001>> T9102(@RequestBody T9102 request) {
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
sendCommandService.sendToDevice();
return resultMono;
}
@Operation(summary = "9101 实时音视频传输请求")
@PostMapping("9101-v2")
public Mono<APIResult<T0001>> T9101V2(@RequestBody T9101 request) {
return messageManager.requestR(request, T0001.class);
}
@Operation(summary = "9102 音视频实时传输控制")
@PostMapping("9102-v2")
public Mono<APIResult<T0001>> T9102V2(@RequestBody T9102 request) {
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
return resultMono;
}
@Operation(summary = "9201 平台下发远程录像回放请求")
@PostMapping("9201")
public Mono<APIResult<T1205>> T9201(@RequestBody T9201 request) {

@ -141,6 +141,7 @@ public class OtherController {
public APIResult<Map<String, String>> getCurPushDevice() {
Map<String, String> data = new HashMap<>();
data.put("deviceId", BusinessCacheUtil.getCurrentPushDevice());
data.put("channelNo", BusinessCacheUtil.getCurrentPushChannel() + "");
data.put("pushStatus", BusinessCacheUtil.getPushStatus());
APIResult result = new APIResult(APICodes.Success.getCode());

@ -97,6 +97,12 @@ public class JT808Endpoint {
device.setPlateNo(token[1]);
session.setAttribute(SessionKey.Device, device);
// 若无正在推流的设备, 调用推流方法
SendCommandUtil.setService(sendCommandService);
if (BusinessCacheUtil.getCurrentPushDevice() == null) {
sendCommandService.sendToDevice(message.getClientId(), null);
}
T0001 result = new T0001();
result.setResponseSerialNo(message.getSerialNo());
result.setResponseMessageId(message.getMessageId());

@ -59,9 +59,7 @@ public class SendCommandService {
* 自动发送推流命令
*/
public void sendToDevice() {
BusinessCacheUtil.setCurrentPushDevice(null);
BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_WAITING);
noticeCarStatus();
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING);
Thread thread = new Thread(new Runnable() {
@Override
@ -74,9 +72,7 @@ public class SendCommandService {
}
}
BusinessCacheUtil.setCurrentPushDevice(null);
BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_OFFLINE);
noticeCarStatus();
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_OFFLINE);
}
});
@ -90,9 +86,7 @@ public class SendCommandService {
* @param channelNo
*/
public boolean sendToDevice(String clientId, Integer channelNo) {
BusinessCacheUtil.setCurrentPushDevice(clientId);
BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_WAITING);
noticeCarStatus();
noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_WAITING);
// 判断该设备是否为在线设备
Session session = sessionManager.get(clientId);
@ -103,9 +97,7 @@ public class SendCommandService {
if (!sendCommand(clientId, channelNo)) {
// 若推流命令发送失败, 设备设置为离线
BusinessCacheUtil.setCurrentPushDevice(null);
BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_OFFLINE);
noticeCarStatus();
noticeCarStatus(null, channelNo, BusinessConstant.LIVE_STATUS_OFFLINE);
}
return true;
}
@ -113,7 +105,6 @@ public class SendCommandService {
private boolean sendCommand(String clientId, Integer channelNo) {
channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo;
log.info("*********** 自定义发送实时音视频传输请求 start");
try {
// step1: 向车载机发送命令
T9101 request = new T9101();
@ -124,10 +115,12 @@ public class SendCommandService {
request.setStreamType(0);
request.setTcpPort(4001);
request.setUdpPort(0);
log.info("*** 发送实时音视频传输请求 start. request params : " + request.toString());
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
log.info("*********** 自定义发送实时音视频传输请求 t0102 : " + resultMono.block().isSuccess());
APIResult<T0001> resultObj = resultMono.block();
log.info("*** 发送实时音视频传输请求 result : " + resultObj.toString());
boolean sendResult = resultMono.block().isSuccess();
boolean sendResult = resultObj.isSuccess();
// 若命令发送失败, 返回false
if (!sendResult) {
return false;
@ -138,15 +131,13 @@ public class SendCommandService {
// step3: 调用3次, 失败后等待1s, 继续调用3次. 若仍然失败, 视为命令发送失败, 返回false
int times = 0;
while (times < 2) {
while (times < 3) {
// 调用jtt1078流媒体接口
String clientId1078 = BusinessCacheUtil.get1078PushDevice();
// 判断返回的推流设备是否为发送命令的设备, 如果是, 返回true, 否则继续下次循环
if (clientId.equals(clientId1078)) {
BusinessCacheUtil.setCurrentPushDevice(clientId);
BusinessCacheUtil.setPushStatus(BusinessConstant.LIVE_STATUS_ONLINE);
noticeCarStatus();
noticeCarStatus(clientId, channelNo, BusinessConstant.LIVE_STATUS_ONLINE);
return true;
}
@ -159,15 +150,20 @@ public class SendCommandService {
return false;
}
public void noticeCarStatus(String clientId, Integer channelNo, String status) {
channelNo = channelNo == null ? BusinessConstant.DEFAULT_CHANNEL_NO : channelNo;
// 更新推流车辆和推流状态
BusinessCacheUtil.setCurrentPushDevice(clientId);
BusinessCacheUtil.setCurrentPushChannel(channelNo);
BusinessCacheUtil.setPushStatus(status);
public void noticeCarStatus() {
String clientId = BusinessCacheUtil.getCurrentPushDevice();
String status = BusinessCacheUtil.getPushStatus();
// 设备注册后, 调用接口通知给web服务器
String url = "http://127.0.0.1:8002/blade-business/common-api/set-car-stat";
// 调用接口通知给慢直播服务器
String url = BusinessConstant.LIVE_SERVER_UPDATE_PUSH_CAR;
Map<String, String> params = new HashMap<>();
params.put("clientId", clientId);
params.put("carStat", status);
log.info(">>> notice to poc-live. url = " + url + " | params = " + params.toString());
HttpClientUtils.doGet(url, params);
}
}

@ -28,7 +28,7 @@
<Loggers>
<logger name="io.github.yezhihao.netmc" level="info" additivity="false">
<AppenderRef ref="AccessLog"/>
<AppenderRef ref="AppLog"/>
<AppenderRef ref="Console"/>
</logger>

Loading…
Cancel
Save