minor refactoring due to comments
This commit is contained in:
parent
914d4b8ca3
commit
e65fc9d55e
@ -57,7 +57,7 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVIC
|
|||||||
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC;
|
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC;
|
||||||
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_METRICS_TOPIC;
|
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_METRICS_TOPIC;
|
||||||
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
|
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
|
||||||
import static org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService.METRICS_CHECK;
|
import static org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService.GATEWAY_METRICS;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqttIntegrationTest {
|
public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqttIntegrationTest {
|
||||||
@ -148,13 +148,13 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
|
|||||||
|
|
||||||
gatewayMetricsService.reportMetrics();
|
gatewayMetricsService.reportMetrics();
|
||||||
|
|
||||||
List<String> actualKeys = getActualKeysList(savedGateway.getId(), List.of(METRICS_CHECK));
|
List<String> actualKeys = getActualKeysList(savedGateway.getId(), List.of(GATEWAY_METRICS));
|
||||||
assertEquals(METRICS_CHECK, actualKeys.get(0));
|
assertEquals(GATEWAY_METRICS, actualKeys.get(0));
|
||||||
|
|
||||||
String telemetryUrl = String.format("/api/plugins/telemetry/DEVICE/%s/values/timeseries?startTs=%d&endTs=%d&keys=%s", savedGateway.getId(), 0, System.currentTimeMillis(), METRICS_CHECK);
|
String telemetryUrl = String.format("/api/plugins/telemetry/DEVICE/%s/values/timeseries?startTs=%d&endTs=%d&keys=%s", savedGateway.getId(), 0, System.currentTimeMillis(), GATEWAY_METRICS);
|
||||||
|
|
||||||
Map<String, List<Map<String, Object>>> gatewayTelemetry = doGetAsyncTyped(telemetryUrl, new TypeReference<>() {});
|
Map<String, List<Map<String, Object>>> gatewayTelemetry = doGetAsyncTyped(telemetryUrl, new TypeReference<>() {});
|
||||||
Map<String, Object> latencyCheckTelemetry = gatewayTelemetry.get(METRICS_CHECK).get(0);
|
Map<String, Object> latencyCheckTelemetry = gatewayTelemetry.get(GATEWAY_METRICS).get(0);
|
||||||
Map<String, GatewayMetricsState.ConnectorMetricsResult> latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {});
|
Map<String, GatewayMetricsState.ConnectorMetricsResult> latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {});
|
||||||
assertNotNull(latencyCheckValue);
|
assertNotNull(latencyCheckValue);
|
||||||
|
|
||||||
|
|||||||
@ -424,7 +424,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
|||||||
gatewaySessionHandler.onDeviceDisconnect(mqttMsg);
|
gatewaySessionHandler.onDeviceDisconnect(mqttMsg);
|
||||||
break;
|
break;
|
||||||
case MqttTopics.GATEWAY_METRICS_TOPIC:
|
case MqttTopics.GATEWAY_METRICS_TOPIC:
|
||||||
gatewaySessionHandler.onGatewayLatency(mqttMsg);
|
gatewaySessionHandler.onGatewayMetrics(mqttMsg);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID);
|
ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID);
|
||||||
|
|||||||
@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
@TbMqttTransportComponent
|
@TbMqttTransportComponent
|
||||||
public class GatewayMetricsService {
|
public class GatewayMetricsService {
|
||||||
|
|
||||||
public static final String METRICS_CHECK = "metricsCheck";
|
public static final String GATEWAY_METRICS = "gatewayMetrics";
|
||||||
|
|
||||||
@Value("${transport.mqtt.gateway_metrics_report_interval_sec:3600}")
|
@Value("${transport.mqtt.gateway_metrics_report_interval_sec:3600}")
|
||||||
private int metricsReportIntervalSec;
|
private int metricsReportIntervalSec;
|
||||||
@ -100,7 +100,7 @@ public class GatewayMetricsService {
|
|||||||
}
|
}
|
||||||
var result = state.getStateResult();
|
var result = state.getStateResult();
|
||||||
var kvProto = TransportProtos.KeyValueProto.newBuilder()
|
var kvProto = TransportProtos.KeyValueProto.newBuilder()
|
||||||
.setKey(METRICS_CHECK)
|
.setKey(GATEWAY_METRICS)
|
||||||
.setType(TransportProtos.KeyValueType.JSON_V)
|
.setType(TransportProtos.KeyValueType.JSON_V)
|
||||||
.setJsonV(JacksonUtil.toString(result))
|
.setJsonV(JacksonUtil.toString(result))
|
||||||
.build();
|
.build();
|
||||||
|
|||||||
@ -84,7 +84,7 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler<Gateway
|
|||||||
gatewayMetricsService.onDeviceDelete(deviceId);
|
gatewayMetricsService.onDeviceDelete(deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onGatewayLatency(MqttPublishMessage mqttMsg) throws AdaptorException {
|
public void onGatewayMetrics(MqttPublishMessage mqttMsg) throws AdaptorException {
|
||||||
int msgId = getMsgId(mqttMsg);
|
int msgId = getMsgId(mqttMsg);
|
||||||
ByteBuf payloadData = mqttMsg.payload();
|
ByteBuf payloadData = mqttMsg.payload();
|
||||||
String payload = payloadData.toString(UTF8);
|
String payload = payloadData.toString(UTF8);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user