implemented telemetry metadata instead of separate topic for metrics

This commit is contained in:
YevhenBondarenko 2024-09-12 22:28:10 +02:00
parent e65fc9d55e
commit 137b1732bb
11 changed files with 91 additions and 95 deletions

View File

@ -1021,7 +1021,7 @@ transport:
disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}"
msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism
# Interval of periodic report of the gateway metrics # Interval of periodic report of the gateway metrics
gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:3600}" gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:60}"
netty: netty:
# Netty leak detector level # Netty leak detector level
leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}" leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"

View File

@ -18,7 +18,6 @@ package org.thingsboard.server.transport.mqtt.mqttv3.telemetry.timeseries;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -55,7 +54,6 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVIC
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_METRICS_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
import static org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService.GATEWAY_METRICS; import static org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService.GATEWAY_METRICS;
@ -124,8 +122,8 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
String deviceName = "Device A"; String deviceName = "Device A";
Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class), Device device = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class),
20, 20,
100); 100);
assertNotNull(device); assertNotNull(device);
client.disconnect(); client.disconnect();
@ -138,13 +136,10 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
.build(); .build();
processBeforeTest(configProperties); processBeforeTest(configProperties);
MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken);
Map<String, List<Long>> gwLatencies = new HashMap<>(); Map<String, List<Long>> gwLatencies = new HashMap<>();
Map<String, List<Long>> transportLatencies = new HashMap<>(); Map<String, List<Long>> transportLatencies = new HashMap<>();
publishLatency(client, gwLatencies, transportLatencies, 5); publishLatency(gwLatencies, transportLatencies, 5);
gatewayMetricsService.reportMetrics(); gatewayMetricsService.reportMetrics();
@ -174,38 +169,37 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
assertNotNull(connectorLatencyResult); assertNotNull(connectorLatencyResult);
checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency); checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency);
}); });
client.disconnect();
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> verify(gatewayMetricsService).onDeviceDisconnect(savedGateway.getId()));
} }
private void publishLatency(MqttTestClient client, Map<String, List<Long>> gwLatencies, Map<String, List<Long>> transportLatencies, int n) throws Exception { private void publishLatency(Map<String, List<Long>> gwLatencies, Map<String, List<Long>> transportLatencies, int n) throws Exception {
Random random = new Random(); Random random = new Random();
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
Map<String, GatewayMetricsData> data = new HashMap<>();
long publishedTs = System.currentTimeMillis() - 10; long publishedTs = System.currentTimeMillis() - 10;
long gatewayLatencyA = random.nextLong(100, 500); long gatewayLatencyA = random.nextLong(100, 500);
data.put("connectorA", new GatewayMetricsData(publishedTs - gatewayLatencyA, publishedTs)); var firstData = new GatewayMetricsData("connectorA", publishedTs - gatewayLatencyA, publishedTs);
gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA); gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA);
boolean sendB = i % 2 == 0; long gatewayLatencyB = random.nextLong(120, 450);
if (sendB) { var secondData = new GatewayMetricsData("connectorB", publishedTs - gatewayLatencyB, publishedTs);
long gatewayLatencyB = random.nextLong(120, 450); gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB);
data.put("connectorB", new GatewayMetricsData(publishedTs - gatewayLatencyB, publishedTs));
gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB);
}
client.publishAndWait(GATEWAY_METRICS_TOPIC, JacksonUtil.writeValueAsBytes(data)); List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");
ArgumentCaptor<Long> transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class); String deviceName1 = "Device A";
verify(gatewayMetricsService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture()); String deviceName2 = "Device B";
Long transportReceiveTs = transportReceiveTsCaptor.getValue(); String firstMetadata = JacksonUtil.writeValueAsString(firstData);
Long transportLatency = transportReceiveTs - publishedTs; String secondMetadata = JacksonUtil.writeValueAsString(secondData);
transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatency); String payload = getGatewayTelemetryJsonPayloadWithMetadata(deviceName1, deviceName2, "10000", "20000", firstMetadata, secondMetadata);
if (sendB) { processGatewayTelemetryTest(GATEWAY_TELEMETRY_TOPIC, expectedKeys, payload.getBytes(), deviceName1, deviceName2);
transportLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(transportLatency);
} ArgumentCaptor<Long> transportReceiveTsCaptorA = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> transportReceiveTsCaptorB = ArgumentCaptor.forClass(Long.class);
verify(gatewayMetricsService).process(any(), eq(savedGateway.getId()), eq(List.of(firstData)), transportReceiveTsCaptorA.capture());
verify(gatewayMetricsService).process(any(), eq(savedGateway.getId()), eq(List.of(secondData)), transportReceiveTsCaptorB.capture());
Long transportReceiveTsA = transportReceiveTsCaptorA.getValue();
Long transportReceiveTsB = transportReceiveTsCaptorB.getValue();
Long transportLatencyA = transportReceiveTsA - publishedTs;
Long transportLatencyB = transportReceiveTsB - publishedTs;
transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatencyA);
transportLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(transportLatencyB);
} }
} }
@ -250,7 +244,8 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
long end = System.currentTimeMillis() + 5000; long end = System.currentTimeMillis() + 5000;
Map<String, List<Map<String, Object>>> values = null; Map<String, List<Map<String, Object>>> values = null;
while (start <= end) { while (start <= end) {
values = doGetAsyncTyped(getTelemetryValuesUrl, new TypeReference<>() {}); values = doGetAsyncTyped(getTelemetryValuesUrl, new TypeReference<>() {
});
boolean valid = values.size() == expectedKeys.size(); boolean valid = values.size() == expectedKeys.size();
if (valid) { if (valid) {
for (String key : expectedKeys) { for (String key : expectedKeys) {
@ -289,7 +284,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
MqttTestClient client = new MqttTestClient(); MqttTestClient client = new MqttTestClient();
client.connectAndWait(gatewayAccessToken); client.connectAndWait(gatewayAccessToken);
client.publishAndWait(topic, payload); client.publishAndWait(topic, payload);
client.disconnect(); client.disconnectAndWait();
Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class), Device firstDevice = doExecuteWithRetriesAndInterval(() -> doGet("/api/tenant/devices?deviceName=" + firstDeviceName, Device.class),
20, 20,
@ -346,6 +341,16 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
return "{\"" + deviceA + "\": " + payload + ", \"" + deviceB + "\": " + payload + "}"; return "{\"" + deviceA + "\": " + payload + ", \"" + deviceB + "\": " + payload + "}";
} }
protected String getGatewayTelemetryJsonPayloadWithMetadata(String deviceA, String deviceB, String firstTsValue, String secondTsValue, String firstMetadata, String secondMetadata) {
String payloadA = "[{\"ts\": " + firstTsValue + ", \"values\": " + PAYLOAD_VALUES_STR + ", \"metadata\":" + firstMetadata + "}, " +
"{\"ts\": " + secondTsValue + ", \"values\": " + PAYLOAD_VALUES_STR + "}]";
String payloadB = "[{\"ts\": " + firstTsValue + ", \"values\": " + PAYLOAD_VALUES_STR + ", \"metadata\":" + secondMetadata + "}, " +
"{\"ts\": " + secondTsValue + ", \"values\": " + PAYLOAD_VALUES_STR + "}]";
return "{\"" + deviceA + "\": " + payloadA + ", \"" + deviceB + "\": " + payloadB + "}";
}
private String getTelemetryValuesUrl(DeviceId deviceId, Set<String> actualKeySet) { private String getTelemetryValuesUrl(DeviceId deviceId, Set<String> actualKeySet) {
return "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?startTs=0&endTs=25000&keys=" + String.join(",", actualKeySet); return "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?startTs=0&endTs=25000&keys=" + String.join(",", actualKeySet);
} }

View File

@ -75,7 +75,6 @@ public class MqttTopics {
public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC; public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC;
public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST; public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST;
public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE; public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE;
public static final String GATEWAY_METRICS_TOPIC = BASE_GATEWAY_API_TOPIC + "/metrics";
// v2 topics // v2 topics
public static final String BASE_DEVICE_API_TOPIC_V2 = "v2"; public static final String BASE_DEVICE_API_TOPIC_V2 = "v2";
public static final String REQUEST_ID_PATTERN = "(?<requestId>\\d+)"; public static final String REQUEST_ID_PATTERN = "(?<requestId>\\d+)";

View File

@ -74,22 +74,34 @@ public class JsonConverter {
private static int maxStringValueLength = 0; private static int maxStringValueLength = 0;
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts) throws JsonSyntaxException { public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts) throws JsonSyntaxException {
return convertToTelemetryProto(jsonElement, ts, null);
}
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts, List<JsonElement> metadataResult) throws JsonSyntaxException {
PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder(); PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
convertToTelemetry(jsonElement, ts, null, builder); convertToTelemetry(jsonElement, ts, null, builder, metadataResult);
return builder.build(); return builder.build();
} }
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, List<JsonElement> metadataResult) throws JsonSyntaxException {
return convertToTelemetryProto(jsonElement, System.currentTimeMillis(), metadataResult);
}
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement) throws JsonSyntaxException { public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement) throws JsonSyntaxException {
return convertToTelemetryProto(jsonElement, System.currentTimeMillis()); return convertToTelemetryProto(jsonElement, System.currentTimeMillis());
} }
private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder) { private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder, List<JsonElement> metadataResult) {
if (jsonElement.isJsonObject()) { if (jsonElement.isJsonObject()) {
parseObject(systemTs, result, builder, jsonElement.getAsJsonObject()); parseObject(systemTs, result, builder, jsonElement.getAsJsonObject());
} else if (jsonElement.isJsonArray()) { } else if (jsonElement.isJsonArray()) {
jsonElement.getAsJsonArray().forEach(je -> { jsonElement.getAsJsonArray().forEach(je -> {
if (je.isJsonObject()) { if (je.isJsonObject()) {
parseObject(systemTs, result, builder, je.getAsJsonObject()); JsonObject jo = je.getAsJsonObject();
if (metadataResult != null && jo.has("metadata")) {
metadataResult.add(jo.get("metadata"));
}
parseObject(systemTs, result, builder, jo);
} else { } else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je); throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
} }
@ -550,7 +562,7 @@ public class JsonConverter {
public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws
JsonSyntaxException { JsonSyntaxException {
Map<Long, List<KvEntry>> result = sorted ? new TreeMap<>() : new HashMap<>(); Map<Long, List<KvEntry>> result = sorted ? new TreeMap<>() : new HashMap<>();
convertToTelemetry(jsonElement, systemTs, result, null); convertToTelemetry(jsonElement, systemTs, result, null, null);
return result; return result;
} }

View File

@ -423,9 +423,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.GATEWAY_DISCONNECT_TOPIC: case MqttTopics.GATEWAY_DISCONNECT_TOPIC:
gatewaySessionHandler.onDeviceDisconnect(mqttMsg); gatewaySessionHandler.onDeviceDisconnect(mqttMsg);
break; break;
case MqttTopics.GATEWAY_METRICS_TOPIC:
gatewaySessionHandler.onGatewayMetrics(mqttMsg);
break;
default: default:
ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID); ack(ctx, msgId, MqttReasonCodes.PubAck.TOPIC_NAME_INVALID);
} }
@ -1199,7 +1196,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
if (gatewaySessionHandler != null) { if (gatewaySessionHandler != null) {
gatewaySessionHandler.onGatewayDisconnect(); gatewaySessionHandler.onDevicesDisconnect();
} }
if (sparkplugSessionHandler != null) { if (sparkplugSessionHandler != null) {
// add Msg Telemetry node: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp() // add Msg Telemetry node: key STATE type: String value: OFFLINE ts: sparkplugBProto.getTimestamp()

View File

@ -30,6 +30,7 @@ import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -41,7 +42,7 @@ public class GatewayMetricsService {
public static final String GATEWAY_METRICS = "gatewayMetrics"; public static final String GATEWAY_METRICS = "gatewayMetrics";
@Value("${transport.mqtt.gateway_metrics_report_interval_sec:3600}") @Value("${transport.mqtt.gateway_metrics_report_interval_sec:60}")
private int metricsReportIntervalSec; private int metricsReportIntervalSec;
@Autowired @Autowired
@ -57,8 +58,8 @@ public class GatewayMetricsService {
scheduler.scheduleAtFixedRate(this::reportMetrics, metricsReportIntervalSec, metricsReportIntervalSec, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(this::reportMetrics, metricsReportIntervalSec, metricsReportIntervalSec, TimeUnit.SECONDS);
} }
public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, Map<String, GatewayMetricsData> data, long ts) { public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, List<GatewayMetricsData> data, long serverReceiveTs) {
states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(ts, data); states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(data, serverReceiveTs);
} }
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId) { public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId) {
@ -72,26 +73,18 @@ public class GatewayMetricsService {
states.remove(deviceId); states.remove(deviceId);
} }
public void onDeviceDisconnect(DeviceId deviceId) {
GatewayMetricsState state = states.remove(deviceId);
if (state != null) {
reportMetrics(state, System.currentTimeMillis());
}
}
public void reportMetrics() { public void reportMetrics() {
if (states.isEmpty()) { if (states.isEmpty()) {
return; return;
} }
Map<DeviceId, GatewayMetricsState> oldStates = states; Map<DeviceId, GatewayMetricsState> statesToReport = states;
states = new ConcurrentHashMap<>(); states = new ConcurrentHashMap<>();
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
oldStates.forEach((gatewayId, state) -> { statesToReport.forEach((gatewayId, state) -> {
reportMetrics(state, ts); reportMetrics(state, ts);
}); });
oldStates.clear();
} }
private void reportMetrics(GatewayMetricsState state, long ts) { private void reportMetrics(GatewayMetricsState state, long ts) {

View File

@ -15,5 +15,5 @@
*/ */
package org.thingsboard.server.transport.mqtt.gateway.metrics; package org.thingsboard.server.transport.mqtt.gateway.metrics;
public record GatewayMetricsData(long receivedTs, long publishedTs) { public record GatewayMetricsData(String connector, long receivedTs, long publishedTs) {
} }

View File

@ -19,6 +19,7 @@ import lombok.Getter;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -43,11 +44,11 @@ public class GatewayMetricsState {
this.sessionInfo = sessionInfo; this.sessionInfo = sessionInfo;
} }
public void update(long ts, Map<String, GatewayMetricsData> metricsData) { public void update(List<GatewayMetricsData> metricsData, long serverReceiveTs) {
updateLock.lock(); updateLock.lock();
try { try {
metricsData.forEach((connectorName, data) -> { metricsData.forEach(data -> {
connectors.computeIfAbsent(connectorName, k -> new ConnectorMetricsState()).update(ts, data); connectors.computeIfAbsent(data.connector(), k -> new ConnectorMetricsState()).update(data, serverReceiveTs);
}); });
} finally { } finally {
updateLock.unlock(); updateLock.unlock();
@ -86,7 +87,7 @@ public class GatewayMetricsState {
this.transportLatencySum = new AtomicLong(0); this.transportLatencySum = new AtomicLong(0);
} }
private void update(long serverReceiveTs, GatewayMetricsData metricsData) { private void update(GatewayMetricsData metricsData, long serverReceiveTs) {
long gwLatency = metricsData.publishedTs() - metricsData.receivedTs(); long gwLatency = metricsData.publishedTs() - metricsData.receivedTs();
long transportLatency = serverReceiveTs - metricsData.publishedTs(); long transportLatency = serverReceiveTs - metricsData.publishedTs();
count.incrementAndGet(); count.incrementAndGet();

View File

@ -62,6 +62,8 @@ import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState;
import java.util.ArrayList; import java.util.ArrayList;
@ -114,6 +116,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
protected final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap; protected final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
protected final ChannelHandlerContext channel; protected final ChannelHandlerContext channel;
protected final DeviceSessionCtx deviceSessionCtx; protected final DeviceSessionCtx deviceSessionCtx;
protected final GatewayMetricsService gatewayMetricsService;
@Getter @Getter
@Setter @Setter
@ -131,6 +134,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap(); this.mqttQoSMap = deviceSessionCtx.getMqttQoSMap();
this.channel = deviceSessionCtx.getChannel(); this.channel = deviceSessionCtx.getChannel();
this.overwriteDevicesActivity = overwriteDevicesActivity; this.overwriteDevicesActivity = overwriteDevicesActivity;
this.gatewayMetricsService = deviceSessionCtx.getContext().getGatewayMetricsService();
} }
ConcurrentReferenceHashMap<String, Lock> createWeakMap() { ConcurrentReferenceHashMap<String, Lock> createWeakMap() {
@ -380,7 +384,9 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) { private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) {
try { try {
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(msg.getAsJsonArray()); List<JsonElement> metadata = new ArrayList<>();
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(msg.getAsJsonArray(), metadata);
processTelemetryMetadataMsg(metadata);
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
@ -388,6 +394,20 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
} }
private void processTelemetryMetadataMsg(List<JsonElement> metadata) {
var serverReceiveTs = System.currentTimeMillis();
var metricsData = metadata.stream()
.filter(JsonElement::isJsonObject)
.map(je -> {
var jo = je.getAsJsonObject();
var connector = jo.get("connector").getAsString();
var receivedTs = jo.get("receivedTs").getAsLong();
var publishedTs = jo.get("publishedTs").getAsLong();
return new GatewayMetricsData(connector, receivedTs, publishedTs);
}).toList();
gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metricsData, serverReceiveTs);
}
protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException { protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException {
try { try {
TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload)); TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload));

View File

@ -15,36 +15,27 @@
*/ */
package org.thingsboard.server.transport.mqtt.session; package org.thingsboard.server.transport.mqtt.session;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.adaptor.AdaptorException;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import static com.amazonaws.util.StringUtils.UTF8;
/** /**
* Created by nickAS21 on 26.12.22 * Created by nickAS21 on 26.12.22
*/ */
@Slf4j @Slf4j
public class GatewaySessionHandler extends AbstractGatewaySessionHandler<GatewayDeviceSessionContext> { public class GatewaySessionHandler extends AbstractGatewaySessionHandler<GatewayDeviceSessionContext> {
private final GatewayMetricsService gatewayMetricsService;
public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) { public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) {
super(deviceSessionCtx, sessionId, overwriteDevicesActivity); super(deviceSessionCtx, sessionId, overwriteDevicesActivity);
this.gatewayMetricsService = deviceSessionCtx.getContext().getGatewayMetricsService();
} }
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
@ -70,11 +61,6 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler<Gateway
return new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService); return new GatewayDeviceSessionContext(this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
} }
public void onGatewayDisconnect() {
this.onDevicesDisconnect();
gatewayMetricsService.onDeviceDisconnect(gateway.getDeviceId());
}
public void onGatewayUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) { public void onGatewayUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
this.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); this.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
gatewayMetricsService.onDeviceUpdate(sessionInfo, gateway.getDeviceId()); gatewayMetricsService.onDeviceUpdate(sessionInfo, gateway.getDeviceId());
@ -84,21 +70,4 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler<Gateway
gatewayMetricsService.onDeviceDelete(deviceId); gatewayMetricsService.onDeviceDelete(deviceId);
} }
public void onGatewayMetrics(MqttPublishMessage mqttMsg) throws AdaptorException {
int msgId = getMsgId(mqttMsg);
ByteBuf payloadData = mqttMsg.payload();
String payload = payloadData.toString(UTF8);
if (payload == null) {
log.debug("[{}][{}][{}] Payload is empty!", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
throw new AdaptorException(new IllegalArgumentException("Payload is empty!"));
}
long ts = System.currentTimeMillis();
try {
gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), JacksonUtil.fromString(payload, new TypeReference<>() {}), ts);
ack(msgId, MqttReasonCodes.PubAck.SUCCESS);
} catch (Throwable t) {
ackOrClose(msgId);
}
}
} }

View File

@ -147,7 +147,7 @@ transport:
disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}"
msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism
# Interval of periodic report of the gateway metrics # Interval of periodic report of the gateway metrics
gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:3600}" gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:60}"
netty: netty:
# Netty leak detector level # Netty leak detector level
leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}" leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"