created separate converter methow for gateway telemetry

This commit is contained in:
YevhenBondarenko 2024-09-13 15:30:37 +02:00
parent 4440e559fe
commit 48b5cadb4a
6 changed files with 67 additions and 46 deletions

View File

@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
@ -176,10 +176,10 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
long publishedTs = System.currentTimeMillis() - 10; long publishedTs = System.currentTimeMillis() - 10;
long gatewayLatencyA = random.nextLong(100, 500); long gatewayLatencyA = random.nextLong(100, 500);
var firstData = new GatewayMetricsData("connectorA", publishedTs - gatewayLatencyA, publishedTs); var firstData = new GatewayMetadata("connectorA", publishedTs - gatewayLatencyA, publishedTs);
gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA); gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA);
long gatewayLatencyB = random.nextLong(120, 450); long gatewayLatencyB = random.nextLong(120, 450);
var secondData = new GatewayMetricsData("connectorB", publishedTs - gatewayLatencyB, publishedTs); var secondData = new GatewayMetadata("connectorB", publishedTs - gatewayLatencyB, publishedTs);
gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB);
List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5"); List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4", "key5");

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.transport.mqtt.gateway.metrics; package org.thingsboard.server.common.msg.gateway.metrics;
public record GatewayMetricsData(String connector, long receivedTs, long publishedTs) { public record GatewayMetadata(String connector, long receivedTs, long publishedTs) {
} }

View File

@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
@ -74,33 +76,65 @@ public class JsonConverter {
private static int maxStringValueLength = 0; private static int maxStringValueLength = 0;
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts) throws JsonSyntaxException { public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts) throws JsonSyntaxException {
return convertToTelemetryProto(jsonElement, ts, null);
}
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, long ts, List<JsonElement> metadataResult) throws JsonSyntaxException {
PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder(); PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
convertToTelemetry(jsonElement, ts, null, builder, metadataResult); convertToTelemetry(jsonElement, ts, null, builder);
return builder.build(); return builder.build();
} }
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement, List<JsonElement> metadataResult) throws JsonSyntaxException {
return convertToTelemetryProto(jsonElement, System.currentTimeMillis(), metadataResult);
}
public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement) throws JsonSyntaxException { public static PostTelemetryMsg convertToTelemetryProto(JsonElement jsonElement) throws JsonSyntaxException {
return convertToTelemetryProto(jsonElement, System.currentTimeMillis()); return convertToTelemetryProto(jsonElement, System.currentTimeMillis());
} }
private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder, List<JsonElement> metadataResult) { public static TbPair<TransportProtos.PostTelemetryMsg, List<GatewayMetadata>> convertToGatewayTelemetry(JsonElement jsonElement, long systemTs) {
List<GatewayMetadata> metadataResult = null;
PostTelemetryMsg.Builder builder = PostTelemetryMsg.newBuilder();
if (jsonElement.isJsonArray()) {
var ja = jsonElement.getAsJsonArray();
for (int i = 0; i < ja.size(); i++) {
var je = ja.get(i);
if (je.isJsonObject()) {
JsonObject jo = je.getAsJsonObject();
JsonElement metadataElem = jo.remove("metadata");
if (metadataElem != null) {
if (metadataResult == null) {
metadataResult = new ArrayList<>();
}
if (metadataElem.isJsonObject()) {
JsonObject metadataObj = metadataElem.getAsJsonObject();
var connector = getAndValidateMetadataElement(metadataObj, "connector").getAsString();
var receivedTs = getAndValidateMetadataElement(metadataObj, "receivedTs").getAsLong();
var publishedTs = getAndValidateMetadataElement(metadataObj, "publishedTs").getAsLong();
metadataResult.add(new GatewayMetadata(connector, receivedTs, publishedTs));
} else {
throw new JsonSyntaxException("Can't parse gateway metadata: " + metadataElem);
}
}
parseObject(systemTs, null, builder, jo);
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
}
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + jsonElement);
}
return TbPair.of(builder.build(), metadataResult);
}
private static JsonElement getAndValidateMetadataElement(JsonObject metadata, String elementName) {
var element = metadata.get(elementName);
if (element == null || element.isJsonNull()) {
throw new JsonSyntaxException(String.format("Can't parse gateway element in metadata: [%s][%s]", metadata, elementName));
}
return element;
}
private static void convertToTelemetry(JsonElement jsonElement, long systemTs, Map<Long, List<KvEntry>> result, PostTelemetryMsg.Builder builder) {
if (jsonElement.isJsonObject()) { if (jsonElement.isJsonObject()) {
parseObject(systemTs, result, builder, jsonElement.getAsJsonObject()); parseObject(systemTs, result, builder, jsonElement.getAsJsonObject());
} else if (jsonElement.isJsonArray()) { } else if (jsonElement.isJsonArray()) {
jsonElement.getAsJsonArray().forEach(je -> { jsonElement.getAsJsonArray().forEach(je -> {
if (je.isJsonObject()) { if (je.isJsonObject()) {
JsonObject jo = je.getAsJsonObject(); JsonObject jo = je.getAsJsonObject();
if (metadataResult != null && jo.has("metadata")) {
metadataResult.add(jo.get("metadata"));
}
parseObject(systemTs, result, builder, jo); parseObject(systemTs, result, builder, jo);
} else { } else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je); throw new JsonSyntaxException(CAN_T_PARSE_VALUE + je);
@ -562,7 +596,7 @@ public class JsonConverter {
public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws public static Map<Long, List<KvEntry>> convertToTelemetry(JsonElement jsonElement, long systemTs, boolean sorted) throws
JsonSyntaxException { JsonSyntaxException {
Map<Long, List<KvEntry>> result = sorted ? new TreeMap<>() : new HashMap<>(); Map<Long, List<KvEntry>> result = sorted ? new TreeMap<>() : new HashMap<>();
convertToTelemetry(jsonElement, systemTs, result, null, null); convertToTelemetry(jsonElement, systemTs, result, null);
return result; return result;
} }

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent; import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData; import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState;
import java.util.List; import java.util.List;
@ -58,7 +58,7 @@ public class GatewayMetricsService {
scheduler.scheduleAtFixedRate(this::reportMetrics, metricsReportIntervalSec, metricsReportIntervalSec, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(this::reportMetrics, metricsReportIntervalSec, metricsReportIntervalSec, TimeUnit.SECONDS);
} }
public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, List<GatewayMetricsData> data, long serverReceiveTs) { public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, List<GatewayMetadata> data, long serverReceiveTs) {
states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(data, serverReceiveTs); states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(data, serverReceiveTs);
} }

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt.gateway.metrics; package org.thingsboard.server.transport.mqtt.gateway.metrics;
import lombok.Getter; import lombok.Getter;
import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashMap; import java.util.HashMap;
@ -44,7 +45,7 @@ public class GatewayMetricsState {
this.sessionInfo = sessionInfo; this.sessionInfo = sessionInfo;
} }
public void update(List<GatewayMetricsData> metricsData, long serverReceiveTs) { public void update(List<GatewayMetadata> metricsData, long serverReceiveTs) {
updateLock.lock(); updateLock.lock();
try { try {
metricsData.forEach(data -> { metricsData.forEach(data -> {
@ -87,7 +88,7 @@ public class GatewayMetricsState {
this.transportLatencySum = new AtomicLong(0); this.transportLatencySum = new AtomicLong(0);
} }
private void update(GatewayMetricsData metricsData, long serverReceiveTs) { private void update(GatewayMetadata metricsData, long serverReceiveTs) {
long gwLatency = metricsData.publishedTs() - metricsData.receivedTs(); long gwLatency = metricsData.publishedTs() - metricsData.receivedTs();
long transportLatency = serverReceiveTs - metricsData.publishedTs(); long transportLatency = serverReceiveTs - metricsData.publishedTs();
count.incrementAndGet(); count.incrementAndGet();

View File

@ -48,6 +48,8 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.gateway.metrics.GatewayMetadata;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportServiceCallback;
@ -63,7 +65,6 @@ import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData;
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState; import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugConnectionState;
import java.util.ArrayList; import java.util.ArrayList;
@ -384,9 +385,13 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) { private void processPostTelemetryMsg(T deviceCtx, JsonElement msg, String deviceName, int msgId) {
try { try {
List<JsonElement> metadata = new ArrayList<>(); long systemTs = System.currentTimeMillis();
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(msg.getAsJsonArray(), metadata); TbPair<TransportProtos.PostTelemetryMsg, List<GatewayMetadata>> gatewayPayloadPair = JsonConverter.convertToGatewayTelemetry(msg.getAsJsonArray(), systemTs);
processTelemetryMetadataMsg(metadata); TransportProtos.PostTelemetryMsg postTelemetryMsg = gatewayPayloadPair.getFirst();
List<GatewayMetadata> metadata = gatewayPayloadPair.getSecond();
if (!CollectionUtils.isEmpty(metadata)) {
gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metadata, systemTs);
}
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg)); transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e); log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
@ -394,25 +399,6 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
} }
} }
private void processTelemetryMetadataMsg(List<JsonElement> metadata) {
var serverReceiveTs = System.currentTimeMillis();
var metricsData = metadata.stream()
.filter(JsonElement::isJsonObject)
.map(JsonElement::getAsJsonObject)
.filter(jo -> jo.has("connector")
&& jo.has("receivedTs")
&& jo.has("publishedTs"))
.map(jo -> {
var connector = jo.get("connector").getAsString();
var receivedTs = jo.get("receivedTs").getAsLong();
var publishedTs = jo.get("publishedTs").getAsLong();
return new GatewayMetricsData(connector, receivedTs, publishedTs);
}).toList();
if (!metricsData.isEmpty()) {
gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), metricsData, serverReceiveTs);
}
}
protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException { protected void onDeviceTelemetryProto(int msgId, ByteBuf payload) throws AdaptorException {
try { try {
TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload)); TransportApiProtos.GatewayTelemetryMsg telemetryMsgProto = TransportApiProtos.GatewayTelemetryMsg.parseFrom(getBytes(payload));