代码优化

master
liuqingkun 3 years ago
parent 1e5d1485c3
commit fe5c63f706
  1. 1
      jtt808-server/src/main/java/org/yzh/web/controller/JT1078Controller.java
  2. 29
      jtt808-server/src/main/java/org/yzh/web/endpoint/MessageManager.java
  3. 16
      jtt808-server/src/main/java/org/yzh/web/service/SendCommandService.java

@ -65,7 +65,6 @@ public class JT1078Controller {
});
}
log.warn(BusinessConstant.LOGGER_PREFIX + " : T9101 - step1 : 当前推流设备 = {} - {} ", BusinessCacheUtil.getCurrentPushDevice(), BusinessCacheUtil.getCurrentPushChannel());
// 先调用停止推流方法, 然后指定设备推流
if (StringUtil.isNullOrEmpty(BusinessCacheUtil.getCurrentPushDevice())) {

@ -6,9 +6,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.yzh.commons.constant.BusinessConstant;
import org.yzh.commons.model.APICodes;
import org.yzh.commons.model.APIException;
import org.yzh.commons.model.APIResult;
import org.yzh.protocol.basics.JTMessage;
import org.yzh.protocol.t808.T0001;
import reactor.core.publisher.Mono;
import java.time.Duration;
@ -96,6 +98,33 @@ public class MessageManager {
return resultMono;
}
public APIResult<T0001> requestRV2(JTMessage request) {
// 修改后代码, 更新日志打印
Session session = sessionManager.get(request.getClientId());
if (session == null) {
T0001 t0001 = new T0001();
APIResult<T0001> result = new APIResult<>(t0001);
result.setCode(APICodes.OperationFailed.getCode());
result.setMsg("指定设备正在推流");
return result;
}
log.warn(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 参数 = {}", request.toString());
Mono<APIResult<T0001>> resultMono = session.request(request, T0001.class)
.map(message -> APIResult.ok(message))
.timeout(Duration.ofSeconds(10), TIMEOUT_RESULT)
.onErrorResume(e -> {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 结果 = 消息发送失败. msg : {}", e);
return SENDFAIL_RESULT;
});
// block方法调用>1次后, 会出现重复发送命令的情况, 使用时需注意
APIResult<T0001> result = resultMono.block();
log.warn(BusinessConstant.LOGGER_PREFIX + " : 发送推流命令 : 结果 = {}", result.toString());
return result;
}
public <T> Mono<T> request(String sessionId, JTMessage request, Class<T> responseClass, long timeout) {
return request(sessionId, request, responseClass).timeout(Duration.ofMillis(timeout));
}

@ -2,7 +2,6 @@ package org.yzh.web.service;
import io.github.yezhihao.netmc.session.Session;
import io.github.yezhihao.netmc.session.SessionManager;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,7 +31,7 @@ public class SendCommandService {
/**
* 向设备发送推流命令的线程
*/
private Thread sendCommandToDeviceThread;
private Thread thread;
public void sendToDevice(String clientId) {
if (!"40155591817".equals(clientId)
@ -66,9 +65,9 @@ public class SendCommandService {
* 重启自动发送推流命令
*/
public void restartSendToDevice() {
if (sendCommandToDeviceThread != null && sendCommandToDeviceThread.isAlive()) {
if (thread != null && thread.isAlive()) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice : 停止原线程, 重新开启线程 ");
sendCommandToDeviceThread.interrupt();
thread.interrupt();
}
sendToDevice();
@ -87,7 +86,7 @@ public class SendCommandService {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 开始执行 ");
if (sendCommandToDeviceThread != null && sendCommandToDeviceThread.isAlive()) {
if (thread != null && thread.isAlive()) {
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 已有线程执行推流, 直接返回 ");
return;
}
@ -95,7 +94,7 @@ public class SendCommandService {
noticeCarStatus(null, BusinessConstant.DEFAULT_CHANNEL_NO, BusinessConstant.LIVE_STATUS_WAITING);
log.warn(BusinessConstant.LOGGER_PREFIX + " : 自定义推流方法 sendToDevice() : 启动线程, 开始执行发送推流命令 ");
sendCommandToDeviceThread = new Thread(new Runnable() {
thread = new Thread(new Runnable() {
@Override
public void run() {
for (Session session : sessionManager.all()) {
@ -113,7 +112,7 @@ public class SendCommandService {
}
});
sendCommandToDeviceThread.start();
thread.start();
}
/**
@ -158,8 +157,7 @@ public class SendCommandService {
request.setStreamType(0);
request.setTcpPort(4001);
request.setUdpPort(0);
Mono<APIResult<T0001>> resultMono = messageManager.requestR(request, T0001.class);
APIResult<T0001> resultObj = resultMono.block();
APIResult<T0001> resultObj = messageManager.requestRV2(request);
boolean sendResult = resultObj.isSuccess();
// 若命令发送失败, 返回false

Loading…
Cancel
Save