handled profile updates in cluster
This commit is contained in:
		
							parent
							
								
									63b79b7242
								
							
						
					
					
						commit
						4ebb68ded6
					
				@ -34,6 +34,8 @@ public interface CalculatedFieldCache {
 | 
			
		||||
 | 
			
		||||
    List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId);
 | 
			
		||||
 | 
			
		||||
    void updateCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId);
 | 
			
		||||
 | 
			
		||||
    CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService);
 | 
			
		||||
 | 
			
		||||
    Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityId);
 | 
			
		||||
 | 
			
		||||
@ -141,6 +141,31 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
 | 
			
		||||
        return cfLinks;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void updateCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
 | 
			
		||||
        log.debug("Update calculated field links per entity for calculated field: [{}]", calculatedFieldId);
 | 
			
		||||
        calculatedFieldFetchLock.lock();
 | 
			
		||||
        try {
 | 
			
		||||
            List<CalculatedFieldLink> cfLinks = getCalculatedFieldLinks(tenantId, calculatedFieldId);
 | 
			
		||||
            if (cfLinks != null && !cfLinks.isEmpty()) {
 | 
			
		||||
                cfLinks.forEach(link -> {
 | 
			
		||||
                    entityIdCalculatedFieldLinks.compute(link.getEntityId(), (id, existingList) -> {
 | 
			
		||||
                        if (existingList == null) {
 | 
			
		||||
                            existingList = new ArrayList<>();
 | 
			
		||||
                        } else if (!(existingList instanceof ArrayList)) {
 | 
			
		||||
                            existingList = new ArrayList<>(existingList);
 | 
			
		||||
                        }
 | 
			
		||||
                        existingList.add(link);
 | 
			
		||||
                        return existingList;
 | 
			
		||||
                    });
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
        } finally {
 | 
			
		||||
            calculatedFieldFetchLock.unlock();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService) {
 | 
			
		||||
        CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId);
 | 
			
		||||
 | 
			
		||||
@ -71,7 +71,6 @@ import org.thingsboard.server.dao.attributes.AttributesService;
 | 
			
		||||
import org.thingsboard.server.dao.cf.CalculatedFieldService;
 | 
			
		||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
 | 
			
		||||
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
 | 
			
		||||
@ -105,7 +104,6 @@ import static org.thingsboard.server.common.data.DataConstants.SCOPE;
 | 
			
		||||
import static org.thingsboard.server.common.util.ProtoUtils.fromObjectProto;
 | 
			
		||||
import static org.thingsboard.server.common.util.ProtoUtils.toObjectProto;
 | 
			
		||||
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
@Service
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@ -117,7 +115,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
    private final CalculatedFieldCache calculatedFieldCache;
 | 
			
		||||
    private final AttributesService attributesService;
 | 
			
		||||
    private final TimeseriesService timeseriesService;
 | 
			
		||||
    private final RocksDBService rocksDBService;
 | 
			
		||||
    //    private final RocksDBService rocksDBService;
 | 
			
		||||
    private final TbClusterService clusterService;
 | 
			
		||||
    private final TbelInvokeService tbelInvokeService;
 | 
			
		||||
 | 
			
		||||
@ -213,8 +211,9 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
 | 
			
		||||
    private void restoreState(CalculatedField cf, EntityId entityId) {
 | 
			
		||||
        CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cf.getId().getId(), entityId.getId());
 | 
			
		||||
        String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId));
 | 
			
		||||
//        String storedState = rocksDBService.get(JacksonUtil.writeValueAsString(ctxId));
 | 
			
		||||
 | 
			
		||||
        String storedState = null;
 | 
			
		||||
        if (storedState != null) {
 | 
			
		||||
            CalculatedFieldEntityCtx restoredCtx = JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class);
 | 
			
		||||
            states.put(ctxId, restoredCtx);
 | 
			
		||||
@ -313,13 +312,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
            if (calculatedFieldIds != null) {
 | 
			
		||||
                calculatedFieldIds.remove(calculatedFieldId);
 | 
			
		||||
            }
 | 
			
		||||
            calculatedFieldCache.evict(calculatedFieldId);
 | 
			
		||||
//            calculatedFieldCache.evict(calculatedFieldId);
 | 
			
		||||
            states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId()));
 | 
			
		||||
            List<String> statesToRemove = states.keySet().stream()
 | 
			
		||||
                    .filter(ctxId -> ctxId.cfId().equals(calculatedFieldId.getId()))
 | 
			
		||||
                    .map(JacksonUtil::writeValueAsString)
 | 
			
		||||
                    .toList();
 | 
			
		||||
            rocksDBService.deleteAll(statesToRemove);
 | 
			
		||||
//            rocksDBService.deleteAll(statesToRemove);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.trace("Failed to delete calculated field: [{}]", calculatedFieldId, e);
 | 
			
		||||
            callback.onFailure(e);
 | 
			
		||||
@ -414,7 +413,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
            }
 | 
			
		||||
            default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues, calculatedFieldIds);
 | 
			
		||||
        }
 | 
			
		||||
        log.info("Successfully updated telemetry for calculatedFieldId: [{}]", calculatedFieldId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -423,7 +421,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
            TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
 | 
			
		||||
            CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB()));
 | 
			
		||||
            EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
 | 
			
		||||
 | 
			
		||||
            log.info("Received CalculatedFieldStateMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}], entityId=[{}]", tenantId, calculatedFieldId, entityId);
 | 
			
		||||
            if (proto.getClear()) {
 | 
			
		||||
                clearState(tenantId, calculatedFieldId, entityId);
 | 
			
		||||
                return;
 | 
			
		||||
@ -431,7 +429,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
 | 
			
		||||
            List<CalculatedFieldId> calculatedFieldIds = proto.getCalculatedFieldsList().stream()
 | 
			
		||||
                    .map(cfIdProto -> new CalculatedFieldId(new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
 | 
			
		||||
                    .toList();
 | 
			
		||||
                    .collect(Collectors.toCollection(ArrayList::new));
 | 
			
		||||
            Map<String, ArgumentEntry> argumentsMap = proto.getArgumentsMap().entrySet().stream()
 | 
			
		||||
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> fromArgumentEntryProto(entry.getValue())));
 | 
			
		||||
 | 
			
		||||
@ -451,8 +449,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
            EntityId newProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getNewProfileIdMSB(), proto.getNewProfileIdLSB()));
 | 
			
		||||
            log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId);
 | 
			
		||||
 | 
			
		||||
            calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfileId).remove(entityId);
 | 
			
		||||
            calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId);
 | 
			
		||||
//            calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfileId).remove(entityId);
 | 
			
		||||
//            calculatedFieldCache.getEntitiesByProfile(tenantId, newProfileId).add(entityId);
 | 
			
		||||
 | 
			
		||||
            calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId)
 | 
			
		||||
                    .forEach(cfId -> clearState(tenantId, cfId, entityId));
 | 
			
		||||
@ -472,15 +470,18 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
            log.info("Received ProfileEntityMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId);
 | 
			
		||||
            if (proto.getDeleted()) {
 | 
			
		||||
                log.info("Executing profile entity deleted msg,  tenantId=[{}], entityId=[{}]", tenantId, entityId);
 | 
			
		||||
                calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).remove(entityId);
 | 
			
		||||
//                calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).remove(entityId);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
                List<CalculatedFieldId> calculatedFieldIds = Stream.concat(
 | 
			
		||||
                        calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream().map(CalculatedFieldLink::getCalculatedFieldId),
 | 
			
		||||
                        calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream().map(CalculatedFieldLink::getCalculatedFieldId)
 | 
			
		||||
                ).toList();
 | 
			
		||||
 | 
			
		||||
                calculatedFieldIds.forEach(cfId -> clearState(tenantId, cfId, entityId));
 | 
			
		||||
            } else {
 | 
			
		||||
                log.info("Executing profile entity added msg,  tenantId=[{}], entityId=[{}]", tenantId, entityId);
 | 
			
		||||
                calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId);
 | 
			
		||||
//                calculatedFieldCache.getEntitiesByProfile(tenantId, profileId).add(entityId);
 | 
			
		||||
                initializeStateForEntityByProfile(tenantId, entityId, profileId, callback);
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
@ -494,7 +495,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
            log.warn("Executing clearState, calculatedFieldId=[{}], entityId=[{}]", calculatedFieldId, entityId);
 | 
			
		||||
            CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId.getId(), entityId.getId());
 | 
			
		||||
            states.remove(ctxId);
 | 
			
		||||
            rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
 | 
			
		||||
//            rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId));
 | 
			
		||||
        } else {
 | 
			
		||||
            sendClearCalculatedFieldStateMsg(tenantId, calculatedFieldId, entityId);
 | 
			
		||||
        }
 | 
			
		||||
@ -558,13 +559,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
                Consumer<CalculatedFieldState> performUpdateState = (state) -> {
 | 
			
		||||
                    if (state.updateState(argumentsMap)) {
 | 
			
		||||
                        calculatedFieldEntityCtx.setState(state);
 | 
			
		||||
                        rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
 | 
			
		||||
//                        rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx));
 | 
			
		||||
                        Map<String, ArgumentEntry> arguments = state.getArguments();
 | 
			
		||||
                        boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) &&
 | 
			
		||||
                                !arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY);
 | 
			
		||||
                        if (allArgsPresent) {
 | 
			
		||||
                            performCalculation(calculatedFieldCtx, state, entityId, calculatedFieldIds);
 | 
			
		||||
                        }
 | 
			
		||||
                        log.info("Successfully updated state: calculatedFieldId=[{}], entityId=[{}]", calculatedFieldCtx.getCfId(), entityId);
 | 
			
		||||
                    }
 | 
			
		||||
                    updateFuture.complete(null);
 | 
			
		||||
                };
 | 
			
		||||
@ -633,6 +635,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
            calculatedFieldIds.add(calculatedFieldId);
 | 
			
		||||
            TbMsg msg = TbMsg.newMsg().type(msgType).originator(originatorId).calculatedFieldIds(calculatedFieldIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build();
 | 
			
		||||
            clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null);
 | 
			
		||||
            log.info("Pushed message to rule engine: originatorId=[{}]", originatorId);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.warn("[{}] Failed to push message to rule engine. CalculatedFieldResult: {}", originatorId, calculatedFieldResult, e);
 | 
			
		||||
        }
 | 
			
		||||
@ -715,6 +718,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
            ));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        log.info("Sending calculated field state msg from entityId [{}]", entityId);
 | 
			
		||||
        clusterService.pushMsgToCore(tenantId, calculatedFieldId, TransportProtos.ToCoreMsg.newBuilder().setCalculatedFieldStateMsg(msgBuilder).build(), null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -790,11 +794,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) {
 | 
			
		||||
        String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId));
 | 
			
		||||
        if (stateStr == null) {
 | 
			
		||||
            return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType));
 | 
			
		||||
        }
 | 
			
		||||
        return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class);
 | 
			
		||||
//        String stateStr = rocksDBService.get(JacksonUtil.writeValueAsString(entityCtxId));
 | 
			
		||||
//        if (stateStr == null) {
 | 
			
		||||
        return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType));
 | 
			
		||||
//        }
 | 
			
		||||
//        return JacksonUtil.fromString(stateStr, CalculatedFieldEntityCtx.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) {
 | 
			
		||||
 | 
			
		||||
@ -391,7 +391,7 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
    public void onDeviceDeleted(TenantId tenantId, Device device, TbQueueCallback callback) {
 | 
			
		||||
        DeviceId deviceId = device.getId();
 | 
			
		||||
        gatewayNotificationsService.onDeviceDeleted(device);
 | 
			
		||||
        sendProfileEntityEvent(tenantId, deviceId, device.getDeviceProfileId(), false, true);
 | 
			
		||||
        handleProfileEntityEvent(tenantId, deviceId, device.getDeviceProfileId(), false, true);
 | 
			
		||||
        broadcastEntityDeleteToTransport(tenantId, deviceId, device.getName(), callback);
 | 
			
		||||
        sendDeviceStateServiceEvent(tenantId, deviceId, false, false, true);
 | 
			
		||||
        broadcastEntityStateChangeEvent(tenantId, deviceId, ComponentLifecycleEvent.DELETED);
 | 
			
		||||
@ -400,7 +400,7 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
    @Override
 | 
			
		||||
    public void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback) {
 | 
			
		||||
        AssetId assetId = asset.getId();
 | 
			
		||||
        sendProfileEntityEvent(tenantId, assetId, asset.getAssetProfileId(), false, true);
 | 
			
		||||
        handleProfileEntityEvent(tenantId, assetId, asset.getAssetProfileId(), true, true);
 | 
			
		||||
        broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -563,7 +563,9 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
                || entityType.equals(EntityType.API_USAGE_STATE)
 | 
			
		||||
                || (entityType.equals(EntityType.DEVICE) && msg.getEvent() == ComponentLifecycleEvent.UPDATED)
 | 
			
		||||
                || entityType.equals(EntityType.ENTITY_VIEW)
 | 
			
		||||
                || entityType.equals(EntityType.NOTIFICATION_RULE)) {
 | 
			
		||||
                || entityType.equals(EntityType.NOTIFICATION_RULE)
 | 
			
		||||
                || entityType.equals(EntityType.CALCULATED_FIELD)
 | 
			
		||||
        ) {
 | 
			
		||||
            TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
 | 
			
		||||
            Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
 | 
			
		||||
            for (String serviceId : tbCoreServices) {
 | 
			
		||||
@ -624,13 +626,13 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
            }
 | 
			
		||||
            boolean deviceTypeChanged = !device.getType().equals(old.getType());
 | 
			
		||||
            if (deviceTypeChanged) {
 | 
			
		||||
                sendEntityProfileUpdatedEvent(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId());
 | 
			
		||||
                handleEntityProfileUpdatedEvent(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId());
 | 
			
		||||
            }
 | 
			
		||||
            if (deviceNameChanged || deviceTypeChanged) {
 | 
			
		||||
                pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), device.getId(), device.getName(), device.getType()), null);
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            sendProfileEntityEvent(device.getTenantId(), device.getId(), device.getDeviceProfileId(), true, false);
 | 
			
		||||
            handleProfileEntityEvent(device.getTenantId(), device.getId(), device.getDeviceProfileId(), true, false);
 | 
			
		||||
        }
 | 
			
		||||
        broadcastEntityStateChangeEvent(device.getTenantId(), device.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
 | 
			
		||||
        sendDeviceStateServiceEvent(device.getTenantId(), device.getId(), created, !created, false);
 | 
			
		||||
@ -644,10 +646,10 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
        if (old != null) {
 | 
			
		||||
            boolean assetTypeChanged = !asset.getType().equals(old.getType());
 | 
			
		||||
            if (assetTypeChanged) {
 | 
			
		||||
                sendEntityProfileUpdatedEvent(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId());
 | 
			
		||||
                handleEntityProfileUpdatedEvent(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId());
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            sendProfileEntityEvent(asset.getTenantId(), asset.getId(), asset.getAssetProfileId(), true, false);
 | 
			
		||||
            handleProfileEntityEvent(asset.getTenantId(), asset.getId(), asset.getAssetProfileId(), true, false);
 | 
			
		||||
        }
 | 
			
		||||
        broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
 | 
			
		||||
    }
 | 
			
		||||
@ -792,8 +794,8 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
    public void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, TbQueueCallback callback) {
 | 
			
		||||
        CalculatedFieldId calculatedFieldId = calculatedField.getId();
 | 
			
		||||
        broadcastEntityDeleteToTransport(tenantId, calculatedFieldId, calculatedField.getName(), callback);
 | 
			
		||||
        sendCalculatedFieldEvent(tenantId, calculatedFieldId, false, false, true);
 | 
			
		||||
        broadcastEntityStateChangeEvent(tenantId, calculatedFieldId, ComponentLifecycleEvent.DELETED);
 | 
			
		||||
        sendCalculatedFieldEvent(tenantId, calculatedFieldId, false, false, true);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void sendCalculatedFieldEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, boolean added, boolean updated, boolean deleted) {
 | 
			
		||||
@ -809,7 +811,7 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
        pushMsgToCore(tenantId, calculatedFieldId, ToCoreMsg.newBuilder().setCalculatedFieldMsg(msg).build(), null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void sendEntityProfileUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) {
 | 
			
		||||
    private void handleEntityProfileUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) {
 | 
			
		||||
        TransportProtos.EntityProfileUpdateMsgProto.Builder builder = TransportProtos.EntityProfileUpdateMsgProto.newBuilder();
 | 
			
		||||
        builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
 | 
			
		||||
        builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
 | 
			
		||||
@ -822,10 +824,12 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
        builder.setNewProfileIdMSB(newProfileId.getId().getMostSignificantBits());
 | 
			
		||||
        builder.setNewProfileIdLSB(newProfileId.getId().getLeastSignificantBits());
 | 
			
		||||
        TransportProtos.EntityProfileUpdateMsgProto msg = builder.build();
 | 
			
		||||
 | 
			
		||||
        broadcastToCore(ToCoreNotificationMsg.newBuilder().setEntityProfileUpdateMsg(msg).build());
 | 
			
		||||
        pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityProfileUpdateMsg(msg).build(), null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void sendProfileEntityEvent(TenantId tenantId, EntityId entityId, EntityId profileId, boolean added, boolean deleted) {
 | 
			
		||||
    private void handleProfileEntityEvent(TenantId tenantId, EntityId entityId, EntityId profileId, boolean added, boolean deleted) {
 | 
			
		||||
        TransportProtos.ProfileEntityMsgProto.Builder builder = TransportProtos.ProfileEntityMsgProto.newBuilder();
 | 
			
		||||
        builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
 | 
			
		||||
        builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
 | 
			
		||||
@ -838,6 +842,8 @@ public class DefaultTbClusterService implements TbClusterService {
 | 
			
		||||
        builder.setAdded(added);
 | 
			
		||||
        builder.setDeleted(deleted);
 | 
			
		||||
        TransportProtos.ProfileEntityMsgProto msg = builder.build();
 | 
			
		||||
 | 
			
		||||
        broadcastToCore(ToCoreNotificationMsg.newBuilder().setProfileEntityMsg(msg).build());
 | 
			
		||||
        pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setProfileEntityMsg(msg).build(), null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.event.Event;
 | 
			
		||||
import org.thingsboard.server.common.data.event.LifecycleEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationRequestId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
@ -87,6 +88,7 @@ import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
			
		||||
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldCache;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
 | 
			
		||||
import org.thingsboard.server.service.notification.NotificationSchedulerService;
 | 
			
		||||
import org.thingsboard.server.service.ota.OtaPackageStateService;
 | 
			
		||||
@ -109,6 +111,7 @@ import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpd
 | 
			
		||||
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
@ -181,8 +184,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
 | 
			
		||||
                                        NotificationRuleProcessor notificationRuleProcessor,
 | 
			
		||||
                                        TbImageService imageService,
 | 
			
		||||
                                        RuleEngineCallService ruleEngineCallService,
 | 
			
		||||
                                        CalculatedFieldExecutionService calculatedFieldExecutionService) {
 | 
			
		||||
        super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService,
 | 
			
		||||
                                        CalculatedFieldExecutionService calculatedFieldExecutionService,
 | 
			
		||||
                                        CalculatedFieldCache calculatedFieldCache) {
 | 
			
		||||
        super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
 | 
			
		||||
                eventPublisher, jwtSettingsService);
 | 
			
		||||
        this.stateService = stateService;
 | 
			
		||||
        this.localSubscriptionService = localSubscriptionService;
 | 
			
		||||
@ -412,6 +416,10 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
 | 
			
		||||
            callback.onSuccess();
 | 
			
		||||
        } else if (toCoreNotification.hasResourceCacheInvalidateMsg()) {
 | 
			
		||||
            forwardToResourceService(toCoreNotification.getResourceCacheInvalidateMsg(), callback);
 | 
			
		||||
        } else if (toCoreNotification.hasEntityProfileUpdateMsg()) {
 | 
			
		||||
            processEntityProfileUpdateMsg(toCoreNotification.getEntityProfileUpdateMsg());
 | 
			
		||||
        } else if (toCoreNotification.hasProfileEntityMsg()) {
 | 
			
		||||
            processProfileEntityMsg(toCoreNotification.getProfileEntityMsg());
 | 
			
		||||
        }
 | 
			
		||||
        if (statsEnabled) {
 | 
			
		||||
            stats.log(toCoreNotification);
 | 
			
		||||
@ -530,6 +538,28 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
 | 
			
		||||
        callback.onSuccess();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processEntityProfileUpdateMsg(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg) {
 | 
			
		||||
        var tenantId = toTenantId(profileUpdateMsg.getTenantIdMSB(), profileUpdateMsg.getTenantIdLSB());
 | 
			
		||||
        var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB()));
 | 
			
		||||
        var oldProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getOldProfileIdMSB(), profileUpdateMsg.getOldProfileIdLSB()));
 | 
			
		||||
        var newProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getNewProfileIdMSB(), profileUpdateMsg.getNewProfileIdLSB()));
 | 
			
		||||
        calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId);
 | 
			
		||||
        calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void processProfileEntityMsg(TransportProtos.ProfileEntityMsgProto profileEntityMsg) {
 | 
			
		||||
        var tenantId = toTenantId(profileEntityMsg.getTenantIdMSB(), profileEntityMsg.getTenantIdLSB());
 | 
			
		||||
        var entityId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityType(), new UUID(profileEntityMsg.getEntityIdMSB(), profileEntityMsg.getEntityIdLSB()));
 | 
			
		||||
        var profileId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityProfileType(), new UUID(profileEntityMsg.getProfileIdMSB(), profileEntityMsg.getProfileIdLSB()));
 | 
			
		||||
        boolean added = profileEntityMsg.getAdded();
 | 
			
		||||
        Set<EntityId> entitiesByProfile = calculatedFieldCache.getEntitiesByProfile(tenantId, profileId);
 | 
			
		||||
        if (added) {
 | 
			
		||||
            entitiesByProfile.add(entityId);
 | 
			
		||||
        } else {
 | 
			
		||||
            entitiesByProfile.remove(entityId);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback callback) {
 | 
			
		||||
        if (msg.hasSubEvent()) {
 | 
			
		||||
            TbEntitySubEventProto subEvent = msg.getSubEvent();
 | 
			
		||||
@ -688,12 +718,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
 | 
			
		||||
 | 
			
		||||
    private void forwardToCalculatedFieldService(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg, TbCallback callback) {
 | 
			
		||||
        var tenantId = toTenantId(profileUpdateMsg.getTenantIdMSB(), profileUpdateMsg.getTenantIdLSB());
 | 
			
		||||
        var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB()));
 | 
			
		||||
        var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB()));
 | 
			
		||||
        ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityProfileChangedMsg(profileUpdateMsg, callback));
 | 
			
		||||
        DonAsynchron.withCallback(future,
 | 
			
		||||
                __ -> callback.onSuccess(),
 | 
			
		||||
                t -> {
 | 
			
		||||
                    log.warn("[{}] Failed to process device type updated message for device [{}]", tenantId.getId(), entityId.getId(), t);
 | 
			
		||||
                    log.warn("[{}] Failed to process entity profile updated message for entity [{}]", tenantId.getId(), entityId.getId(), t);
 | 
			
		||||
                    callback.onFailure(t);
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -91,7 +91,7 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdge
 | 
			
		||||
 | 
			
		||||
    public DefaultTbEdgeConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext,
 | 
			
		||||
                                        StatsFactory statsFactory, EdgeContextComponent edgeCtx) {
 | 
			
		||||
        super(actorContext, null, null, null, null, null,
 | 
			
		||||
        super(actorContext, null, null, null, null, null, null,
 | 
			
		||||
                null, null);
 | 
			
		||||
        this.edgeCtx = edgeCtx;
 | 
			
		||||
        this.stats = new EdgeConsumerStats(statsFactory);
 | 
			
		||||
@ -270,8 +270,10 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdge
 | 
			
		||||
                case TENANT_PROFILE -> future = edgeCtx.getTenantProfileProcessor().processEntityNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                case NOTIFICATION_RULE, NOTIFICATION_TARGET, NOTIFICATION_TEMPLATE ->
 | 
			
		||||
                        future = edgeCtx.getNotificationEdgeProcessor().processEntityNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                case TB_RESOURCE -> future = edgeCtx.getResourceProcessor().processEntityNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                case DOMAIN, OAUTH2_CLIENT -> future = edgeCtx.getOAuth2EdgeProcessor().processEntityNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                case TB_RESOURCE ->
 | 
			
		||||
                        future = edgeCtx.getResourceProcessor().processEntityNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                case DOMAIN, OAUTH2_CLIENT ->
 | 
			
		||||
                        future = edgeCtx.getOAuth2EdgeProcessor().processEntityNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                default -> {
 | 
			
		||||
                    future = Futures.immediateFuture(null);
 | 
			
		||||
                    log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type);
 | 
			
		||||
 | 
			
		||||
@ -46,6 +46,7 @@ import org.thingsboard.server.queue.discovery.QueueKey;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
			
		||||
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldCache;
 | 
			
		||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
 | 
			
		||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
 | 
			
		||||
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
 | 
			
		||||
@ -83,8 +84,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
 | 
			
		||||
                                              TbApiUsageStateService apiUsageStateService,
 | 
			
		||||
                                              PartitionService partitionService,
 | 
			
		||||
                                              ApplicationEventPublisher eventPublisher,
 | 
			
		||||
                                              JwtSettingsService jwtSettingsService) {
 | 
			
		||||
        super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService);
 | 
			
		||||
                                              JwtSettingsService jwtSettingsService,
 | 
			
		||||
                                              CalculatedFieldCache calculatedFieldCache) {
 | 
			
		||||
        super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService);
 | 
			
		||||
        this.ctx = ctx;
 | 
			
		||||
        this.tbDeviceRpcService = tbDeviceRpcService;
 | 
			
		||||
        this.queueService = queueService;
 | 
			
		||||
 | 
			
		||||
@ -25,6 +25,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AssetId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AssetProfileId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.CustomerId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
 | 
			
		||||
@ -43,6 +44,7 @@ import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
 | 
			
		||||
import org.thingsboard.server.queue.util.AfterStartUp;
 | 
			
		||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
 | 
			
		||||
import org.thingsboard.server.service.cf.CalculatedFieldCache;
 | 
			
		||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
 | 
			
		||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
 | 
			
		||||
import org.thingsboard.server.service.queue.TbPackCallback;
 | 
			
		||||
@ -68,6 +70,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
 | 
			
		||||
    protected final TbTenantProfileCache tenantProfileCache;
 | 
			
		||||
    protected final TbDeviceProfileCache deviceProfileCache;
 | 
			
		||||
    protected final TbAssetProfileCache assetProfileCache;
 | 
			
		||||
    protected final CalculatedFieldCache calculatedFieldCache;
 | 
			
		||||
    protected final TbApiUsageStateService apiUsageStateService;
 | 
			
		||||
    protected final PartitionService partitionService;
 | 
			
		||||
    protected final ApplicationEventPublisher eventPublisher;
 | 
			
		||||
@ -189,6 +192,12 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
 | 
			
		||||
            if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) {
 | 
			
		||||
                apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId());
 | 
			
		||||
            }
 | 
			
		||||
        } else if (EntityType.CALCULATED_FIELD.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
 | 
			
		||||
            if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.CREATED) {
 | 
			
		||||
                calculatedFieldCache.updateCalculatedFieldLinks(componentLifecycleMsg.getTenantId(), (CalculatedFieldId) componentLifecycleMsg.getEntityId());
 | 
			
		||||
            } else {
 | 
			
		||||
                calculatedFieldCache.evict((CalculatedFieldId) componentLifecycleMsg.getEntityId());
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        eventPublisher.publishEvent(componentLifecycleMsg);
 | 
			
		||||
 | 
			
		||||
@ -1620,6 +1620,8 @@ message ToCoreNotificationMsg {
 | 
			
		||||
  FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12 [deprecated = true];
 | 
			
		||||
  ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13;
 | 
			
		||||
  RestApiCallResponseMsgProto restApiCallResponseMsg = 50;
 | 
			
		||||
  EntityProfileUpdateMsgProto entityProfileUpdateMsg = 51;
 | 
			
		||||
  ProfileEntityMsgProto profileEntityMsg = 52;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* Messages to Edge queue that are handled by ThingsBoard Core Service */
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										45
									
								
								docker/docker-compose.cluster.yml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										45
									
								
								docker/docker-compose.cluster.yml
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,45 @@
 | 
			
		||||
#
 | 
			
		||||
# Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
#
 | 
			
		||||
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
# you may not use this file except in compliance with the License.
 | 
			
		||||
# You may obtain a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
# Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
# See the License for the specific language governing permissions and
 | 
			
		||||
# limitations under the License.
 | 
			
		||||
#
 | 
			
		||||
 | 
			
		||||
version: '3.0'
 | 
			
		||||
 | 
			
		||||
services:
 | 
			
		||||
  kafka:
 | 
			
		||||
    restart: always
 | 
			
		||||
    image: "bitnami/kafka:3.7.0"
 | 
			
		||||
    ports:
 | 
			
		||||
      - "9092:9092"
 | 
			
		||||
    env_file:
 | 
			
		||||
      - kafka.env
 | 
			
		||||
    depends_on:
 | 
			
		||||
      - zookeeper
 | 
			
		||||
  zookeeper:
 | 
			
		||||
    restart: always
 | 
			
		||||
    image: "zookeeper:3.8.0"
 | 
			
		||||
    ports:
 | 
			
		||||
      - "2181"
 | 
			
		||||
    environment:
 | 
			
		||||
      ZOO_MY_ID: 1
 | 
			
		||||
      ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181
 | 
			
		||||
      ZOO_ADMINSERVER_ENABLED: "false"
 | 
			
		||||
  redis:
 | 
			
		||||
    restart: always
 | 
			
		||||
    image: bitnami/redis:7.2
 | 
			
		||||
    environment:
 | 
			
		||||
      # ALLOW_EMPTY_PASSWORD is recommended only for development.
 | 
			
		||||
      ALLOW_EMPTY_PASSWORD: "yes"
 | 
			
		||||
    ports:
 | 
			
		||||
      - '6379:6379'
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user