added entity added/deleted events handling

This commit is contained in:
IrynaMatveieva 2024-11-28 11:28:51 +02:00
parent f9db64a14d
commit 2080b439a7
8 changed files with 147 additions and 14 deletions

View File

@ -31,4 +31,8 @@ public interface CalculatedFieldExecutionService {
void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback);
void onEntityAdded(TransportProtos.EntityAddMsgProto proto, TbCallback callback);
void onEntityDeleted(TransportProtos.EntityDeleteMsg proto, TbCallback callback);
}

View File

@ -74,8 +74,8 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.LastRecordsCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
@ -227,6 +227,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
@Override
public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map<String, KvEntry> updatedTelemetry) {
try {
log.info("Received telemetry update msg: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId);
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> {
CalculatedField calculatedField = calculatedFields.computeIfAbsent(id, cfId -> calculatedFieldService.findById(tenantId, id));
return new CalculatedFieldCtx(calculatedField, tbelInvokeService);
@ -247,7 +248,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
EntityId oldProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getOldProfileIdMSB(), proto.getOldProfileIdLSB()));
EntityId newProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getNewProfileIdMSB(), proto.getNewProfileIdLSB()));
log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId);
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId)
@ -257,10 +257,44 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
});
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, newProfileId)
initializeStateForEntityByProfile(tenantId, entityId, newProfileId, callback);
} catch (Exception e) {
log.trace("Failed to process entity type update msg: [{}]", proto, e);
}
}
@Override
public void onEntityAdded(TransportProtos.EntityAddMsgProto proto, TbCallback callback) {
try {
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
EntityId profileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getProfileIdMSB(), proto.getProfileIdLSB()));
log.info("Received EntityCreateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId);
initializeStateForEntityByProfile(tenantId, entityId, profileId, callback);
} catch (Exception e) {
log.trace("Failed to process entity type update msg: [{}]", proto, e);
}
}
private void initializeStateForEntityByProfile(TenantId tenantId, EntityId entityId, EntityId profileId, TbCallback callback) {
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, profileId)
.stream()
.map(cfId -> calculatedFieldsCtx.computeIfAbsent(cfId, id -> new CalculatedFieldCtx(calculatedFieldService.findById(tenantId, id), tbelInvokeService)))
.forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback));
}
@Override
public void onEntityDeleted(TransportProtos.EntityDeleteMsg proto, TbCallback callback) {
try {
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
log.info("Received EntityDeleteMsg for processing: entityId=[{}]", entityId);
List<String> statesToRemove = states.keySet().stream()
.filter(ctxEntityId -> ctxEntityId.entityId().equals(entityId.getId()))
.map(JacksonUtil::writeValueAsString)
.toList();
states.keySet().removeIf(ctxEntityId -> ctxEntityId.entityId().equals(entityId.getId()));
rocksDBService.deleteAll(statesToRemove);
} catch (Exception e) {
log.trace("Failed to process entity type update msg: [{}]", proto, e);
}

View File

@ -146,7 +146,11 @@ public class EntityStateSourcingListener {
log.debug("[{}][{}][{}] Handling entity deletion event: {}", tenantId, entityType, entityId, event);
switch (entityType) {
case ASSET, ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> {
case ASSET -> {
Asset asset = (Asset) event.getEntity();
tbClusterService.onAssetDeleted(tenantId, asset, null);
}
case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> {
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED);
}
case NOTIFICATION_REQUEST -> {

View File

@ -388,11 +388,26 @@ public class DefaultTbClusterService implements TbClusterService {
public void onDeviceDeleted(TenantId tenantId, Device device, TbQueueCallback callback) {
DeviceId deviceId = device.getId();
gatewayNotificationsService.onDeviceDeleted(device);
handleEntityDelete(tenantId, deviceId, device.getDeviceProfileId());
broadcastEntityDeleteToTransport(tenantId, deviceId, device.getName(), callback);
sendDeviceStateServiceEvent(tenantId, deviceId, false, false, true);
broadcastEntityStateChangeEvent(tenantId, deviceId, ComponentLifecycleEvent.DELETED);
}
@Override
public void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback) {
AssetId assetId = asset.getId();
handleEntityDelete(tenantId, assetId, asset.getAssetProfileId());
broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED);
}
private void handleEntityDelete(TenantId tenantId, EntityId entityId, EntityId profileId) {
boolean cfExistsByProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, profileId);
if (cfExistsByProfile) {
sendEntityDeletedEvent(tenantId, entityId);
}
}
@Override
public void onDeviceAssignedToTenant(TenantId oldTenantId, Device device) {
onDeviceDeleted(oldTenantId, device, null);
@ -613,11 +628,13 @@ public class DefaultTbClusterService implements TbClusterService {
}
boolean deviceTypeChanged = !device.getType().equals(old.getType());
if (deviceTypeChanged) {
handleProfileChange(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId());
handleProfileUpdate(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId());
}
if (deviceNameChanged || deviceTypeChanged) {
pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), device.getId(), device.getName(), device.getType()), null);
}
} else {
handleEntityCreate(device.getTenantId(), device.getId(), device.getDeviceProfileId());
}
broadcastEntityStateChangeEvent(device.getTenantId(), device.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
sendDeviceStateServiceEvent(device.getTenantId(), device.getId(), created, !created, false);
@ -631,16 +648,26 @@ public class DefaultTbClusterService implements TbClusterService {
if (old != null) {
boolean assetTypeChanged = !asset.getType().equals(old.getType());
if (assetTypeChanged) {
handleProfileChange(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId());
handleProfileUpdate(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId());
}
} else {
handleEntityCreate(asset.getTenantId(), asset.getId(), asset.getAssetProfileId());
}
broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
}
private void handleProfileChange(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) {
boolean cfExistsByProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, oldProfileId);
private void handleProfileUpdate(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) {
boolean cfExistsByOldProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, oldProfileId);
boolean cfExistsByNewProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, newProfileId);
if (cfExistsByOldProfile || cfExistsByNewProfile) {
sendEntityProfileUpdatedEvent(tenantId, entityId, oldProfileId, newProfileId);
}
}
private void handleEntityCreate(TenantId tenantId, EntityId entityId, EntityId profileId) {
boolean cfExistsByProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, profileId);
if (cfExistsByProfile) {
sendEntityTypeUpdatedEvent(tenantId, entityId, oldProfileId, newProfileId);
sendEntityAddedEvent(tenantId, entityId, profileId);
}
}
@ -801,7 +828,7 @@ public class DefaultTbClusterService implements TbClusterService {
pushMsgToCore(tenantId, calculatedFieldId, ToCoreMsg.newBuilder().setCalculatedFieldMsg(msg).build(), null);
}
private void sendEntityTypeUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) {
private void sendEntityProfileUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) {
TransportProtos.EntityProfileUpdateMsgProto.Builder builder = TransportProtos.EntityProfileUpdateMsgProto.newBuilder();
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
@ -817,4 +844,26 @@ public class DefaultTbClusterService implements TbClusterService {
pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityProfileUpdateMsg(msg).build(), null);
}
private void sendEntityAddedEvent(TenantId tenantId, EntityId entityId, EntityId profileId) {
TransportProtos.EntityAddMsgProto.Builder builder = TransportProtos.EntityAddMsgProto.newBuilder();
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
builder.setEntityType(entityId.getEntityType().name());
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
builder.setEntityProfileType(profileId.getEntityType().name());
builder.setProfileIdMSB(profileId.getId().getMostSignificantBits());
builder.setProfileIdLSB(profileId.getId().getLeastSignificantBits());
TransportProtos.EntityAddMsgProto msg = builder.build();
pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityAddMsg(msg).build(), null);
}
private void sendEntityDeletedEvent(TenantId tenantId, EntityId entityId) {
TransportProtos.EntityDeleteMsg.Builder builder = TransportProtos.EntityDeleteMsg.newBuilder();
builder.setEntityType(entityId.getEntityType().name());
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityDeleteMsg(builder).build(), null);
}
}

View File

@ -320,6 +320,10 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
forwardToCalculatedFieldService(toCoreMsg.getCalculatedFieldMsg(), callback);
} else if (toCoreMsg.hasEntityProfileUpdateMsg()) {
forwardToCalculatedFieldService(toCoreMsg.getEntityProfileUpdateMsg(), callback);
} else if (toCoreMsg.hasEntityAddMsg()) {
forwardToCalculatedFieldService(toCoreMsg.getEntityAddMsg(), callback);
} else if (toCoreMsg.hasEntityDeleteMsg()) {
forwardToCalculatedFieldService(toCoreMsg.getEntityDeleteMsg(), callback);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
@ -694,6 +698,29 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
});
}
private void forwardToCalculatedFieldService(TransportProtos.EntityAddMsgProto entityCreateMsg, TbCallback callback) {
var tenantId = toTenantId(entityCreateMsg.getTenantIdMSB(), entityCreateMsg.getTenantIdLSB());
var entityId = EntityIdFactory.getByTypeAndUuid(entityCreateMsg.getEntityType(), new UUID(entityCreateMsg.getEntityIdMSB(), entityCreateMsg.getEntityIdLSB()));
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityAdded(entityCreateMsg, callback));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process entity create message for entityId [{}]", tenantId.getId(), entityId.getId(), t);
callback.onFailure(t);
});
}
private void forwardToCalculatedFieldService(TransportProtos.EntityDeleteMsg entityDeleteMsg, TbCallback callback) {
var entityId = EntityIdFactory.getByTypeAndUuid(entityDeleteMsg.getEntityType(), new UUID(entityDeleteMsg.getEntityIdMSB(), entityDeleteMsg.getEntityIdLSB()));
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityDeleted(entityDeleteMsg, callback));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("Failed to process entity delete message for entity [{}]", entityId, t);
callback.onFailure(t);
});
}
private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB()));

View File

@ -100,6 +100,8 @@ public interface TbClusterService extends TbQueueClusterService {
void onAssetUpdated(Asset asset, Asset old);
void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback);
void onResourceChange(TbResourceInfo resource, TbQueueCallback callback);
void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback);

View File

@ -794,6 +794,17 @@ message EntityProfileUpdateMsgProto {
int64 newProfileIdLSB = 10;
}
message EntityAddMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string entityType = 3;
int64 entityIdMSB = 4;
int64 entityIdLSB = 5;
string entityProfileType = 6;
int64 profileIdMSB = 7;
int64 profileIdLSB = 8;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
message SubscriptionInfoProto {
int64 lastActivityTime = 1;
@ -1527,6 +1538,8 @@ message ToCoreMsg {
DeviceInactivityProto deviceInactivityMsg = 52;
CalculatedFieldMsgProto calculatedFieldMsg = 53;
EntityProfileUpdateMsgProto entityProfileUpdateMsg = 54;
EntityAddMsgProto entityAddMsg = 55;
EntityDeleteMsg entityDeleteMsg = 56;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */

View File

@ -239,7 +239,7 @@ public class BaseAssetService extends AbstractCachedEntityService<AssetCacheKey,
publishEvictEvent(new AssetCacheEvictEvent(asset.getTenantId(), asset.getName(), null));
countService.publishCountEntityEvictEvent(tenantId, EntityType.ASSET);
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(asset.getId()).build());
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(asset.getId()).entity(asset).build());
}
@Override