diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index f5033cce80..c143214e2b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -270,10 +270,19 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso rpc.setExpirationTime(request.getExpirationTime()); rpc.setRequest(JacksonUtil.valueToTree(request)); rpc.setStatus(status); - rpc.setAdditionalInfo(JacksonUtil.toJsonNode(request.getAdditionalInfo())); + rpc.setAdditionalInfo(getAdditionalInfo(request)); systemContext.getTbRpcService().save(tenantId, rpc); } + private JsonNode getAdditionalInfo(ToDeviceRpcRequest request) { + try { + return JacksonUtil.toJsonNode(request.getAdditionalInfo()); + } catch (IllegalArgumentException e) { + log.debug("Failed to parse additional info [{}]", request.getAdditionalInfo()); + return JacksonUtil.valueToTree(request.getAdditionalInfo()); + } + } + private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { ToDeviceRpcRequestBody body = request.getBody(); return ToDeviceRpcRequestMsg.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index 1193f935a0..5c5eea32fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -38,6 +38,7 @@ import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.domain.DomainService; import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.edge.EdgeService; +import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.notification.NotificationRuleService; import org.thingsboard.server.dao.notification.NotificationTargetService; @@ -78,6 +79,7 @@ import org.thingsboard.server.service.executors.GrpcCallbackExecutorService; import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.Optional; @Lazy @Data @@ -198,6 +200,9 @@ public class EdgeContextComponent { @Autowired private WidgetsBundleService widgetsBundleService; + @Autowired + private Optional statsCounterService; + // processors @Autowired diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index be65dd13a1..521730741f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -45,6 +45,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; +import org.thingsboard.server.dao.edge.stats.EdgeStatsKey; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; @@ -298,6 +299,10 @@ public abstract class EdgeGrpcSession implements Closeable { processHighPriorityEvents(); PageData pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink); if (isConnected() && !pageData.getData().isEmpty()) { + if (fetcher instanceof GeneralEdgeEventFetcher) { + long queueSize = pageData.getTotalElements() - ((long) pageLink.getPageSize() * pageLink.getPage()); + ctx.getStatsCounterService().ifPresent(statsCounterService -> statsCounterService.setDownlinkMsgsLag(edge.getTenantId(), edge.getId(), queueSize)); + } log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, edge.getId(), pageData.getData().size()); List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() { @@ -461,6 +466,8 @@ public abstract class EdgeGrpcSession implements Closeable { ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId) .edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build()); } + ctx.getStatsCounterService().ifPresent(statsCounterService -> + statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_TMP_FAILED, edge.getTenantId(), edge.getId(), 1)); log.warn("[{}][{}] {} on attempt {}", tenantId, edge.getId(), failureMsg, attempt); log.debug("[{}][{}] entities in failed batch: {}", tenantId, edge.getId(), copy); } @@ -474,6 +481,8 @@ public abstract class EdgeGrpcSession implements Closeable { log.error("[{}][{}][{}] {} Message {}", tenantId, edge.getId(), sessionId, message, downlinkMsg); ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId) .edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(message).error(error).build()); + ctx.getStatsCounterService().ifPresent(statsCounterService -> + statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_PERMANENTLY_FAILED, edge.getTenantId(), edge.getId(), 1)); sessionState.getPendingMsgsMap().remove(downlinkMsg.getDownlinkMsgId()); } else { sendDownlinkMsg(ResponseMsg.newBuilder() @@ -490,6 +499,7 @@ public abstract class EdgeGrpcSession implements Closeable { ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg) .error("Failed to deliver messages after " + MAX_DOWNLINK_ATTEMPTS + " attempts").build()); + ctx.getStatsCounterService().ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_PERMANENTLY_FAILED, edge.getTenantId(), edge.getId(), copy.size())); stopCurrentSendDownlinkMsgsTask(false); } } else { @@ -529,6 +539,7 @@ public abstract class EdgeGrpcSession implements Closeable { try { if (msg.getSuccess()) { sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId()); + ctx.getStatsCounterService().ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_PUSHED, edge.getTenantId(), edge.getId(), 1)); log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg); } else { log.debug("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg()); @@ -649,7 +660,8 @@ public abstract class EdgeGrpcSession implements Closeable { log.trace("[{}][{}] entity message processed [{}]", tenantId, edge.getId(), downlinkMsg); } } - case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); + case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> + downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, edge.getId(), edgeEvent.getAction()); } } catch (Exception e) { @@ -795,6 +807,7 @@ public abstract class EdgeGrpcSession implements Closeable { } } highPriorityQueue.add(edgeEvent); + ctx.getStatsCounterService().ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_ADDED, edge.getTenantId(), edgeEvent.getEdgeId(), 1)); } protected ListenableFuture> processUplinkMsg(UplinkMsg uplinkMsg) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeEventService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeEventService.java index 8c3d897c1e..bc00ef4481 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeEventService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeEventService.java @@ -25,11 +25,14 @@ import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.edge.BaseEdgeEventService; +import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; +import org.thingsboard.server.dao.edge.stats.EdgeStatsKey; import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; +import java.util.Optional; import java.util.UUID; @Slf4j @@ -40,6 +43,7 @@ public class KafkaEdgeEventService extends BaseEdgeEventService { private final TopicService topicService; private final TbQueueProducerProvider producerProvider; + private final Optional statsCounterService; @Override public ListenableFuture saveAsync(EdgeEvent edgeEvent) { @@ -48,7 +52,7 @@ public class KafkaEdgeEventService extends BaseEdgeEventService { TopicPartitionInfo tpi = topicService.getEdgeEventNotificationsTopic(edgeEvent.getTenantId(), edgeEvent.getEdgeId()); ToEdgeEventNotificationMsg msg = ToEdgeEventNotificationMsg.newBuilder().setEdgeEventMsg(ProtoUtils.toProto(edgeEvent)).build(); producerProvider.getTbEdgeEventsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); - + statsCounterService.ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_ADDED, edgeEvent.getTenantId(), edgeEvent.getEdgeId(), 1)); return Futures.immediateFuture(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java b/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java new file mode 100644 index 0000000000..3fc391ec72 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java @@ -0,0 +1,150 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.stats; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; +import org.thingsboard.server.dao.edge.stats.MsgCounters; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.queue.discovery.TopicService; +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_ADDED; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_LAG; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_PERMANENTLY_FAILED; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_PUSHED; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_TMP_FAILED; + +@TbCoreComponent +@ConditionalOnProperty(prefix = "edges.stats", name = "enabled", havingValue = "true", matchIfMissing = false) +@RequiredArgsConstructor +@Service +@Slf4j +public class EdgeStatsService { + + private final TimeseriesService tsService; + private final EdgeStatsCounterService statsCounterService; + private final TopicService topicService; + private final Optional tbKafkaAdmin; + + @Value("${edges.stats.ttl:30}") + private int edgesStatsTtlDays; + @Value("${edges.stats.report-interval-millis:600000}") + private long reportIntervalMillis; + + + @Scheduled( + fixedDelayString = "${edges.stats.report-interval-millis:600000}", + initialDelayString = "${edges.stats.report-interval-millis:600000}" + ) + public void reportStats() { + log.debug("Reporting Edge communication stats..."); + long now = System.currentTimeMillis(); + long ts = now - (now % reportIntervalMillis); + + Map countersByEdge = statsCounterService.getCounterByEdge(); + Map lagByEdgeId = tbKafkaAdmin.isPresent() ? getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap(); + Map countersByEdgeSnapshot = new HashMap<>(statsCounterService.getCounterByEdge()); + countersByEdgeSnapshot.forEach((edgeId, counters) -> { + TenantId tenantId = counters.getTenantId(); + + if (tbKafkaAdmin.isPresent()) { + counters.getMsgsLag().set(lagByEdgeId.getOrDefault(edgeId, 0L)); + } + List statsEntries = List.of( + entry(ts, DOWNLINK_MSGS_ADDED.getKey(), counters.getMsgsAdded().get()), + entry(ts, DOWNLINK_MSGS_PUSHED.getKey(), counters.getMsgsPushed().get()), + entry(ts, DOWNLINK_MSGS_PERMANENTLY_FAILED.getKey(), counters.getMsgsPermanentlyFailed().get()), + entry(ts, DOWNLINK_MSGS_TMP_FAILED.getKey(), counters.getMsgsTmpFailed().get()), + entry(ts, DOWNLINK_MSGS_LAG.getKey(), counters.getMsgsLag().get()) + ); + + log.trace("Reported Edge communication stats: {} tenantId - {}, edgeId - {}", statsEntries, tenantId, edgeId); + saveTs(tenantId, edgeId, statsEntries); + }); + } + + private Map getEdgeLagByEdgeId(Map countersByEdge) { + Map edgeToTopicMap = countersByEdge.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> topicService.buildEdgeEventNotificationsTopicPartitionInfo(e.getValue().getTenantId(), e.getKey()).getTopic() + )); + + Map lagByTopic = tbKafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values())); + + return edgeToTopicMap.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> lagByTopic.getOrDefault(e.getValue(), 0L) + )); + } + + private void saveTs(TenantId tenantId, EdgeId edgeId, List statsEntries) { + try { + ListenableFuture future = tsService.save( + tenantId, + edgeId, + statsEntries, + TimeUnit.DAYS.toSeconds(edgesStatsTtlDays) + ); + + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(TimeseriesSaveResult result) { + log.debug("Successfully saved edge time-series stats: {} for edge: {}", statsEntries, edgeId); + } + + @Override + public void onFailure(Throwable t) { + log.warn("Failed to save edge time-series stats for edge: {}", edgeId, t); + } + }, MoreExecutors.directExecutor()); + } finally { + statsCounterService.clear(edgeId); + } + } + + private BasicTsKvEntry entry(long ts, String key, long value) { + return new BasicTsKvEntry(ts, new LongDataEntry(key, value)); + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 87163e85b3..c54553fff8 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1491,6 +1491,13 @@ edges: state: # Persist state of edge (active, last connect, last disconnect) into timeseries or attributes tables. 'false' means to store edge state into attributes table persistToTelemetry: "${EDGES_PERSIST_STATE_TO_TELEMETRY:false}" + stats: + # Enable or disable reporting of edge communication stats (true or false) + enabled: "${EDGES_STATS_ENABLED:true}" + # Time-to-live in days for stored edge communication stats in timeseries + ttl: "${EDGES_STATS_TTL:30}" + # How often to report edge communication stats in milliseconds + report-interval-millis: "${EDGES_STATS_REPORT_INTERVAL_MS:600000}" # Spring doc common parameters springdoc: diff --git a/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java b/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java new file mode 100644 index 0000000000..933318ae59 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java @@ -0,0 +1,173 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge; + +import com.google.common.util.concurrent.Futures; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; +import org.thingsboard.server.dao.edge.stats.MsgCounters; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.queue.discovery.TopicService; +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.service.edge.stats.EdgeStatsService; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_ADDED; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_LAG; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_PERMANENTLY_FAILED; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_PUSHED; +import static org.thingsboard.server.dao.edge.stats.EdgeStatsKey.DOWNLINK_MSGS_TMP_FAILED; + +@ExtendWith(MockitoExtension.class) +public class EdgeStatsTest { + + @Mock + private TimeseriesService tsService; + @Mock + private TopicService topicService; + @Mock + private EdgeStatsCounterService statsCounterService; + private EdgeStatsService edgeStatsService; + + private final TenantId tenantId = TenantId.fromUUID(UUID.randomUUID()); + private final EdgeId edgeId = new EdgeId(UUID.randomUUID()); + + @BeforeEach + void setUp() { + edgeStatsService = new EdgeStatsService( + tsService, + statsCounterService, + topicService, + Optional.empty() + ); + + ReflectionTestUtils.setField(edgeStatsService, "edgesStatsTtlDays", 30); + ReflectionTestUtils.setField(edgeStatsService, "reportIntervalMillis", 600_000L); + } + + @Test + public void testReportStatsSavesTelemetry() { + // given + MsgCounters counters = new MsgCounters(tenantId); + counters.getMsgsAdded().set(5); + counters.getMsgsPushed().set(3); + counters.getMsgsPermanentlyFailed().set(1); + counters.getMsgsTmpFailed().set(0); + counters.getMsgsLag().set(10); + + ConcurrentHashMap countersByEdge = new ConcurrentHashMap<>(); + countersByEdge.put(edgeId, counters); + + when(statsCounterService.getCounterByEdge()).thenReturn(countersByEdge); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + when(tsService.save(eq(tenantId), eq(edgeId), captor.capture(), anyLong())) + .thenReturn(Futures.immediateFuture(mock(TimeseriesSaveResult.class))); + + // when + edgeStatsService.reportStats(); + + // then + List entries = captor.getValue(); + Assertions.assertEquals(5, entries.size()); + + Map valuesByKey = entries.stream() + .collect(Collectors.toMap(TsKvEntry::getKey, e -> e.getLongValue().orElse(-1L))); + + Assertions.assertEquals(5L, valuesByKey.get(DOWNLINK_MSGS_ADDED.getKey()).longValue()); + Assertions.assertEquals(3L, valuesByKey.get(DOWNLINK_MSGS_PUSHED.getKey()).longValue()); + Assertions.assertEquals(1L, valuesByKey.get(DOWNLINK_MSGS_PERMANENTLY_FAILED.getKey()).longValue()); + Assertions.assertEquals(0L, valuesByKey.get(DOWNLINK_MSGS_TMP_FAILED.getKey()).longValue()); + Assertions.assertEquals(10L, valuesByKey.get(DOWNLINK_MSGS_LAG.getKey()).longValue()); + + + verify(statsCounterService).clear(edgeId); + } + + @Test + public void testReportStatsWithKafkaLag() { + // given + MsgCounters counters = new MsgCounters(tenantId); + counters.getMsgsAdded().set(2); + counters.getMsgsPushed().set(2); + counters.getMsgsPermanentlyFailed().set(0); + counters.getMsgsTmpFailed().set(1); + counters.getMsgsLag().set(0); + + ConcurrentHashMap countersByEdge = new ConcurrentHashMap<>(); + countersByEdge.put(edgeId, counters); + + // mocks + when(statsCounterService.getCounterByEdge()).thenReturn(countersByEdge); + + String topic = "edge-topic"; + TopicPartitionInfo partitionInfo = new TopicPartitionInfo(topic, tenantId, 0, false); + when(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId)).thenReturn(partitionInfo); + + TbKafkaAdmin kafkaAdmin = mock(TbKafkaAdmin.class); + when(kafkaAdmin.getTotalLagForGroupsBulk(Set.of(topic))) + .thenReturn(Map.of(topic, 15L)); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + when(tsService.save(eq(tenantId), eq(edgeId), captor.capture(), anyLong())) + .thenReturn(Futures.immediateFuture(mock(TimeseriesSaveResult.class))); + + edgeStatsService = new EdgeStatsService( + tsService, + statsCounterService, + topicService, + Optional.of(kafkaAdmin) + ); + ReflectionTestUtils.setField(edgeStatsService, "edgesStatsTtlDays", 30); + ReflectionTestUtils.setField(edgeStatsService, "reportIntervalMillis", 600_000L); + + // when + edgeStatsService.reportStats(); + + // then + List entries = captor.getValue(); + Map valuesByKey = entries.stream() + .collect(Collectors.toMap(TsKvEntry::getKey, e -> e.getLongValue().orElse(-1L))); + + Assertions.assertEquals(15L, valuesByKey.get(DOWNLINK_MSGS_LAG.getKey())); + verify(statsCounterService).clear(edgeId); + } + +} diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/data/dp/StringDataPoint.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/data/dp/StringDataPoint.java index 52205e2f72..6c67b467eb 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/data/dp/StringDataPoint.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/data/dp/StringDataPoint.java @@ -33,6 +33,16 @@ public class StringDataPoint extends AbstractDataPoint { this.value = deduplicate ? TbStringPool.intern(value) : value; } + @Override + public double getDouble() { + return Double.parseDouble(value); + } + + @Override + public long getLong() { + return Long.parseLong(value); + } + @Override public DataType getType() { return DataType.STRING; diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 6d010a7a0d..be407bd3db 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -529,8 +529,10 @@ public class ProtoUtils { .setRequestIdMSB(msg.getMsg().getId().getMostSignificantBits()) .setRequestIdLSB(msg.getMsg().getId().getLeastSignificantBits()) .setOneway(msg.getMsg().isOneway()) - .setPersisted(msg.getMsg().isPersisted()) - .setAdditionalInfo(msg.getMsg().getAdditionalInfo()); + .setPersisted(msg.getMsg().isPersisted()); + if (msg.getMsg().getAdditionalInfo() != null) { + builder.setAdditionalInfo(msg.getMsg().getAdditionalInfo()); + } if (msg.getMsg().getRetries() != null) { builder.setRetries(msg.getMsg().getRetries()); } @@ -555,7 +557,9 @@ public class ProtoUtils { toDeviceRpcRequestMsg.getOneway(), toDeviceRpcRequestMsg.getExpirationTime(), new ToDeviceRpcRequestBody(toDeviceRpcRequestMsg.getMethodName(), toDeviceRpcRequestMsg.getParams()), - toDeviceRpcRequestMsg.getPersisted(), toDeviceRpcRequestMsg.hasRetries() ? toDeviceRpcRequestMsg.getRetries() : null, toDeviceRpcRequestMsg.getAdditionalInfo()); + toDeviceRpcRequestMsg.getPersisted(), + toDeviceRpcRequestMsg.hasRetries() ? toDeviceRpcRequestMsg.getRetries() : null, + toDeviceRpcRequestMsg.hasAdditionalInfo() ? toDeviceRpcRequestMsg.getAdditionalInfo() : null); return new ToDeviceRpcRequestActorMsg(proto.getServiceId(), toDeviceRpcRequest); } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 3870f17c5b..8372753df4 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -699,7 +699,7 @@ message ToDeviceRpcRequestMsg { bool oneway = 7; bool persisted = 8; optional int32 retries = 9; - string additionalInfo = 10; + optional string additionalInfo = 10; } message ToDeviceRpcResponseMsg { diff --git a/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java b/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java index 25788721dc..a46e489268 100644 --- a/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java +++ b/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java @@ -21,6 +21,9 @@ import org.jeasy.random.EasyRandomParameters; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullAndEmptySource; +import org.junit.jupiter.params.provider.ValueSource; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.Device; @@ -56,6 +59,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.sync.vc.RepositorySettings; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration; +import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.EdgeHighPriorityMsg; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; @@ -279,4 +283,65 @@ class ProtoUtilsTest { assertThat(actual).as(String.format(description, entityName, entityName)).isEqualTo(expected); } + @ParameterizedTest + @NullAndEmptySource + @ValueSource(strings = {"{\"key\":\"value\"}"}) + void testRpcWithVariousAdditionalInfoToProtoAndBack(String additionalInfo) { + UUID requestId = UUID.fromString("93405c57-5787-46ff-806e-670bb60f49b6"); + String methodName = "reboot"; + String params = ""; + String serviceId = "serviceId"; + long expirationTime = System.currentTimeMillis(); + int retries = 3; + + ToDeviceRpcRequest request = new ToDeviceRpcRequest( + requestId, + tenantId, + deviceId, + false, + expirationTime, + new ToDeviceRpcRequestBody(methodName, params), + true, + retries, + additionalInfo + ); + ToDeviceRpcRequestActorMsg msg = new ToDeviceRpcRequestActorMsg(serviceId, request); + + // Serialize + TransportProtos.ToDeviceActorNotificationMsgProto toProto = ProtoUtils.toProto(msg); + assertThat(toProto).isNotNull(); + assertThat(toProto.hasToDeviceRpcRequestMsg()).isTrue(); + + TransportProtos.ToDeviceRpcRequestActorMsgProto toDeviceRpcRequestActorMsgProto = toProto.getToDeviceRpcRequestMsg(); + assertThat(toDeviceRpcRequestActorMsgProto.hasToDeviceRpcRequestMsg()).isTrue(); + + TransportProtos.ToDeviceRpcRequestMsg toDeviceRpcRequestMsg = toDeviceRpcRequestActorMsgProto.getToDeviceRpcRequestMsg(); + assertThat(toDeviceRpcRequestMsg.getRequestIdMSB()).isEqualTo(requestId.getMostSignificantBits()); + assertThat(toDeviceRpcRequestMsg.getRequestIdLSB()).isEqualTo(requestId.getLeastSignificantBits()); + assertThat(toDeviceRpcRequestMsg.getMethodName()).isEqualTo(methodName); + assertThat(toDeviceRpcRequestMsg.getParams()).isEqualTo(params); + assertThat(toDeviceRpcRequestMsg.getExpirationTime()).isEqualTo(expirationTime); + assertThat(toDeviceRpcRequestMsg.getOneway()).isFalse(); + assertThat(toDeviceRpcRequestMsg.getPersisted()).isTrue(); + assertThat(toDeviceRpcRequestMsg.getRetries()).isEqualTo(retries); + + if (additionalInfo != null) { + assertThat(toDeviceRpcRequestMsg.hasAdditionalInfo()).isTrue(); + assertThat(toDeviceRpcRequestMsg.getAdditionalInfo()).isEqualTo(additionalInfo); + } else { + assertThat(toDeviceRpcRequestMsg.hasAdditionalInfo()).isFalse(); + } + + // Deserialize + ToDeviceActorNotificationMsg fromProto = ProtoUtils.fromProto(toProto); + assertThat(fromProto).isNotNull(); + assertThat(fromProto).isInstanceOf(ToDeviceRpcRequestActorMsg.class); + ToDeviceRpcRequestActorMsg toDeviceRpcRequestActorMsg = (ToDeviceRpcRequestActorMsg) fromProto; + + assertThat(toDeviceRpcRequestActorMsg.getDeviceId()).isEqualTo(deviceId); + assertThat(toDeviceRpcRequestActorMsg.getTenantId()).isEqualTo(tenantId); + assertThat(toDeviceRpcRequestActorMsg.getServiceId()).isEqualTo(serviceId); + assertThat(toDeviceRpcRequestActorMsg.getMsg()).isEqualTo(request); + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 1e0064a5c8..47a7ef13e3 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -31,8 +31,10 @@ import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -260,4 +262,41 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin { } } + public Map getTotalLagForGroupsBulk(Set groupIds) { + Map result = new HashMap<>(); + for (String groupId : groupIds) { + result.put(groupId, getTotalConsumerGroupLag(groupId)); + } + return result; + } + + public long getTotalConsumerGroupLag(String groupId) { + try { + Map committedOffsets = getConsumerGroupOffsets(groupId); + if (committedOffsets.isEmpty()) { + return 0L; + } + + Map latestOffsetsSpec = committedOffsets.keySet().stream() + .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())); + + Map endOffsets = + settings.getAdminClient().listOffsets(latestOffsetsSpec) + .all().get(10, TimeUnit.SECONDS); + + return committedOffsets.entrySet().stream() + .mapToLong(entry -> { + TopicPartition tp = entry.getKey(); + long committed = entry.getValue().offset(); + long end = endOffsets.getOrDefault(tp, + new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset(); + return end - committed; + }).sum(); + + } catch (Exception e) { + log.error("Failed to get total lag for consumer group: {}", groupId, e); + return 0L; + } + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/PostgresEdgeEventService.java b/dao/src/main/java/org/thingsboard/server/dao/edge/PostgresEdgeEventService.java index 90939858bc..ecf1376a73 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/PostgresEdgeEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/PostgresEdgeEventService.java @@ -28,8 +28,11 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; +import org.thingsboard.server.dao.edge.stats.EdgeStatsKey; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,6 +44,7 @@ public class PostgresEdgeEventService extends BaseEdgeEventService { private final EdgeEventDao edgeEventDao; private final ApplicationEventPublisher eventPublisher; + private final Optional statsCounterService; private ExecutorService edgeEventExecutor; @@ -64,6 +68,7 @@ public class PostgresEdgeEventService extends BaseEdgeEventService { Futures.addCallback(saveFuture, new FutureCallback<>() { @Override public void onSuccess(Void result) { + statsCounterService.ifPresent(statsCounterService -> statsCounterService.recordEvent(EdgeStatsKey.DOWNLINK_MSGS_ADDED, edgeEvent.getTenantId(), edgeEvent.getEdgeId(), 1)); eventPublisher.publishEvent(SaveEntityEvent.builder() .tenantId(edgeEvent.getTenantId()) .entityId(edgeEvent.getEdgeId()) diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/stats/EdgeStatsCounterService.java b/dao/src/main/java/org/thingsboard/server/dao/edge/stats/EdgeStatsCounterService.java new file mode 100644 index 0000000000..16111cf514 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/stats/EdgeStatsCounterService.java @@ -0,0 +1,57 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.edge.stats; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; + +import java.util.concurrent.ConcurrentHashMap; + +@ConditionalOnProperty(prefix = "edges.stats", name = "enabled", havingValue = "true", matchIfMissing = false) +@Service +@Slf4j +@Getter +public class EdgeStatsCounterService { + + private final ConcurrentHashMap counterByEdge = new ConcurrentHashMap<>(); + + public void recordEvent(EdgeStatsKey type, TenantId tenantId, EdgeId edgeId, long value) { + MsgCounters counters = getOrCreateCounters(tenantId, edgeId); + switch (type) { + case DOWNLINK_MSGS_ADDED -> counters.getMsgsAdded().addAndGet(value); + case DOWNLINK_MSGS_PUSHED -> counters.getMsgsPushed().addAndGet(value); + case DOWNLINK_MSGS_PERMANENTLY_FAILED -> counters.getMsgsPermanentlyFailed().addAndGet(value); + case DOWNLINK_MSGS_TMP_FAILED -> counters.getMsgsTmpFailed().addAndGet(value); + } + } + + public void setDownlinkMsgsLag(TenantId tenantId, EdgeId edgeId, long value) { + getOrCreateCounters(tenantId, edgeId).getMsgsLag().set(value); + } + + public void clear(EdgeId edgeId) { + counterByEdge.remove(edgeId); + } + + public MsgCounters getOrCreateCounters(TenantId tenantId, EdgeId edgeId) { + return counterByEdge.computeIfAbsent(edgeId, id -> new MsgCounters(tenantId)); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/stats/EdgeStatsKey.java b/dao/src/main/java/org/thingsboard/server/dao/edge/stats/EdgeStatsKey.java new file mode 100644 index 0000000000..e017f6969e --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/stats/EdgeStatsKey.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.edge.stats; + +import lombok.Getter; + +@Getter +public enum EdgeStatsKey { + DOWNLINK_MSGS_ADDED("downlinkMsgsAdded"), + DOWNLINK_MSGS_PUSHED("downlinkMsgsPushed"), + DOWNLINK_MSGS_PERMANENTLY_FAILED("downlinkMsgsPermanentlyFailed"), + DOWNLINK_MSGS_TMP_FAILED("downlinkMsgsTmpFailed"), + DOWNLINK_MSGS_LAG("downlinkMsgsLag"); + + private final String key; + + EdgeStatsKey(String key) { + this.key = key; + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/stats/MsgCounters.java b/dao/src/main/java/org/thingsboard/server/dao/edge/stats/MsgCounters.java new file mode 100644 index 0000000000..7596da0ecf --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/stats/MsgCounters.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.edge.stats; + +import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; + +import java.util.concurrent.atomic.AtomicLong; + +@Data +public class MsgCounters { + + private final TenantId tenantId; + private final AtomicLong msgsAdded = new AtomicLong(); + private final AtomicLong msgsPushed = new AtomicLong(); + private final AtomicLong msgsPermanentlyFailed = new AtomicLong(); + private final AtomicLong msgsTmpFailed = new AtomicLong(); + private final AtomicLong msgsLag = new AtomicLong(); + + public void clear() { + msgsAdded.set(0); + msgsPushed.set(0); + msgsPermanentlyFailed.set(0); + msgsTmpFailed.set(0); + msgsLag.set(0); + } + +} diff --git a/edqs/src/test/java/org/thingsboard/server/edqs/repo/EntityTypeFilterTest.java b/edqs/src/test/java/org/thingsboard/server/edqs/repo/EntityTypeFilterTest.java index 8fab142dc7..e7fc4419ef 100644 --- a/edqs/src/test/java/org/thingsboard/server/edqs/repo/EntityTypeFilterTest.java +++ b/edqs/src/test/java/org/thingsboard/server/edqs/repo/EntityTypeFilterTest.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.query.EntityDataPageLink; import org.thingsboard.server.common.data.query.EntityDataQuery; @@ -36,7 +37,9 @@ import org.thingsboard.server.common.data.query.EntityKeyValueType; import org.thingsboard.server.common.data.query.EntityTypeFilter; import org.thingsboard.server.common.data.query.FilterPredicateValue; import org.thingsboard.server.common.data.query.KeyFilter; +import org.thingsboard.server.common.data.query.NumericFilterPredicate; import org.thingsboard.server.common.data.query.StringFilterPredicate; +import org.thingsboard.server.edqs.util.RepositoryUtils; import java.util.Arrays; import java.util.List; @@ -61,6 +64,10 @@ public class EntityTypeFilterTest extends AbstractEDQTest { addOrUpdate(new LatestTsKv(device.getId(), new BasicTsKvEntry(43, new StringDataEntry("state", "enabled")), 0L)); addOrUpdate(new LatestTsKv(device2.getId(), new BasicTsKvEntry(43, new StringDataEntry("state", "disabled")), 0L)); addOrUpdate(new LatestTsKv(device3.getId(), new BasicTsKvEntry(43, new BooleanDataEntry("free", true)), 0L)); + + addOrUpdate(new LatestTsKv(device.getId(), new BasicTsKvEntry(43, new StringDataEntry("temperature", "26.0")), 0L)); + addOrUpdate(new LatestTsKv(device2.getId(), new BasicTsKvEntry(43, new DoubleDataEntry("temperature", 25.0)), 0L)); + addOrUpdate(new LatestTsKv(device3.getId(), new BasicTsKvEntry(43, new DoubleDataEntry("temperature", 19.0)), 0L)); } @After @@ -87,6 +94,11 @@ public class EntityTypeFilterTest extends AbstractEDQTest { // find asset entities result = repository.findEntityDataByQuery(tenantId, null, getEntityTypeQuery(EntityType.ASSET, null), false); Assert.assertEquals(0, result.getTotalElements()); + + // find all tenant devices with filter by temperature + KeyFilter tempFilter = getTemperatureFilter(NumericFilterPredicate.NumericOperation.GREATER_OR_EQUAL, 20.0); + result = repository.findEntityDataByQuery(tenantId, null, getEntityTypeQuery(EntityType.DEVICE, List.of(tempFilter)), false); + Assert.assertEquals(2, result.getTotalElements()); } @Test @@ -143,4 +155,15 @@ public class EntityTypeFilterTest extends AbstractEDQTest { return nameFilter; } + private static KeyFilter getTemperatureFilter(NumericFilterPredicate.NumericOperation operation, Double predicateValue) { + KeyFilter tempFilter = new KeyFilter(); + tempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")); + var predicate = new NumericFilterPredicate(); + predicate.setOperation(operation); + predicate.setValue(new FilterPredicateValue<>(predicateValue)); + tempFilter.setPredicate(predicate); + tempFilter.setValueType(EntityKeyValueType.NUMERIC); + return tempFilter; + } + }