Added per-edge stats (#13651)

This commit is contained in:
yevhenii_zahrebelnyi 2025-07-29 10:02:37 +03:00 committed by GitHub
parent bdd0a9a5c6
commit 320e0b674e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 530 additions and 2 deletions

View File

@ -38,6 +38,7 @@ import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.domain.DomainService;
import org.thingsboard.server.dao.edge.EdgeEventService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.notification.NotificationRuleService;
import org.thingsboard.server.dao.notification.NotificationTargetService;
@ -78,6 +79,7 @@ import org.thingsboard.server.service.executors.GrpcCallbackExecutorService;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Lazy
@Data
@ -198,6 +200,9 @@ public class EdgeContextComponent {
@Autowired
private WidgetsBundleService widgetsBundleService;
@Autowired
private Optional<EdgeStatsCounterService> statsCounterService;
// processors
@Autowired

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.dao.edge.stats.EdgeStatsKey;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
@ -298,6 +299,10 @@ public abstract class EdgeGrpcSession implements Closeable {
processHighPriorityEvents();
PageData<EdgeEvent> pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink);
if (isConnected() && !pageData.getData().isEmpty()) {
if (fetcher instanceof GeneralEdgeEventFetcher) {
long queueSize = pageData.getTotalElements() - ((long) pageLink.getPageSize() * pageLink.getPage());
ctx.getStatsCounterService().ifPresent(statsCounterService -> statsCounterService.setDownlinkMsgsLag(edge.getTenantId(), edge.getId(), queueSize));
}
log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, edge.getId(), pageData.getData().size());
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() {
@ -461,6 +466,8 @@ public abstract class EdgeGrpcSession implements Closeable {
ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId)
.edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build());
}
ctx.getStatsCounterService().ifPresent(statsCounterService ->
statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_TMP_FAILED, edge.getTenantId(), edge.getId(), 1));
log.warn("[{}][{}] {} on attempt {}", tenantId, edge.getId(), failureMsg, attempt);
log.debug("[{}][{}] entities in failed batch: {}", tenantId, edge.getId(), copy);
}
@ -474,6 +481,8 @@ public abstract class EdgeGrpcSession implements Closeable {
log.error("[{}][{}][{}] {} Message {}", tenantId, edge.getId(), sessionId, message, downlinkMsg);
ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId)
.edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(message).error(error).build());
ctx.getStatsCounterService().ifPresent(statsCounterService ->
statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_PERMANENTLY_FAILED, edge.getTenantId(), edge.getId(), 1));
sessionState.getPendingMsgsMap().remove(downlinkMsg.getDownlinkMsgId());
} else {
sendDownlinkMsg(ResponseMsg.newBuilder()
@ -490,6 +499,7 @@ public abstract class EdgeGrpcSession implements Closeable {
ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId())
.customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg)
.error("Failed to deliver messages after " + MAX_DOWNLINK_ATTEMPTS + " attempts").build());
ctx.getStatsCounterService().ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_PERMANENTLY_FAILED, edge.getTenantId(), edge.getId(), copy.size()));
stopCurrentSendDownlinkMsgsTask(false);
}
} else {
@ -529,6 +539,7 @@ public abstract class EdgeGrpcSession implements Closeable {
try {
if (msg.getSuccess()) {
sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
ctx.getStatsCounterService().ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_PUSHED, edge.getTenantId(), edge.getId(), 1));
log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg);
} else {
log.debug("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg());
@ -649,7 +660,8 @@ public abstract class EdgeGrpcSession implements Closeable {
log.trace("[{}][{}] entity message processed [{}]", tenantId, edge.getId(), downlinkMsg);
}
}
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, edge.getId(), edgeEvent.getAction());
}
} catch (Exception e) {
@ -795,6 +807,7 @@ public abstract class EdgeGrpcSession implements Closeable {
}
}
highPriorityQueue.add(edgeEvent);
ctx.getStatsCounterService().ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_ADDED, edge.getTenantId(), edgeEvent.getEdgeId(), 1));
}
protected ListenableFuture<List<Void>> processUplinkMsg(UplinkMsg uplinkMsg) {

View File

@ -25,11 +25,14 @@ import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.edge.BaseEdgeEventService;
import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.edge.stats.EdgeStatsKey;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import java.util.Optional;
import java.util.UUID;
@Slf4j
@ -40,6 +43,7 @@ public class KafkaEdgeEventService extends BaseEdgeEventService {
private final TopicService topicService;
private final TbQueueProducerProvider producerProvider;
private final Optional<EdgeStatsCounterService> statsCounterService;
@Override
public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) {
@ -48,7 +52,7 @@ public class KafkaEdgeEventService extends BaseEdgeEventService {
TopicPartitionInfo tpi = topicService.getEdgeEventNotificationsTopic(edgeEvent.getTenantId(), edgeEvent.getEdgeId());
ToEdgeEventNotificationMsg msg = ToEdgeEventNotificationMsg.newBuilder().setEdgeEventMsg(ProtoUtils.toProto(edgeEvent)).build();
producerProvider.getTbEdgeEventsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
statsCounterService.ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_ADDED, edgeEvent.getTenantId(), edgeEvent.getEdgeId(), 1));
return Futures.immediateFuture(null);
}

View File

@ -0,0 +1,150 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.stats;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.edge.stats.MsgCounters;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_ADDED;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_LAG;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_PERMANENTLY_FAILED;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_PUSHED;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_TMP_FAILED;
/**
 * Periodically flushes the per-edge downlink message counters held by
 * {@link EdgeStatsCounterService} into time-series storage.
 *
 * <p>The bean is only created when {@code edges.stats.enabled} is {@code true}. On every run each
 * edge that currently has counters gets one time-series entry per {@code EdgeStatsKey}, after which
 * the counters of that edge are cleared.
 */
@TbCoreComponent
@ConditionalOnProperty(prefix = "edges.stats", name = "enabled", havingValue = "true", matchIfMissing = false)
@RequiredArgsConstructor
@Service
@Slf4j
public class EdgeStatsService {

    private final TimeseriesService tsService;
    private final EdgeStatsCounterService statsCounterService;
    private final TopicService topicService;
    // Present only when a Kafka admin bean is available; when present it is used as the source of the lag value.
    private final Optional<TbKafkaAdmin> tbKafkaAdmin;

    // TTL of the stored stats, in days; converted to seconds when saving.
    @Value("${edges.stats.ttl:30}")
    private int edgesStatsTtlDays;

    // Reporting period in milliseconds; also used to align the reported timestamp.
    @Value("${edges.stats.report-interval-millis:600000}")
    private long reportIntervalMillis;

    /**
     * Reports the current counters of every known edge as time-series and clears them.
     *
     * <p>The reported timestamp is the current time rounded down to a multiple of
     * {@code reportIntervalMillis}. A single snapshot of the counter map is used both for the Kafka
     * lag lookup and for reporting, so every reported edge has a lag value that was actually
     * looked up for it.
     */
    @Scheduled(
            fixedDelayString = "${edges.stats.report-interval-millis:600000}",
            initialDelayString = "${edges.stats.report-interval-millis:600000}"
    )
    public void reportStats() {
        log.debug("Reporting Edge communication stats...");
        long now = System.currentTimeMillis();
        long ts = now - (now % reportIntervalMillis);

        // One snapshot for both steps: edges registered while the (potentially slow) Kafka lookup
        // is running are picked up by the next run instead of being reported with a bogus zero lag.
        Map<EdgeId, MsgCounters> countersByEdgeSnapshot = new HashMap<>(statsCounterService.getCounterByEdge());
        boolean kafkaLagAvailable = tbKafkaAdmin.isPresent();
        Map<EdgeId, Long> lagByEdgeId = tbKafkaAdmin
                .map(kafkaAdmin -> getEdgeLagByEdgeId(kafkaAdmin, countersByEdgeSnapshot))
                .orElse(Collections.emptyMap());

        for (Map.Entry<EdgeId, MsgCounters> edgeCounters : countersByEdgeSnapshot.entrySet()) {
            EdgeId edgeId = edgeCounters.getKey();
            MsgCounters counters = edgeCounters.getValue();
            TenantId tenantId = counters.getTenantId();

            if (kafkaLagAvailable) {
                // With Kafka the lag comes from the consumer group, overriding any value set on the counter.
                counters.getMsgsLag().set(lagByEdgeId.getOrDefault(edgeId, 0L));
            }

            // NOTE(review): values are read here and the edge entry is removed later in saveTs();
            // increments recorded in between appear to be dropped - confirm this is acceptable.
            List<TsKvEntry> statsEntries = List.of(
                    entry(ts, DOWNLINK_MSGS_ADDED.getKey(), counters.getMsgsAdded().get()),
                    entry(ts, DOWNLINK_MSGS_PUSHED.getKey(), counters.getMsgsPushed().get()),
                    entry(ts, DOWNLINK_MSGS_PERMANENTLY_FAILED.getKey(), counters.getMsgsPermanentlyFailed().get()),
                    entry(ts, DOWNLINK_MSGS_TMP_FAILED.getKey(), counters.getMsgsTmpFailed().get()),
                    entry(ts, DOWNLINK_MSGS_LAG.getKey(), counters.getMsgsLag().get())
            );

            log.trace("Reported Edge communication stats: {} tenantId - {}, edgeId - {}", statsEntries, tenantId, edgeId);
            saveTs(tenantId, edgeId, statsEntries);
        }
    }

    /**
     * Resolves the lag for every edge in {@code countersByEdge} by asking Kafka for the total lag
     * of the edge's event-notifications topic name.
     *
     * @param kafkaAdmin     admin used to query the lag
     * @param countersByEdge edges to resolve the lag for
     * @return lag per edge; edges without a reported lag map to 0
     */
    private Map<EdgeId, Long> getEdgeLagByEdgeId(TbKafkaAdmin kafkaAdmin, Map<EdgeId, MsgCounters> countersByEdge) {
        Map<EdgeId, String> edgeToTopicMap = countersByEdge.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> topicService.buildEdgeEventNotificationsTopicPartitionInfo(e.getValue().getTenantId(), e.getKey()).getTopic()
                ));

        Map<String, Long> lagByTopic = kafkaAdmin.getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values()));

        return edgeToTopicMap.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        e -> lagByTopic.getOrDefault(e.getValue(), 0L)
                ));
    }

    /**
     * Saves the entries for one edge with the configured TTL and then clears that edge's counters,
     * whether or not the save could be submitted.
     *
     * <p>A failure to submit the save is logged instead of propagated, so that one edge cannot
     * prevent the remaining edges from being reported in the same run.
     */
    private void saveTs(TenantId tenantId, EdgeId edgeId, List<TsKvEntry> statsEntries) {
        try {
            ListenableFuture<TimeseriesSaveResult> future = tsService.save(
                    tenantId,
                    edgeId,
                    statsEntries,
                    TimeUnit.DAYS.toSeconds(edgesStatsTtlDays)
            );

            Futures.addCallback(future, new FutureCallback<>() {
                @Override
                public void onSuccess(TimeseriesSaveResult result) {
                    log.debug("Successfully saved edge time-series stats: {} for edge: {}", statsEntries, edgeId);
                }

                @Override
                public void onFailure(Throwable t) {
                    log.warn("Failed to save edge time-series stats for edge: {}", edgeId, t);
                }
            }, MoreExecutors.directExecutor());
        } catch (Exception e) {
            log.warn("Failed to save edge time-series stats for edge: {}", edgeId, e);
        } finally {
            statsCounterService.clear(edgeId);
        }
    }

    /** Builds a long-valued time-series entry for the given timestamp and key. */
    private BasicTsKvEntry entry(long ts, String key, long value) {
        return new BasicTsKvEntry(ts, new LongDataEntry(key, value));
    }

}

View File

@ -1491,6 +1491,13 @@ edges:
state:
# Persist state of edge (active, last connect, last disconnect) into timeseries or attributes tables. 'false' means to store edge state into attributes table
persistToTelemetry: "${EDGES_PERSIST_STATE_TO_TELEMETRY:false}"
stats:
# Enable or disable reporting of edge communication stats (true or false)
enabled: "${EDGES_STATS_ENABLED:true}"
# Time-to-live in days for stored edge communication stats in timeseries
ttl: "${EDGES_STATS_TTL:30}"
# How often to report edge communication stats in milliseconds
report-interval-millis: "${EDGES_STATS_REPORT_INTERVAL_MS:600000}"
# Spring doc common parameters
springdoc:

View File

@ -0,0 +1,173 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge;
import com.google.common.util.concurrent.Futures;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.edge.stats.MsgCounters;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.service.edge.stats.EdgeStatsService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_ADDED;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_LAG;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_PERMANENTLY_FAILED;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_PUSHED;
import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_TMP_FAILED;
/**
 * Unit tests for {@link EdgeStatsService#reportStats()} with mocked collaborators:
 * one scenario without a Kafka admin and one where the lag is taken from Kafka.
 */
@ExtendWith(MockitoExtension.class)
public class EdgeStatsTest {

    @Mock
    private TimeseriesService tsService;
    @Mock
    private TopicService topicService;
    @Mock
    private EdgeStatsCounterService statsCounterService;

    private EdgeStatsService edgeStatsService;

    private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID());
    private final EdgeId edgeId = new EdgeId(UUID.randomUUID());

    @BeforeEach
    void setUp() {
        // Default service under test: no Kafka admin available.
        edgeStatsService = newService(Optional.empty());
    }

    @Test
    public void testReportStatsSavesTelemetry() {
        // given
        stubCounters(newCounters(5, 3, 1, 0, 10));
        ArgumentCaptor<List<TsKvEntry>> captor = stubSaveAndCapture();

        // when
        edgeStatsService.reportStats();

        // then
        List<TsKvEntry> entries = captor.getValue();
        Assertions.assertEquals(5, entries.size());

        Map<String, Long> valuesByKey = toValuesByKey(entries);
        Assertions.assertEquals(5L, valuesByKey.get(DOWNLINK_MSGS_ADDED.getKey()).longValue());
        Assertions.assertEquals(3L, valuesByKey.get(DOWNLINK_MSGS_PUSHED.getKey()).longValue());
        Assertions.assertEquals(1L, valuesByKey.get(DOWNLINK_MSGS_PERMANENTLY_FAILED.getKey()).longValue());
        Assertions.assertEquals(0L, valuesByKey.get(DOWNLINK_MSGS_TMP_FAILED.getKey()).longValue());
        Assertions.assertEquals(10L, valuesByKey.get(DOWNLINK_MSGS_LAG.getKey()).longValue());

        verify(statsCounterService).clear(edgeId);
    }

    @Test
    public void testReportStatsWithKafkaLag() {
        // given
        stubCounters(newCounters(2, 2, 0, 1, 0));

        // mocks
        String topic = "edge-topic";
        TopicPartitionInfo partitionInfo = new TopicPartitionInfo(topic, tenantId, 0, false);
        when(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId)).thenReturn(partitionInfo);

        TbKafkaAdmin kafkaAdmin = mock(TbKafkaAdmin.class);
        when(kafkaAdmin.getTotalLagForGroupsBulk(Set.of(topic)))
                .thenReturn(Map.of(topic, 15L));

        ArgumentCaptor<List<TsKvEntry>> captor = stubSaveAndCapture();

        // Replace the default service with one that has a Kafka admin.
        edgeStatsService = newService(Optional.of(kafkaAdmin));

        // when
        edgeStatsService.reportStats();

        // then
        Map<String, Long> valuesByKey = toValuesByKey(captor.getValue());
        Assertions.assertEquals(15L, valuesByKey.get(DOWNLINK_MSGS_LAG.getKey()));

        verify(statsCounterService).clear(edgeId);
    }

    /** Creates the service under test and injects the {@code @Value} fields it would get from Spring. */
    private EdgeStatsService newService(Optional<TbKafkaAdmin> kafkaAdmin) {
        EdgeStatsService service = new EdgeStatsService(
                tsService,
                statsCounterService,
                topicService,
                kafkaAdmin
        );
        ReflectionTestUtils.setField(service, "edgesStatsTtlDays", 30);
        ReflectionTestUtils.setField(service, "reportIntervalMillis", 600_000L);
        return service;
    }

    /** Builds counters for the test tenant with the given values. */
    private MsgCounters newCounters(long added, long pushed, long permanentlyFailed, long tmpFailed, long lag) {
        MsgCounters counters = new MsgCounters(tenantId);
        counters.getMsgsAdded().set(added);
        counters.getMsgsPushed().set(pushed);
        counters.getMsgsPermanentlyFailed().set(permanentlyFailed);
        counters.getMsgsTmpFailed().set(tmpFailed);
        counters.getMsgsLag().set(lag);
        return counters;
    }

    /** Makes the counter service return a map holding only the test edge's counters. */
    private void stubCounters(MsgCounters counters) {
        ConcurrentHashMap<EdgeId, MsgCounters> countersByEdge = new ConcurrentHashMap<>();
        countersByEdge.put(edgeId, counters);
        when(statsCounterService.getCounterByEdge()).thenReturn(countersByEdge);
    }

    /** Stubs a successful time-series save for the test edge and returns the captor of the saved entries. */
    private ArgumentCaptor<List<TsKvEntry>> stubSaveAndCapture() {
        ArgumentCaptor<List<TsKvEntry>> captor = ArgumentCaptor.forClass(List.class);
        when(tsService.save(eq(tenantId), eq(edgeId), captor.capture(), anyLong()))
                .thenReturn(Futures.immediateFuture(mock(TimeseriesSaveResult.class)));
        return captor;
    }

    /** Indexes the long values of the entries by key; an entry without a long value maps to -1. */
    private Map<String, Long> toValuesByKey(List<TsKvEntry> entries) {
        return entries.stream()
                .collect(Collectors.toMap(TsKvEntry::getKey, e -> e.getLongValue().orElse(-1L)));
    }

}

View File

@ -31,8 +31,10 @@ import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@ -260,4 +262,41 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin {
}
}
/**
 * Looks up the total lag of several consumer groups, one group at a time.
 *
 * @param groupIds ids of the consumer groups to query
 * @return the result of {@link #getTotalConsumerGroupLag(String)} for every given id, keyed by that id
 */
public Map<String, Long> getTotalLagForGroupsBulk(Set<String> groupIds) {
    Map<String, Long> lagByGroupId = new HashMap<>();
    groupIds.forEach(id -> lagByGroupId.put(id, getTotalConsumerGroupLag(id)));
    return lagByGroupId;
}
/**
 * Computes the total lag of a consumer group: the sum over all partitions the group has offsets
 * for of {@code latest end offset - committed offset}.
 *
 * <p>Partitions for which either the committed offset or the end offset is unavailable are
 * skipped, and a partition never contributes a negative value, so the result is never negative.
 *
 * @param groupId id of the consumer group
 * @return the total lag, or 0 when the group has no offsets or the lookup fails (the failure is logged)
 */
public long getTotalConsumerGroupLag(String groupId) {
    try {
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = getConsumerGroupOffsets(groupId);
        if (committedOffsets.isEmpty()) {
            return 0L;
        }

        Map<TopicPartition, OffsetSpec> latestOffsetsSpec = committedOffsets.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));

        // Bounded wait so a slow or unreachable broker cannot block the caller indefinitely.
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                settings.getAdminClient().listOffsets(latestOffsetsSpec)
                        .all().get(10, TimeUnit.SECONDS);

        long totalLag = 0L;
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            OffsetAndMetadata committed = entry.getValue();
            ListOffsetsResult.ListOffsetsResultInfo end = endOffsets.get(entry.getKey());
            if (committed == null || end == null) {
                // Defensive: without both offsets the lag of this partition is unknown; treating a
                // missing end offset as 0 would subtract the committed offset from the total.
                continue;
            }
            totalLag += Math.max(0L, end.offset() - committed.offset());
        }
        return totalLag;
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        log.error("Failed to get total lag for consumer group: {}", groupId, e);
        return 0L;
    }
}
}

View File

@ -28,8 +28,11 @@ import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.edge.stats.EdgeStatsKey;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -41,6 +44,7 @@ public class PostgresEdgeEventService extends BaseEdgeEventService {
private final EdgeEventDao edgeEventDao;
private final ApplicationEventPublisher eventPublisher;
private final Optional<EdgeStatsCounterService> statsCounterService;
private ExecutorService edgeEventExecutor;
@ -64,6 +68,7 @@ public class PostgresEdgeEventService extends BaseEdgeEventService {
Futures.addCallback(saveFuture, new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
statsCounterService.ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_ADDED, edgeEvent.getTenantId(), edgeEvent.getEdgeId(), 1));
eventPublisher.publishEvent(SaveEntityEvent.builder()
.tenantId(edgeEvent.getTenantId())
.entityId(edgeEvent.getEdgeId())

View File

@ -0,0 +1,57 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.edge.stats;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.concurrent.ConcurrentHashMap;
/**
 * In-memory holder of per-edge downlink message counters.
 *
 * <p>The bean is only created when {@code edges.stats.enabled} is {@code true}. Counters are kept
 * in a {@link ConcurrentHashMap} keyed by edge id and are created lazily on first use.
 */
@ConditionalOnProperty(prefix = "edges.stats", name = "enabled", havingValue = "true", matchIfMissing = false)
@Service
@Slf4j
@Getter
public class EdgeStatsCounterService {

    // Live counters per edge; exposed through the Lombok-generated getCounterByEdge().
    private final ConcurrentHashMap<EdgeId, MsgCounters> counterByEdge = new ConcurrentHashMap<>();

    /**
     * Records {@code value} for the given stats key of an edge, creating the edge's counters if needed.
     *
     * <p>The message counters are incremented by {@code value}. {@code DOWNLINK_MSGS_LAG} is not a
     * counter, so for that key the lag is set to {@code value} (same effect as
     * {@link #setDownlinkMsgsLag(TenantId, EdgeId, long)}) instead of being silently ignored.
     *
     * @param type     stats key to update
     * @param tenantId tenant used when the edge's counters have to be created
     * @param edgeId   edge the value belongs to
     * @param value    amount to add, or the new lag value for {@code DOWNLINK_MSGS_LAG}
     */
    public void recordEvent(EdgeStatsKey type, TenantId tenantId, EdgeId edgeId, long value) {
        MsgCounters counters = getOrCreateCounters(tenantId, edgeId);
        switch (type) {
            case DOWNLINK_MSGS_ADDED -> counters.getMsgsAdded().addAndGet(value);
            case DOWNLINK_MSGS_PUSHED -> counters.getMsgsPushed().addAndGet(value);
            case DOWNLINK_MSGS_PERMANENTLY_FAILED -> counters.getMsgsPermanentlyFailed().addAndGet(value);
            case DOWNLINK_MSGS_TMP_FAILED -> counters.getMsgsTmpFailed().addAndGet(value);
            case DOWNLINK_MSGS_LAG -> counters.getMsgsLag().set(value);
        }
    }

    /** Sets the current downlink lag of an edge, creating the edge's counters if needed. */
    public void setDownlinkMsgsLag(TenantId tenantId, EdgeId edgeId, long value) {
        getOrCreateCounters(tenantId, edgeId).getMsgsLag().set(value);
    }

    /** Removes all counters of the given edge; they are re-created on the next recorded value. */
    public void clear(EdgeId edgeId) {
        counterByEdge.remove(edgeId);
    }

    /**
     * Returns the counters of an edge, creating them when absent.
     *
     * @param tenantId tenant stored in newly created counters; not used when the counters already exist
     * @param edgeId   edge to look up
     */
    public MsgCounters getOrCreateCounters(TenantId tenantId, EdgeId edgeId) {
        return counterByEdge.computeIfAbsent(edgeId, id -> new MsgCounters(tenantId));
    }

}

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.edge.stats;
import lombok.Getter;
/**
 * Kinds of per-edge downlink statistics, each paired with the string key it is reported under.
 */
public enum EdgeStatsKey {

    /** Downlink messages added for an edge. */
    DOWNLINK_MSGS_ADDED("downlinkMsgsAdded"),
    /** Downlink messages pushed to an edge. */
    DOWNLINK_MSGS_PUSHED("downlinkMsgsPushed"),
    /** Downlink messages that failed permanently. */
    DOWNLINK_MSGS_PERMANENTLY_FAILED("downlinkMsgsPermanentlyFailed"),
    /** Downlink messages that failed temporarily. */
    DOWNLINK_MSGS_TMP_FAILED("downlinkMsgsTmpFailed"),
    /** Current downlink message lag of an edge. */
    DOWNLINK_MSGS_LAG("downlinkMsgsLag");

    // String key of this statistic; read through the Lombok-generated getKey().
    @Getter
    private final String key;

    EdgeStatsKey(String key) {
        this.key = key;
    }

}

View File

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.edge.stats;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Downlink message counters of a single edge, together with the tenant the edge belongs to.
 * Every value is held in its own {@link AtomicLong}.
 */
@Data
public class MsgCounters {

    // Tenant of the edge these counters belong to.
    private final TenantId tenantId;

    private final AtomicLong msgsAdded = new AtomicLong(0L);
    private final AtomicLong msgsPushed = new AtomicLong(0L);
    private final AtomicLong msgsPermanentlyFailed = new AtomicLong(0L);
    private final AtomicLong msgsTmpFailed = new AtomicLong(0L);
    private final AtomicLong msgsLag = new AtomicLong(0L);

    /**
     * Resets every value to zero. Each value is reset on its own; the reset as a whole is not
     * performed as a single atomic step.
     */
    public void clear() {
        AtomicLong[] allCounters = {msgsAdded, msgsPushed, msgsPermanentlyFailed, msgsTmpFailed, msgsLag};
        for (AtomicLong counter : allCounters) {
            counter.set(0);
        }
    }

}