diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 9f92838712..f1c0260124 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -213,8 +213,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (systemContext.isEdgesEnabled() && edgeId != null) { log.debug("[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId); try { - Optional edgeAttributeOpt = systemContext.getAttributesService().find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE).get(); - if (edgeAttributeOpt.isPresent() && edgeAttributeOpt.get().getBooleanValue().orElse(false)) { + if (systemContext.getEdgeService().isEdgeActiveAsync(tenantId, edgeId, DefaultDeviceStateService.ACTIVITY_STATE).get()) { saveRpcRequestToEdgeQueue(request, requestId).get(); } else { log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}. The Edge is currently offline or unreachable", tenantId, deviceId, edgeId.getId(), request); diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index 9c45977a7c..835b39caf9 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -55,6 +55,7 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; @@ -209,6 +210,9 @@ public abstract class BaseEdgeProcessorTest { @MockBean protected AttributesService attributesService; + @MockBean + protected TimeseriesService timeseriesService; + @MockBean protected TbClusterService tbClusterService; diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java index 15e55d8713..d6d5551ccf 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeService.java @@ -91,4 +91,6 @@ public interface EdgeService extends EntityDaoService { PageData findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink); String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId, String tbRuleChainInputNodeClassName); + + ListenableFuture isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String activityState); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 80eea0a478..136fd0485d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -30,10 +30,10 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; @@ -48,14 +48,15 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent; import org.thingsboard.server.dao.exception.DataValidationException; @@ -64,7 +65,7 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.Validator; -import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import javax.annotation.Nullable; @@ -105,7 +106,10 @@ public class EdgeServiceImpl extends AbstractCachedEntityService edgeValidator; @@ -113,6 +117,8 @@ public class EdgeServiceImpl extends AbstractCachedEntityService isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String key) { + ListenableFuture> futureKvEntry; + if (persistToTelemetry) { + futureKvEntry = timeseriesService.findLatest(tenantId, edgeId, key); + } else { + futureKvEntry = attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, key); + } + return Futures.transformAsync(futureKvEntry, kvEntryOpt -> + Futures.immediateFuture(kvEntryOpt.flatMap(KvEntry::getBooleanValue).orElse(false)), MoreExecutors.directExecutor()); + } + private List findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { List result = new ArrayList<>(); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);