From b7265cb6824f14db7047353d61d2a5cd34447a20 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 25 Jan 2023 12:23:59 +0200 Subject: [PATCH] Added check if entity exists before processing telemetry update msg --- .../edge/rpc/processor/BaseEdgeProcessor.java | 21 ++++++ .../relation/BaseRelationProcessor.java | 28 -------- .../telemetry/BaseTelemetryProcessor.java | 66 ++++++++++--------- 3 files changed, 56 insertions(+), 59 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 4b94cee592..36077d3fd5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -485,4 +485,25 @@ public abstract class BaseEdgeProcessor { } return customerId; } + + protected boolean isEntityExists(TenantId tenantId, EntityId entityId) { + switch (entityId.getEntityType()) { + case DEVICE: + return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; + case ASSET: + return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null; + case ENTITY_VIEW: + return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null; + case CUSTOMER: + return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null; + case USER: + return userService.findUserById(tenantId, new UserId(entityId.getId())) != null; + case DASHBOARD: + return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null; + case EDGE: + return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null; + default: + return false; + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java index e1c937eaad..36a5decc3f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java @@ -20,16 +20,9 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.id.AssetId; -import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DashboardId; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg; @@ -80,25 +73,4 @@ public abstract class BaseRelationProcessor extends BaseEdgeProcessor { return Futures.immediateFailedFuture(e); } } - - private boolean isEntityExists(TenantId tenantId, EntityId entityId) { - switch (entityId.getEntityType()) { - case DEVICE: - return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; - case ASSET: - return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null; - case ENTITY_VIEW: - return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null; - case CUSTOMER: - return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null; - case USER: - return userService.findUserById(tenantId, new UserId(entityId.getId())) != null; - case DASHBOARD: - return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null; - case EDGE: - return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null; - default: - return false; - } - } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 30979187d1..a3d3e3c280 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -90,42 +90,46 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB()); - if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { - Pair pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId); - TbMsgMetaData metaData = pair.getKey(); - CustomerId customerId = pair.getValue(); - metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); - if (entityData.hasPostAttributesMsg()) { - result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); - } - if (entityData.hasAttributesUpdatedMsg()) { - metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); - } - if (entityData.hasPostTelemetryMsg()) { - result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); - } - if (EntityType.DEVICE.equals(entityId.getEntityType())) { - DeviceId deviceId = new DeviceId(entityId.getId()); + if (entityId != null && isEntityExists(tenantId, entityId)) { + if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg())) { + Pair pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId); + TbMsgMetaData metaData = pair.getKey(); + CustomerId customerId = pair.getValue(); + metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); + if (entityData.hasPostAttributesMsg()) { + result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); + } + if (entityData.hasAttributesUpdatedMsg()) { + metaData.putValue("scope", entityData.getPostAttributeScope()); + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + } + if (entityData.hasPostTelemetryMsg()) { + result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); + } + if (EntityType.DEVICE.equals(entityId.getEntityType())) { + DeviceId deviceId = new DeviceId(entityId.getId()); - long currentTs = System.currentTimeMillis(); + long currentTs = System.currentTimeMillis(); - TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .setLastActivityTime(currentTs).build(); + TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setLastActivityTime(currentTs).build(); - log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs); + log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); - tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), - TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), + TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null); + } } - } - if (entityData.hasAttributeDeleteMsg()) { - result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType())); + if (entityData.hasAttributeDeleteMsg()) { + result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType())); + } + } else { + log.warn("Skipping telemetry update msg because entity doesn't exists on edge, {}", entityData); } return result; }