diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 893bd13171..7706244680 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -20,6 +20,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.JacksonUtil; @@ -37,6 +39,7 @@ import org.thingsboard.server.common.data.domain.Domain; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.relation.EntityRelation; @@ -50,6 +53,8 @@ import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; import org.thingsboard.server.dao.eventsourcing.RelationActionEvent; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.discovery.TopicService; /** * This event listener does not support async event processing because relay on ThreadLocal @@ -65,14 +70,20 @@ import org.thingsboard.server.dao.tenant.TenantService; * future.addCallback(eventPublisher.publishEvent(...)) * } * */ +@Slf4j @Component @RequiredArgsConstructor -@Slf4j +@ConditionalOnExpression("${edges.enabled:true}") public class EdgeEventSourcingListener { + private final TopicService topicService; + private final TbQueueAdmin tbQueueAdmin; + private final TenantService tenantService; private final TbClusterService tbClusterService; private final EdgeSynchronizationManager edgeSynchronizationManager; - private final TenantService tenantService; + + @Value("#{'${queue.type:null}' == 'kafka'}") + private boolean isKafkaSupported; @PostConstruct public void init() { @@ -106,7 +117,15 @@ public class EdgeEventSourcingListener { return; } try { - if (EntityType.EDGE.equals(entityType) || EntityType.TENANT.equals(entityType)) { + if (EntityType.EDGE.equals(entityType)) { + if (isKafkaSupported) { + String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, (EdgeId) event.getEntityId()).getTopic(); + tbQueueAdmin.deleteTopic(topic); + } else { + return; + } + } + if (EntityType.TENANT.equals(entityType)) { return; } log.trace("[{}] DeleteEntityEvent called: {}", tenantId, event); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java index f92daa588a..397e1f6137 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/AbstractEdgeGrpcSession.java @@ -102,40 +102,34 @@ import java.util.function.BiConsumer; @Data public abstract class AbstractEdgeGrpcSession> implements EdgeGrpcSession, Closeable { - protected static final ReentrantLock downlinkMsgLock = new ReentrantLock(); - protected static final ConcurrentLinkedQueue highPriorityQueue = new ConcurrentLinkedQueue<>(); + private static final int MAX_DOWNLINK_ATTEMPTS = 10; + private static final String RATE_LIMIT_REACHED = "Rate limit reached"; - protected static final int MAX_DOWNLINK_ATTEMPTS = 10; - protected static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; - protected static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId"; - protected static final String RATE_LIMIT_REACHED = "Rate limit reached"; + protected static final ConcurrentLinkedQueue highPriorityQueue = new ConcurrentLinkedQueue<>(); protected UUID sessionId; protected BiConsumer sessionOpenListener; protected BiConsumer sessionCloseListener; - protected final EdgeSessionState sessionState = new EdgeSessionState(); + private final EdgeSessionState sessionState = new EdgeSessionState(); + private final ReentrantLock downlinkMsgLock = new ReentrantLock(); protected EdgeContextComponent ctx; protected Edge edge; protected TenantId tenantId; + protected StreamObserver inputStream; protected StreamObserver outputStream; + protected boolean connected; protected volatile boolean syncCompleted; - protected Long newStartTs; - protected Long previousStartTs; - protected Long newStartSeqId; - protected Long previousStartSeqId; - protected Long seqIdEnd; + private EdgeVersion edgeVersion; + private int maxInboundMessageSize; + private int clientMaxInboundMessageSize; + private int maxHighPriorityQueueSizePerSession; - protected EdgeVersion edgeVersion; - protected int maxInboundMessageSize; - protected int clientMaxInboundMessageSize; - protected int maxHighPriorityQueueSizePerSession; - - protected ScheduledExecutorService sendDownlinkExecutorService; + private ScheduledExecutorService sendDownlinkExecutorService; public AbstractEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 53368dfc06..bc63eb251a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -122,7 +122,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Value("${edges.max_high_priority_queue_size_per_session:10000}") private int maxHighPriorityQueueSizePerSession; - @Value("#{ '${queue.type:null}' == 'kafka' }") + @Value("#{'${queue.type:null}' == 'kafka'}") private boolean isKafkaSupported; @Autowired @@ -335,7 +335,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i save(tenantId, edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs); edgeIdServiceIdCache.put(edgeId, serviceInfoProvider.getServiceId()); if (isKafkaSupported) { - TbQueueConsumer> consumer = tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edgeId, serviceInfoProvider.getServiceId()); + TbQueueConsumer> consumer = tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edgeId); ((KafkaEdgeGrpcSession) edgeGrpcSession).initConsumer(consumer); } pushRuleEngineMessage(tenantId, edge, lastConnectTs, TbMsgType.CONNECT_EVENT); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeEventService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeEventService.java index 517b550a36..3caea5476a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeEventService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeEventService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,7 +23,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; -import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.cache.limits.RateLimitService; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -38,12 +37,11 @@ import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.service.DataValidator; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import java.util.Optional; import java.util.UUID; @Slf4j @@ -59,7 +57,6 @@ public class KafkaEdgeEventService implements EdgeEventService { private final TbQueueProducerProvider producerProvider; @Lazy private final TopicService topicService; - private final TbTransactionalCache edgeIdServiceIdCache; @Override public ListenableFuture saveAsync(EdgeEvent edgeEvent) { @@ -70,9 +67,8 @@ public class KafkaEdgeEventService implements EdgeEventService { throw new TbRateLimitsException(EntityType.EDGE); } edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId); - var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeEvent.getEdgeId())); - TopicPartitionInfo tpi = topicService.getEdgeEventNotificationsTopic(edgeEvent.getTenantId(), edgeEvent.getEdgeId(), serviceIdOpt.get().get()); - TransportProtos.ToEdgeEventNotificationMsg msg = TransportProtos.ToEdgeEventNotificationMsg.newBuilder().setEdgeEventMsg(ProtoUtils.toProto(edgeEvent)).build(); + TopicPartitionInfo tpi = topicService.getEdgeEventNotificationsTopic(edgeEvent.getTenantId(), edgeEvent.getEdgeId()); + ToEdgeEventNotificationMsg msg = ToEdgeEventNotificationMsg.newBuilder().setEdgeEventMsg(ProtoUtils.toProto(edgeEvent)).build(); producerProvider.getTbEdgeEventsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(edgeEvent.getTenantId()).entity(edgeEvent).entityId(edgeEvent.getEdgeId()).build()); @@ -85,8 +81,6 @@ public class KafkaEdgeEventService implements EdgeEventService { } @Override - public void cleanupEvents(long ttl) { - - } + public void cleanupEvents(long ttl) {} } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index 78acd02d24..b7857d4e7d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -15,11 +15,13 @@ */ package org.thingsboard.server.service.edge.rpc; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.stub.StreamObserver; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; @@ -83,8 +85,21 @@ public class KafkaEdgeGrpcSession extends AbstractEdgeGrpcSession downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); - sendDownlinkMsgsPack(downlinkMsgsPack).get(); - edgeEventsConsumer.commit(); + Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Boolean isInterrupted) { + if (Boolean.TRUE.equals(isInterrupted)) { + log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId); + } else { + edgeEventsConsumer.commit(); + processEdgeEvents(); + } + } + @Override + public void onFailure(Throwable t) { + log.error("[{}] Failed to send downlink msgs pack", sessionId, t); + } + }, ctx.getGrpcCallbackExecutorService()); return Futures.immediateFuture(Boolean.TRUE); } catch (Exception e) { log.error("[{}][{}] Error occurred while polling edge events from Kafka: {}", tenantId, edge.getId(), e.getMessage()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/PostgresEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/PostgresEdgeGrpcSession.java index 47fad107a1..8293942523 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/PostgresEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/PostgresEdgeGrpcSession.java @@ -46,6 +46,15 @@ import java.util.function.BiConsumer; @Slf4j public class PostgresEdgeGrpcSession extends AbstractEdgeGrpcSession { + private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; + private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId"; + + private Long newStartTs; + private Long previousStartTs; + private Long newStartSeqId; + private Long previousStartSeqId; + private Long seqIdEnd; + PostgresEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, BiConsumer sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 7c823a0dfa..eb6b0bf47a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -20,9 +20,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; @@ -57,9 +55,6 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; -import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.state.DefaultDeviceStateService; @@ -88,9 +83,6 @@ public abstract class BaseEdgeProcessor { @Autowired protected DbCallbackExecutorService dbCallbackExecutorService; - @Autowired - private TbTransactionalCache edgeIdServiceIdCache; - protected ListenableFuture saveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java index f435be07ff..3eb8348fae 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AbstractCleanUpService.java @@ -28,7 +28,8 @@ public abstract class AbstractCleanUpService { private final PartitionService partitionService; - protected boolean isSystemTenantPartitionMine(){ + protected boolean isSystemTenantPartitionMine() { return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition(); } + } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java index 7c025d8703..ff4d574ea4 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/EdgeEventsCleanUpService.java @@ -32,7 +32,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TABLE_N @TbCoreComponent @Slf4j @Service -@ConditionalOnExpression("${sql.ttl.edge_events.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0") +@ConditionalOnExpression("${edges.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0") public class EdgeEventsCleanUpService extends AbstractCleanUpService { public static final String RANDOM_DELAY_INTERVAL_MS_EXPRESSION = diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java new file mode 100644 index 0000000000..a2d93cdd6b --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -0,0 +1,111 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.edge.EdgeService; +import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; +import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.TbKafkaSettings; +import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.state.DefaultDeviceStateService; + +import java.time.Instant; +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +@TbCoreComponent +@RequiredArgsConstructor +@ConditionalOnExpression("'${queue.type:null}'=='kafka' && ${edges.enabled:true}") +public class KafkaEdgeTopicsCleanUpService { + + private static final long ONE_MONTH_MILLIS = TimeUnit.DAYS.toChronoUnit().getDuration().multipliedBy(30).toMillis(); + + private final EdgeService edgeService; + private final TenantService tenantService; + private final AttributesService attributesService; + + private final TopicService topicService; + private final PartitionService partitionService; + + private final TbKafkaSettings kafkaSettings; + private final TbKafkaTopicConfigs kafkaTopicConfigs; + + private final ExecutorService executorService = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("kafka-edge-topic-cleanup")); + + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.checking_interval})}", fixedDelayString = "${sql.ttl.edge_events.checking_interval}") + public void cleanUp() { + executorService.submit(() -> { + PageDataIterable tenants = new PageDataIterable<>(tenantService::findTenantsIds, 10_000); + for (TenantId tenantId : tenants) { + try { + cleanUp(tenantId); + } catch (Exception e) { + log.warn("Failed to drop kafka topics for tenant {}", tenantId, e); + } + } + }); + } + + private void cleanUp(TenantId tenantId) throws Exception { + if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) { + return; + } + + PageDataIterable edges = new PageDataIterable<>(link -> edgeService.findEdgesByTenantId(tenantId, link), 1024); + long currentTimeMillis = System.currentTimeMillis(); + + for (Edge edge : edges) { + Optional attributeOpt = attributesService.find(tenantId, edge.getId(), AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.LAST_CONNECT_TIME).get(); + if (attributeOpt.isPresent()) { + Optional lastConnectTimeOpt = attributeOpt.get().getLongValue(); + if (lastConnectTimeOpt.isPresent() && isTopicExpired(lastConnectTimeOpt.get(), currentTimeMillis)) { + String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic(); + TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); + if (kafkaAdmin.isTopicEmpty(topic)) { + kafkaAdmin.deleteTopic(topic); + log.info("Removed outdated topic for tenant {} and edge {} older than {}", tenantId, edge.getName(), Date.from(Instant.ofEpochMilli(currentTimeMillis - ONE_MONTH_MILLIS))); + } + } + } + } + } + + private boolean isTopicExpired(long lastConnectTime, long currentTimeMillis) { + return lastConnectTime + ONE_MONTH_MILLIS < currentTimeMillis; + } + +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9a7994e92a..180a978744 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1554,6 +1554,9 @@ queue: housekeeper-reprocessing: "${TB_QUEUE_KAFKA_HOUSEKEEPER_REPROCESSING_TOPIC_PROPERTIES:retention.ms:7776000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for Edge topic edge: "${TB_QUEUE_KAFKA_EDGE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" + # Kafka properties for Edge event topic + # TODO : discuss and adjust properties + edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 04f7c2031d..11d320ffa7 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -636,10 +636,6 @@ message ProcessingStrategyProto { int64 maxPauseBetweenRetries = 5; } -message EdgeEvent { - -} - /** * Main Messages; */ diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java index f45c003c0c..168e90cd1b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TopicService.java @@ -61,12 +61,12 @@ public class TopicService { return buildTopicPartitionInfo("tb_edge.notifications." + serviceId, null, null, false); } - public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId, String serviceId) { - return tbEdgeEventsNotificationTopics.computeIfAbsent(edgeId, id -> buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId, serviceId)); + public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId) { + return tbEdgeEventsNotificationTopics.computeIfAbsent(edgeId, id -> buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId)); } - private TopicPartitionInfo buildEdgeEventNotificationsTopicPartitionInfo(TenantId tenantId, EdgeId edgeId, String serviceId) { - return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId + "." + serviceId, null, null, false); + public TopicPartitionInfo buildEdgeEventNotificationsTopicPartitionInfo(TenantId tenantId, EdgeId edgeId) { + return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId, null, null, false); } private TopicPartitionInfo buildNotificationsTopicPartitionInfo(ServiceType serviceType, String serviceId) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 2bd283f1e5..d614fe4fdd 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -170,7 +170,16 @@ public class TbKafkaAdmin implements TbQueueAdmin { log.info("[{}] altered new consumer groupId {}", tp, newGroupId); break; } + } + public boolean isTopicEmpty(String topic) { + try { + Map offsets = settings.getAdminClient().listConsumerGroupOffsets("__consumer_offsets").partitionsToOffsetAndMetadata().get(); + return offsets.entrySet().stream().noneMatch(entry -> entry.getKey().topic().equals(topic)); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to check if topic [{}] is empty.", topic, e); + return false; + } } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java index 300548e081..ee529e8a68 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java @@ -46,6 +46,8 @@ public class TbKafkaTopicConfigs { private String vcProperties; @Value("${queue.kafka.topic-properties.edge:}") private String edgeProperties; + @Value("${queue.kafka.topic-properties.edge-event:}") + private String edgeEventProperties; @Value("${queue.kafka.topic-properties.housekeeper:}") private String housekeeperProperties; @Value("${queue.kafka.topic-properties.housekeeper-reprocessing:}") @@ -75,6 +77,8 @@ public class TbKafkaTopicConfigs { private Map housekeeperReprocessingConfigs; @Getter private Map edgeConfigs; + @Getter + private Map edgeEventConfigs; @PostConstruct private void init() { @@ -92,6 +96,7 @@ public class TbKafkaTopicConfigs { housekeeperConfigs = PropertyUtils.getProps(housekeeperProperties); housekeeperReprocessingConfigs = PropertyUtils.getProps(housekeeperReprocessingProperties); edgeConfigs = PropertyUtils.getProps(edgeProperties); + edgeEventConfigs = PropertyUtils.getProps(edgeEventProperties); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java index 6c758409d8..d563307390 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsMonolithQueueFactory.java @@ -285,7 +285,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java index 4f9c130986..056fbb2508 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/AwsSqsTbCoreQueueFactory.java @@ -265,7 +265,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java index b3c2a2f44c..44b2469da8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryMonolithQueueFactory.java @@ -207,7 +207,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 0c5cc94a72..205dbea3af 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -93,6 +93,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin housekeeperAdmin; private final TbQueueAdmin housekeeperReprocessingAdmin; private final TbQueueAdmin edgeAdmin; + private final TbQueueAdmin edgeEventAdmin; private final AtomicLong consumerCount = new AtomicLong(); @@ -131,6 +132,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs()); this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs()); this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); + this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); } @Override @@ -466,14 +468,14 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); - consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId + "." + serviceId)); + consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId)); consumerBuilder.clientId("monolith-to-edge-event-consumer" + serviceInfoProvider.getServiceId()); consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer")); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); - consumerBuilder.admin(edgeAdmin); + consumerBuilder.admin(edgeEventAdmin); consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -484,7 +486,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi requestBuilder.settings(kafkaSettings); requestBuilder.clientId("monolith-to-edge-event-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName("edge-events")); - requestBuilder.admin(edgeAdmin); + requestBuilder.admin(edgeEventAdmin); return requestBuilder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index 0866f1cde7..c462bf816e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -92,6 +92,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueAdmin housekeeperAdmin; private final TbQueueAdmin housekeeperReprocessingAdmin; private final TbQueueAdmin edgeAdmin; + private final TbQueueAdmin edgeEventAdmin; private final AtomicLong consumerCount = new AtomicLong(); @@ -131,6 +132,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs()); this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs()); this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); + this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); } @Override @@ -415,14 +417,14 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); - consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId + "." + serviceId)); + consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId)); consumerBuilder.clientId("tb-core-edge-event-consumer" + serviceInfoProvider.getServiceId()); consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer")); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); - consumerBuilder.admin(edgeAdmin); + consumerBuilder.admin(edgeEventAdmin); consumerBuilder.statsService(consumerStatsService); return consumerBuilder.build(); } @@ -433,7 +435,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { requestBuilder.settings(kafkaSettings); requestBuilder.clientId("tb-core-edge-event-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName("edge-events")); - requestBuilder.admin(edgeAdmin); + requestBuilder.admin(edgeEventAdmin); return requestBuilder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 7090b87a66..7e9631979c 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -80,6 +80,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TbQueueAdmin fwUpdatesAdmin; private final TbQueueAdmin housekeeperAdmin; private final TbQueueAdmin edgeAdmin; + private final TbQueueAdmin edgeEventAdmin; private final AtomicLong consumerCount = new AtomicLong(); public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, @@ -109,6 +110,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs()); this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs()); this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); + this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs()); } @Override @@ -197,7 +199,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { requestBuilder.settings(kafkaSettings); requestBuilder.clientId("tb-rule-engine-edge-event-" + serviceInfoProvider.getServiceId()); requestBuilder.defaultTopic(topicService.buildTopicName("edge-events")); - requestBuilder.admin(edgeAdmin); + requestBuilder.admin(edgeEventAdmin); return requestBuilder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java index 7859e4f6fa..f0bb294e20 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubMonolithQueueFactory.java @@ -286,7 +286,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java index 3a0db39079..f885f1070e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/PubSubTbCoreQueueFactory.java @@ -258,7 +258,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java index d6ec066db8..a444276d19 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqMonolithQueueFactory.java @@ -284,7 +284,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java index 0896774c33..053d682580 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/RabbitMqTbCoreQueueFactory.java @@ -174,7 +174,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java index c686d713f2..2772dd4ff9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusMonolithQueueFactory.java @@ -283,7 +283,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java index 09abff3ece..640019a8a7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/ServiceBusTbCoreQueueFactory.java @@ -258,7 +258,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { } @Override - public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) { + public TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) { return null; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java index 63fdce09e5..2fa95443ac 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbCoreQueueFactory.java @@ -151,7 +151,7 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, Hous TbQueueProducer> createEdgeNotificationsMsgProducer(); - TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId); + TbQueueConsumer> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId); TbQueueProducer> createEdgeEventMsgProducer(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 1e0202bf79..27fba18617 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -251,6 +251,7 @@ public class EdgeServiceImpl extends AbstractCachedEntityService