Added check if entity exists before processing telemetry update msg

This commit is contained in:
Volodymyr Babak 2023-01-25 12:23:59 +02:00
parent 2d587f500b
commit b7265cb682
3 changed files with 56 additions and 59 deletions

View File

@ -485,4 +485,25 @@ public abstract class BaseEdgeProcessor {
}
return customerId;
}
protected boolean isEntityExists(TenantId tenantId, EntityId entityId) {
switch (entityId.getEntityType()) {
case DEVICE:
return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null;
case ASSET:
return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null;
case ENTITY_VIEW:
return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null;
case CUSTOMER:
return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null;
case USER:
return userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
case DASHBOARD:
return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
case EDGE:
return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null;
default:
return false;
}
}
}

View File

@ -20,16 +20,9 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg;
@ -80,25 +73,4 @@ public abstract class BaseRelationProcessor extends BaseEdgeProcessor {
return Futures.immediateFailedFuture(e);
}
}
private boolean isEntityExists(TenantId tenantId, EntityId entityId) {
switch (entityId.getEntityType()) {
case DEVICE:
return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null;
case ASSET:
return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null;
case ENTITY_VIEW:
return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null;
case CUSTOMER:
return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null;
case USER:
return userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
case DASHBOARD:
return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
case EDGE:
return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null;
default:
return false;
}
}
}

View File

@ -90,42 +90,46 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData);
List<ListenableFuture<Void>> result = new ArrayList<>();
EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB());
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
Pair<TbMsgMetaData, CustomerId> pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId);
TbMsgMetaData metaData = pair.getKey();
CustomerId customerId = pair.getValue();
metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey());
if (entityData.hasPostAttributesMsg()) {
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
}
if (entityData.hasAttributesUpdatedMsg()) {
metaData.putValue("scope", entityData.getPostAttributeScope());
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
}
if (entityData.hasPostTelemetryMsg()) {
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
}
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
DeviceId deviceId = new DeviceId(entityId.getId());
if (entityId != null && isEntityExists(tenantId, entityId)) {
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg())) {
Pair<TbMsgMetaData, CustomerId> pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId);
TbMsgMetaData metaData = pair.getKey();
CustomerId customerId = pair.getValue();
metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey());
if (entityData.hasPostAttributesMsg()) {
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
}
if (entityData.hasAttributesUpdatedMsg()) {
metaData.putValue("scope", entityData.getPostAttributeScope());
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
}
if (entityData.hasPostTelemetryMsg()) {
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
}
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
DeviceId deviceId = new DeviceId(entityId.getId());
long currentTs = System.currentTimeMillis();
long currentTs = System.currentTimeMillis();
TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(currentTs).build();
TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(currentTs).build();
log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs);
log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null);
}
}
}
if (entityData.hasAttributeDeleteMsg()) {
result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType()));
if (entityData.hasAttributeDeleteMsg()) {
result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType()));
}
} else {
log.warn("Skipping telemetry update msg because entity doesn't exists on edge, {}", entityData);
}
return result;
}