ranemed latency to metrics

This commit is contained in:
YevhenBondarenko 2024-09-12 14:00:40 +02:00
parent dfb633b8e8
commit 914d4b8ca3
10 changed files with 78 additions and 75 deletions

View File

@ -1020,8 +1020,8 @@ transport:
# MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message. # MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message.
disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}"
msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before the device connected state. This limit works on the low level before TenantProfileLimits mechanism
# Interval of periodic report of the gateway latency # Interval of periodic report of the gateway metrics
gateway_latency_report_interval_sec: "${MQTT_GATEWAY_LATENCY_REPORT_INTERVAL_SEC:3600}" gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:3600}"
netty: netty:
# Netty leak detector level # Netty leak detector level
leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}" leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"

View File

@ -25,13 +25,12 @@ import org.mockito.ArgumentCaptor;
import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.TransportPayloadType;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest; import org.thingsboard.server.transport.mqtt.AbstractMqttIntegrationTest;
import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties; import org.thingsboard.server.transport.mqtt.MqttTestConfigProperties;
import org.thingsboard.server.transport.mqtt.gateway.GatewayLatencyService; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyData; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData;
import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyState; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestCallback;
import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient; import org.thingsboard.server.transport.mqtt.mqttv3.MqttTestClient;
@ -56,9 +55,9 @@ import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVIC
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.DEVICE_TELEMETRY_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_CONNECT_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_LATENCY_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_METRICS_TOPIC;
import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC; import static org.thingsboard.server.common.data.device.profile.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
import static org.thingsboard.server.transport.mqtt.mqttv3.credentials.BasicMqttCredentialsTest.CLIENT_ID; import static org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService.METRICS_CHECK;
@Slf4j @Slf4j
public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqttIntegrationTest { public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqttIntegrationTest {
@ -70,7 +69,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
" \"key5\": {\"someNumber\": 42, \"someArray\": [1,2,3], \"someNestedObject\": {\"key\": \"value\"}}}"; " \"key5\": {\"someNumber\": 42, \"someArray\": [1,2,3], \"someNestedObject\": {\"key\": \"value\"}}}";
@SpyBean @SpyBean
GatewayLatencyService gatewayLatencyService; GatewayMetricsService gatewayMetricsService;
@Before @Before
public void beforeTest() throws Exception { public void beforeTest() throws Exception {
@ -133,9 +132,9 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
} }
@Test @Test
public void testPushLatencyGateway() throws Exception { public void testPushMetricsGateway() throws Exception {
MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder() MqttTestConfigProperties configProperties = MqttTestConfigProperties.builder()
.gatewayName("Test latency gateway") .gatewayName("Test metrics gateway")
.build(); .build();
processBeforeTest(configProperties); processBeforeTest(configProperties);
@ -147,16 +146,16 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
publishLatency(client, gwLatencies, transportLatencies, 5); publishLatency(client, gwLatencies, transportLatencies, 5);
gatewayLatencyService.reportLatency(); gatewayMetricsService.reportMetrics();
List<String> actualKeys = getActualKeysList(savedGateway.getId(), List.of("latencyCheck")); List<String> actualKeys = getActualKeysList(savedGateway.getId(), List.of(METRICS_CHECK));
assertEquals("latencyCheck", actualKeys.get(0)); assertEquals(METRICS_CHECK, actualKeys.get(0));
String telemetryUrl = String.format("/api/plugins/telemetry/DEVICE/%s/values/timeseries?startTs=%d&endTs=%d&keys=latencyCheck", savedGateway.getId(), 0, System.currentTimeMillis()); String telemetryUrl = String.format("/api/plugins/telemetry/DEVICE/%s/values/timeseries?startTs=%d&endTs=%d&keys=%s", savedGateway.getId(), 0, System.currentTimeMillis(), METRICS_CHECK);
Map<String, List<Map<String, Object>>> gatewayTelemetry = doGetAsyncTyped(telemetryUrl, new TypeReference<>() {}); Map<String, List<Map<String, Object>>> gatewayTelemetry = doGetAsyncTyped(telemetryUrl, new TypeReference<>() {});
Map<String, Object> latencyCheckTelemetry = gatewayTelemetry.get("latencyCheck").get(0); Map<String, Object> latencyCheckTelemetry = gatewayTelemetry.get(METRICS_CHECK).get(0);
Map<String, GatewayLatencyState.ConnectorLatencyResult> latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {}); Map<String, GatewayMetricsState.ConnectorMetricsResult> latencyCheckValue = JacksonUtil.fromString((String) latencyCheckTelemetry.get("value"), new TypeReference<>() {});
assertNotNull(latencyCheckValue); assertNotNull(latencyCheckValue);
gwLatencies.forEach((connectorName, gwLatencyList) -> { gwLatencies.forEach((connectorName, gwLatencyList) -> {
@ -171,7 +170,7 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
long minTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).min().getAsLong(); long minTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).min().getAsLong();
long maxTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).max().getAsLong(); long maxTransportLatency = transportLatencyList.stream().mapToLong(Long::longValue).max().getAsLong();
GatewayLatencyState.ConnectorLatencyResult connectorLatencyResult = latencyCheckValue.get(connectorName); GatewayMetricsState.ConnectorMetricsResult connectorLatencyResult = latencyCheckValue.get(connectorName);
assertNotNull(connectorLatencyResult); assertNotNull(connectorLatencyResult);
checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency); checkConnectorLatencyResult(connectorLatencyResult, avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency);
}); });
@ -180,27 +179,27 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
Awaitility.await() Awaitility.await()
.atMost(5, TimeUnit.SECONDS) .atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> verify(gatewayLatencyService).onDeviceDisconnect(savedGateway.getId())); .untilAsserted(() -> verify(gatewayMetricsService).onDeviceDisconnect(savedGateway.getId()));
} }
private void publishLatency(MqttTestClient client, Map<String, List<Long>> gwLatencies, Map<String, List<Long>> transportLatencies, int n) throws Exception { private void publishLatency(MqttTestClient client, Map<String, List<Long>> gwLatencies, Map<String, List<Long>> transportLatencies, int n) throws Exception {
Random random = new Random(); Random random = new Random();
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
Map<String, GatewayLatencyData> data = new HashMap<>(); Map<String, GatewayMetricsData> data = new HashMap<>();
long publishedTs = System.currentTimeMillis() - 10; long publishedTs = System.currentTimeMillis() - 10;
long gatewayLatencyA = random.nextLong(100, 500); long gatewayLatencyA = random.nextLong(100, 500);
data.put("connectorA", new GatewayLatencyData(publishedTs - gatewayLatencyA, publishedTs)); data.put("connectorA", new GatewayMetricsData(publishedTs - gatewayLatencyA, publishedTs));
gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA); gwLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(gatewayLatencyA);
boolean sendB = i % 2 == 0; boolean sendB = i % 2 == 0;
if (sendB) { if (sendB) {
long gatewayLatencyB = random.nextLong(120, 450); long gatewayLatencyB = random.nextLong(120, 450);
data.put("connectorB", new GatewayLatencyData(publishedTs - gatewayLatencyB, publishedTs)); data.put("connectorB", new GatewayMetricsData(publishedTs - gatewayLatencyB, publishedTs));
gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB); gwLatencies.computeIfAbsent("connectorB", key -> new ArrayList<>()).add(gatewayLatencyB);
} }
client.publishAndWait(GATEWAY_LATENCY_TOPIC, JacksonUtil.writeValueAsBytes(data)); client.publishAndWait(GATEWAY_METRICS_TOPIC, JacksonUtil.writeValueAsBytes(data));
ArgumentCaptor<Long> transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> transportReceiveTsCaptor = ArgumentCaptor.forClass(Long.class);
verify(gatewayLatencyService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture()); verify(gatewayMetricsService).process(any(), eq(savedGateway.getId()), eq(data), transportReceiveTsCaptor.capture());
Long transportReceiveTs = transportReceiveTsCaptor.getValue(); Long transportReceiveTs = transportReceiveTsCaptor.getValue();
Long transportLatency = transportReceiveTs - publishedTs; Long transportLatency = transportReceiveTs - publishedTs;
transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatency); transportLatencies.computeIfAbsent("connectorA", key -> new ArrayList<>()).add(transportLatency);
@ -210,13 +209,13 @@ public abstract class AbstractMqttTimeseriesIntegrationTest extends AbstractMqtt
} }
} }
private void checkConnectorLatencyResult(GatewayLatencyState.ConnectorLatencyResult result, long avgGwLatency, long minGwLatency, long maxGwLatency, private void checkConnectorLatencyResult(GatewayMetricsState.ConnectorMetricsResult result, long avgGwLatency, long minGwLatency, long maxGwLatency,
long avgTransportLatency, long minTransportLatency, long maxTransportLatency) { long avgTransportLatency, long minTransportLatency, long maxTransportLatency) {
assertNotNull(result); assertNotNull(result);
assertEquals(avgGwLatency, result.avgGwLatency()); assertEquals(avgGwLatency, result.avgGwLatency());
assertEquals(minGwLatency, result.minGwLatency()); assertEquals(minGwLatency, result.minGwLatency());
assertEquals(maxGwLatency, result.maxGwLatency()); assertEquals(maxGwLatency, result.maxGwLatency());
assertEquals(avgTransportLatency, result.transportLatencyAvg()); assertEquals(avgTransportLatency, result.avgTransportLatency());
assertEquals(minTransportLatency, result.minTransportLatency()); assertEquals(minTransportLatency, result.minTransportLatency());
assertEquals(maxTransportLatency, result.maxTransportLatency()); assertEquals(maxTransportLatency, result.maxTransportLatency());
} }

View File

@ -75,7 +75,7 @@ public class MqttTopics {
public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC; public static final String GATEWAY_RPC_TOPIC = BASE_GATEWAY_API_TOPIC + RPC;
public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST; public static final String GATEWAY_ATTRIBUTES_REQUEST_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_REQUEST;
public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE; public static final String GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = BASE_GATEWAY_API_TOPIC + ATTRIBUTES_RESPONSE;
public static final String GATEWAY_LATENCY_TOPIC = BASE_GATEWAY_API_TOPIC + "/latency"; public static final String GATEWAY_METRICS_TOPIC = BASE_GATEWAY_API_TOPIC + "/metrics";
// v2 topics // v2 topics
public static final String BASE_DEVICE_API_TOPIC_V2 = "v2"; public static final String BASE_DEVICE_API_TOPIC_V2 = "v2";
public static final String REQUEST_ID_PATTERN = "(?<requestId>\\d+)"; public static final String REQUEST_ID_PATTERN = "(?<requestId>\\d+)";

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.common.transport.TransportTenantProfileCache;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor; import org.thingsboard.server.transport.mqtt.adaptors.ProtoMqttAdaptor;
import org.thingsboard.server.transport.mqtt.gateway.GatewayLatencyService; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -58,7 +58,7 @@ public class MqttTransportContext extends TransportContext {
@Getter @Getter
@Autowired @Autowired
private GatewayLatencyService gatewayLatencyService; private GatewayMetricsService gatewayMetricsService;
@Getter @Getter
@Value("${transport.mqtt.netty.max_payload_size}") @Value("${transport.mqtt.netty.max_payload_size}")

View File

@ -423,7 +423,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.GATEWAY_DISCONNECT_TOPIC: case MqttTopics.GATEWAY_DISCONNECT_TOPIC:
gatewaySessionHandler.onDeviceDisconnect(mqttMsg); gatewaySessionHandler.onDeviceDisconnect(mqttMsg);
break; break;
case MqttTopics.GATEWAY_LATENCY_TOPIC: case MqttTopics.GATEWAY_METRICS_TOPIC:
gatewaySessionHandler.onGatewayLatency(mqttMsg); gatewaySessionHandler.onGatewayLatency(mqttMsg);
break; break;
default: default:

View File

@ -27,8 +27,8 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent; import org.thingsboard.server.transport.mqtt.TbMqttTransportComponent;
import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyData; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsData;
import org.thingsboard.server.transport.mqtt.gateway.latency.GatewayLatencyState; import org.thingsboard.server.transport.mqtt.gateway.metrics.GatewayMetricsState;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -37,10 +37,12 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@Service @Service
@TbMqttTransportComponent @TbMqttTransportComponent
public class GatewayLatencyService { public class GatewayMetricsService {
@Value("${transport.mqtt.gateway_latency_report_interval_sec:3600}") public static final String METRICS_CHECK = "metricsCheck";
private int latencyReportIntervalSec;
@Value("${transport.mqtt.gateway_metrics_report_interval_sec:3600}")
private int metricsReportIntervalSec;
@Autowired @Autowired
private SchedulerComponent scheduler; private SchedulerComponent scheduler;
@ -48,15 +50,15 @@ public class GatewayLatencyService {
@Autowired @Autowired
private TransportService transportService; private TransportService transportService;
private Map<DeviceId, GatewayLatencyState> states = new ConcurrentHashMap<>(); private Map<DeviceId, GatewayMetricsState> states = new ConcurrentHashMap<>();
@PostConstruct @PostConstruct
private void init() { private void init() {
scheduler.scheduleAtFixedRate(this::reportLatency, latencyReportIntervalSec, latencyReportIntervalSec, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(this::reportMetrics, metricsReportIntervalSec, metricsReportIntervalSec, TimeUnit.SECONDS);
} }
public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, Map<String, GatewayLatencyData> data, long ts) { public void process(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId, Map<String, GatewayMetricsData> data, long ts) {
states.computeIfAbsent(gatewayId, k -> new GatewayLatencyState(sessionInfo)).update(ts, data); states.computeIfAbsent(gatewayId, k -> new GatewayMetricsState(sessionInfo)).update(ts, data);
} }
public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId) { public void onDeviceUpdate(TransportProtos.SessionInfoProto sessionInfo, DeviceId gatewayId) {
@ -71,34 +73,34 @@ public class GatewayLatencyService {
} }
public void onDeviceDisconnect(DeviceId deviceId) { public void onDeviceDisconnect(DeviceId deviceId) {
GatewayLatencyState state = states.remove(deviceId); GatewayMetricsState state = states.remove(deviceId);
if (state != null) { if (state != null) {
reportLatency(state, System.currentTimeMillis()); reportMetrics(state, System.currentTimeMillis());
} }
} }
public void reportLatency() { public void reportMetrics() {
if (states.isEmpty()) { if (states.isEmpty()) {
return; return;
} }
Map<DeviceId, GatewayLatencyState> oldStates = states; Map<DeviceId, GatewayMetricsState> oldStates = states;
states = new ConcurrentHashMap<>(); states = new ConcurrentHashMap<>();
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
oldStates.forEach((gatewayId, state) -> { oldStates.forEach((gatewayId, state) -> {
reportLatency(state, ts); reportMetrics(state, ts);
}); });
oldStates.clear(); oldStates.clear();
} }
private void reportLatency(GatewayLatencyState state, long ts) { private void reportMetrics(GatewayMetricsState state, long ts) {
if (state.isEmpty()) { if (state.isEmpty()) {
return; return;
} }
var result = state.getLatencyStateResult(); var result = state.getStateResult();
var kvProto = TransportProtos.KeyValueProto.newBuilder() var kvProto = TransportProtos.KeyValueProto.newBuilder()
.setKey("latencyCheck") .setKey(METRICS_CHECK)
.setType(TransportProtos.KeyValueType.JSON_V) .setType(TransportProtos.KeyValueType.JSON_V)
.setJsonV(JacksonUtil.toString(result)) .setJsonV(JacksonUtil.toString(result))
.build(); .build();

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.transport.mqtt.gateway.latency; package org.thingsboard.server.transport.mqtt.gateway.metrics;
public record GatewayLatencyData(long receivedTs, long publishedTs) { public record GatewayMetricsData(long receivedTs, long publishedTs) {
} }

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.transport.mqtt.gateway.latency; package org.thingsboard.server.transport.mqtt.gateway.metrics;
import lombok.Getter; import lombok.Getter;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
@ -25,15 +25,15 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
public class GatewayLatencyState { public class GatewayMetricsState {
private final Map<String, ConnectorLatencyState> connectors; private final Map<String, ConnectorMetricsState> connectors;
private final Lock updateLock; private final Lock updateLock;
@Getter @Getter
private volatile TransportProtos.SessionInfoProto sessionInfo; private volatile TransportProtos.SessionInfoProto sessionInfo;
public GatewayLatencyState(TransportProtos.SessionInfoProto sessionInfo) { public GatewayMetricsState(TransportProtos.SessionInfoProto sessionInfo) {
this.connectors = new HashMap<>(); this.connectors = new HashMap<>();
this.updateLock = new ReentrantLock(); this.updateLock = new ReentrantLock();
this.sessionInfo = sessionInfo; this.sessionInfo = sessionInfo;
@ -43,19 +43,19 @@ public class GatewayLatencyState {
this.sessionInfo = sessionInfo; this.sessionInfo = sessionInfo;
} }
public void update(long ts, Map<String, GatewayLatencyData> latencyData) { public void update(long ts, Map<String, GatewayMetricsData> metricsData) {
updateLock.lock(); updateLock.lock();
try { try {
latencyData.forEach((connectorName, data) -> { metricsData.forEach((connectorName, data) -> {
connectors.computeIfAbsent(connectorName, k -> new ConnectorLatencyState()).update(ts, data); connectors.computeIfAbsent(connectorName, k -> new ConnectorMetricsState()).update(ts, data);
}); });
} finally { } finally {
updateLock.unlock(); updateLock.unlock();
} }
} }
public Map<String, ConnectorLatencyResult> getLatencyStateResult() { public Map<String, ConnectorMetricsResult> getStateResult() {
Map<String, ConnectorLatencyResult> result = new HashMap<>(); Map<String, ConnectorMetricsResult> result = new HashMap<>();
updateLock.lock(); updateLock.lock();
try { try {
connectors.forEach((name, state) -> result.put(name, state.getResult())); connectors.forEach((name, state) -> result.put(name, state.getResult()));
@ -71,7 +71,7 @@ public class GatewayLatencyState {
return connectors.isEmpty(); return connectors.isEmpty();
} }
private static class ConnectorLatencyState { private static class ConnectorMetricsState {
private final AtomicInteger count; private final AtomicInteger count;
private final AtomicLong gwLatencySum; private final AtomicLong gwLatencySum;
private final AtomicLong transportLatencySum; private final AtomicLong transportLatencySum;
@ -80,15 +80,15 @@ public class GatewayLatencyState {
private volatile long minTransportLatency; private volatile long minTransportLatency;
private volatile long maxTransportLatency; private volatile long maxTransportLatency;
private ConnectorLatencyState() { private ConnectorMetricsState() {
this.count = new AtomicInteger(0); this.count = new AtomicInteger(0);
this.gwLatencySum = new AtomicLong(0); this.gwLatencySum = new AtomicLong(0);
this.transportLatencySum = new AtomicLong(0); this.transportLatencySum = new AtomicLong(0);
} }
private void update(long serverReceiveTs, GatewayLatencyData latencyData) { private void update(long serverReceiveTs, GatewayMetricsData metricsData) {
long gwLatency = latencyData.publishedTs() - latencyData.receivedTs(); long gwLatency = metricsData.publishedTs() - metricsData.receivedTs();
long transportLatency = serverReceiveTs - latencyData.publishedTs(); long transportLatency = serverReceiveTs - metricsData.publishedTs();
count.incrementAndGet(); count.incrementAndGet();
gwLatencySum.addAndGet(gwLatency); gwLatencySum.addAndGet(gwLatency);
transportLatencySum.addAndGet(transportLatency); transportLatencySum.addAndGet(transportLatency);
@ -106,16 +106,16 @@ public class GatewayLatencyState {
} }
} }
private ConnectorLatencyResult getResult() { private ConnectorMetricsResult getResult() {
long count = this.count.get(); long count = this.count.get();
long avgGwLatency = gwLatencySum.get() / count; long avgGwLatency = gwLatencySum.get() / count;
long transportLatencyAvg = transportLatencySum.get() / count; long avgTransportLatency = transportLatencySum.get() / count;
return new ConnectorLatencyResult(avgGwLatency, minGwLatency, maxGwLatency, transportLatencyAvg, minTransportLatency, maxTransportLatency); return new ConnectorMetricsResult(avgGwLatency, minGwLatency, maxGwLatency, avgTransportLatency, minTransportLatency, maxTransportLatency);
} }
} }
public record ConnectorLatencyResult(long avgGwLatency, long minGwLatency, long maxGwLatency, public record ConnectorMetricsResult(long avgGwLatency, long minGwLatency, long maxGwLatency,
long transportLatencyAvg, long minTransportLatency, long maxTransportLatency) { long avgTransportLatency, long minTransportLatency, long maxTransportLatency) {
} }
} }

View File

@ -27,7 +27,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.mqtt.gateway.GatewayLatencyService; import org.thingsboard.server.transport.mqtt.gateway.GatewayMetricsService;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -40,11 +40,11 @@ import static com.amazonaws.util.StringUtils.UTF8;
@Slf4j @Slf4j
public class GatewaySessionHandler extends AbstractGatewaySessionHandler<GatewayDeviceSessionContext> { public class GatewaySessionHandler extends AbstractGatewaySessionHandler<GatewayDeviceSessionContext> {
private final GatewayLatencyService latencyService; private final GatewayMetricsService gatewayMetricsService;
public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) { public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) {
super(deviceSessionCtx, sessionId, overwriteDevicesActivity); super(deviceSessionCtx, sessionId, overwriteDevicesActivity);
this.latencyService = deviceSessionCtx.getContext().getGatewayLatencyService(); this.gatewayMetricsService = deviceSessionCtx.getContext().getGatewayMetricsService();
} }
public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException { public void onDeviceConnect(MqttPublishMessage mqttMsg) throws AdaptorException {
@ -72,16 +72,16 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler<Gateway
public void onGatewayDisconnect() { public void onGatewayDisconnect() {
this.onDevicesDisconnect(); this.onDevicesDisconnect();
latencyService.onDeviceDisconnect(gateway.getDeviceId()); gatewayMetricsService.onDeviceDisconnect(gateway.getDeviceId());
} }
public void onGatewayUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) { public void onGatewayUpdate(TransportProtos.SessionInfoProto sessionInfo, Device device, Optional<DeviceProfile> deviceProfileOpt) {
this.onDeviceUpdate(sessionInfo, device, deviceProfileOpt); this.onDeviceUpdate(sessionInfo, device, deviceProfileOpt);
latencyService.onDeviceUpdate(sessionInfo, gateway.getDeviceId()); gatewayMetricsService.onDeviceUpdate(sessionInfo, gateway.getDeviceId());
} }
public void onGatewayDelete(DeviceId deviceId) { public void onGatewayDelete(DeviceId deviceId) {
latencyService.onDeviceDelete(deviceId); gatewayMetricsService.onDeviceDelete(deviceId);
} }
public void onGatewayLatency(MqttPublishMessage mqttMsg) throws AdaptorException { public void onGatewayLatency(MqttPublishMessage mqttMsg) throws AdaptorException {
@ -94,7 +94,7 @@ public class GatewaySessionHandler extends AbstractGatewaySessionHandler<Gateway
} }
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
try { try {
latencyService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), JacksonUtil.fromString(payload, new TypeReference<>() {}), ts); gatewayMetricsService.process(deviceSessionCtx.getSessionInfo(), gateway.getDeviceId(), JacksonUtil.fromString(payload, new TypeReference<>() {}), ts);
ack(msgId, MqttReasonCodes.PubAck.SUCCESS); ack(msgId, MqttReasonCodes.PubAck.SUCCESS);
} catch (Throwable t) { } catch (Throwable t) {
ackOrClose(msgId); ackOrClose(msgId);

View File

@ -146,6 +146,8 @@ transport:
# MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message. # MQTT disconnect timeout in milliseconds. The time to wait for the client to disconnect after the server sends a disconnect message.
disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}" disconnect_timeout: "${MQTT_DISCONNECT_TIMEOUT:1000}"
msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism msg_queue_size_per_device_limit: "${MQTT_MSG_QUEUE_SIZE_PER_DEVICE_LIMIT:100}" # messages await in the queue before device connected state. This limit works on low level before TenantProfileLimits mechanism
# Interval of periodic report of the gateway metrics
gateway_metrics_report_interval_sec: "${MQTT_GATEWAY_METRICS_REPORT_INTERVAL_SEC:3600}"
netty: netty:
# Netty leak detector level # Netty leak detector level
leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}" leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"