Improve edge service to find active state based on persistToTelemetry else attribute

This commit is contained in:
Andrii Landiak 2024-02-12 16:24:02 +02:00
parent c62a958353
commit a4c5617cc2
4 changed files with 29 additions and 6 deletions

View File

@ -213,8 +213,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
if (systemContext.isEdgesEnabled() && edgeId != null) { if (systemContext.isEdgesEnabled() && edgeId != null) {
log.debug("[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId); log.debug("[{}][{}] device is related to edge: [{}]. Saving RPC request: [{}][{}] to edge queue", tenantId, deviceId, edgeId.getId(), rpcId, requestId);
try { try {
Optional<AttributeKvEntry> edgeAttributeOpt = systemContext.getAttributesService().find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE).get(); if (systemContext.getEdgeService().isEdgeActiveAsync(tenantId, edgeId, DefaultDeviceStateService.ACTIVITY_STATE).get()) {
if (edgeAttributeOpt.isPresent() && edgeAttributeOpt.get().getBooleanValue().orElse(false)) {
saveRpcRequestToEdgeQueue(request, requestId).get(); saveRpcRequestToEdgeQueue(request, requestId).get();
} else { } else {
log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}. The Edge is currently offline or unreachable", tenantId, deviceId, edgeId.getId(), request); log.error("[{}][{}][{}] Failed to save RPC request to edge queue {}. The Edge is currently offline or unreachable", tenantId, deviceId, edgeId.getId(), request);

View File

@ -55,6 +55,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.dao.widget.WidgetsBundleService;
@ -209,6 +210,9 @@ public abstract class BaseEdgeProcessorTest {
@MockBean @MockBean
protected AttributesService attributesService; protected AttributesService attributesService;
@MockBean
protected TimeseriesService timeseriesService;
@MockBean @MockBean
protected TbClusterService tbClusterService; protected TbClusterService tbClusterService;

View File

@ -91,4 +91,6 @@ public interface EdgeService extends EntityDaoService {
PageData<EdgeId> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink); PageData<EdgeId> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId, PageLink pageLink);
String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId, String tbRuleChainInputNodeClassName); String findMissingToRelatedRuleChains(TenantId tenantId, EdgeId edgeId, String tbRuleChainInputNodeClassName);
ListenableFuture<Boolean> isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String activityState);
} }

View File

@ -30,10 +30,10 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.event.TransactionalEventListener;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
@ -48,14 +48,15 @@ import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent; import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent;
import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.DataValidationException;
@ -64,7 +65,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.PaginatedRemover;
import org.thingsboard.server.dao.service.Validator; import org.thingsboard.server.dao.service.Validator;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserService;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -105,7 +106,10 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
private RelationService relationService; private RelationService relationService;
@Autowired @Autowired
private TenantService tenantService; private TimeseriesService timeseriesService;
@Autowired
private AttributesService attributesService;
@Autowired @Autowired
private DataValidator<Edge> edgeValidator; private DataValidator<Edge> edgeValidator;
@ -113,6 +117,8 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
@Value("${edges.enabled}") @Value("${edges.enabled}")
@Getter @Getter
private boolean edgesEnabled; private boolean edgesEnabled;
@Value("${edges.state.persistToTelemetry:false}")
private boolean persistToTelemetry;
@TransactionalEventListener(classes = EdgeCacheEvictEvent.class) @TransactionalEventListener(classes = EdgeCacheEvictEvent.class)
@Override @Override
@ -530,6 +536,18 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
return result.toString(); return result.toString();
} }
@Override
public ListenableFuture<Boolean> isEdgeActiveAsync(TenantId tenantId, EdgeId edgeId, String key) {
ListenableFuture<? extends Optional<? extends KvEntry>> futureKvEntry;
if (persistToTelemetry) {
futureKvEntry = timeseriesService.findLatest(tenantId, edgeId, key);
} else {
futureKvEntry = attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, key);
}
return Futures.transformAsync(futureKvEntry, kvEntryOpt ->
Futures.immediateFuture(kvEntryOpt.flatMap(KvEntry::getBooleanValue).orElse(false)), MoreExecutors.directExecutor());
}
private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) { private List<RuleChain> findEdgeRuleChains(TenantId tenantId, EdgeId edgeId) {
List<RuleChain> result = new ArrayList<>(); List<RuleChain> result = new ArrayList<>();
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);