diff --git a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java index 4bf243034b..efe25e3883 100644 --- a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java +++ b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java @@ -16,6 +16,7 @@ package org.thingsboard.server.controller; import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Schema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; @@ -24,17 +25,26 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.config.annotations.ApiOperation; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.cf.TbCalculatedFieldService; import org.thingsboard.server.service.security.permission.Operation; +import static org.thingsboard.server.controller.ControllerConstants.CF_TEXT_SEARCH_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.PAGE_DATA_PARAMETERS; +import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH; import static org.thingsboard.server.controller.ControllerConstants.UUID_WIKI_LINK; @@ -81,6 +91,26 @@ public class CalculatedFieldController extends BaseController { return calculatedField; } + @ApiOperation(value = "Get Calculated Fields (getCalculatedFields)", + notes = "Returns a page of calculated fields. " + PAGE_DATA_PARAMETERS + ) + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + @RequestMapping(value = "/calculatedFields", params = {"pageSize", "page"}, method = RequestMethod.GET) + @ResponseBody + public PageData getCalculatedFields( + @Parameter(description = PAGE_SIZE_DESCRIPTION, required = true) + @RequestParam int pageSize, + @Parameter(description = PAGE_NUMBER_DESCRIPTION, required = true) + @RequestParam int page, + @Parameter(description = CF_TEXT_SEARCH_DESCRIPTION) + @RequestParam(required = false) String textSearch, + @Parameter(description = SORT_PROPERTY_DESCRIPTION, schema = @Schema(allowableValues = {"createdTime", "name"})) + @RequestParam(required = false) String sortProperty, + @Parameter(description = SORT_ORDER_DESCRIPTION, schema = @Schema(allowableValues = {"ASC", "DESC"})) + @RequestParam(required = false) String sortOrder) throws ThingsboardException { + PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); + return checkNotNull(calculatedFieldService.findAllCalculatedFields(pageLink)); + } @ApiOperation(value = "Delete Calculated Field (deleteCalculatedField)", notes = "Deletes the calculated field. Referencing non-existing Calculated Field Id will cause an error." + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH) diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index c7a59cfd83..29625941bd 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -96,6 +96,7 @@ public class ControllerConstants { protected static final String EDGE_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the edge name."; protected static final String EVENT_TEXT_SEARCH_DESCRIPTION = "The value is not used in searching."; protected static final String AUDIT_LOG_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on one of the next properties: entityType, entityName, userName, actionType, actionStatus."; + protected static final String CF_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the calculated field name."; protected static final String SORT_PROPERTY_DESCRIPTION = "Property of entity to sort by"; protected static final String SORT_ORDER_DESCRIPTION = "Sort order. ASC (ASCENDING) or DESC (DESCENDING)"; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java index 1ee1d4d562..ff3bda5da5 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java @@ -41,10 +41,6 @@ public interface CalculatedFieldCache { Set getEntitiesByProfile(TenantId tenantId, EntityId entityId); - void evictProfile(TenantId tenantId, EntityId entityId); - - void evictEntity(TenantId tenantId, EntityId entityId); - void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index d38c1c58ba..806c224608 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -39,7 +40,7 @@ public interface CalculatedFieldExecutionService { /** * Filter CFs based on the request entity. Push to the queue if any matching CF exist; * - * @param request - telemetry save request; + * @param request - telemetry save request; * @param callback */ void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); @@ -48,6 +49,8 @@ public interface CalculatedFieldExecutionService { void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); + void pushCalculatedFieldLifecycleMsgToQueue(CalculatedField calculatedField, ComponentLifecycleMsgProto proto); + ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); // void pushEntityUpdateMsg(TransportProtos.CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index b0c9f6adde..7e841a0cf8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -22,22 +22,15 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.script.api.tbel.TbelInvokeService; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; -import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageDataIterable; -import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.device.DeviceService; @@ -179,33 +172,6 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { return entities; } - @Override - public void evictProfile(TenantId tenantId, EntityId entityId) { - log.debug("[{}] evict entity profile from cache.", entityId); - profileEntities.remove(entityId); - } - - @Override - public void evictEntity(TenantId tenantId, EntityId entityId) { - calculatedFieldFetchLock.lock(); - try { - profileEntities.forEach((profile, entityIds) -> entityIds.remove(entityId)); - if (EntityType.ASSET.equals(entityId.getEntityType())) { - Asset asset = assetService.findAssetById(tenantId, (AssetId) entityId); - if (asset != null) { - profileEntities.computeIfAbsent(asset.getAssetProfileId(), profileId -> new HashSet<>()).add(entityId); - } - } else { - Device device = deviceService.findDeviceById(tenantId, (DeviceId) entityId); - if (device != null) { - profileEntities.computeIfAbsent(device.getDeviceProfileId(), profileId -> new HashSet<>()).add(entityId); - } - } - } finally { - calculatedFieldFetchLock.unlock(); - } - } - @Override public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { calculatedFieldFetchLock.lock(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 3e37366da3..f333eecbd8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -118,6 +118,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.SCOPE; +import static org.thingsboard.server.common.util.ProtoUtils.toTsKvProto; +import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY; @Service @Slf4j @@ -325,6 +327,26 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId)); } + @Override + public void pushCalculatedFieldLifecycleMsgToQueue(CalculatedField calculatedField, ComponentLifecycleMsgProto proto) { + EntityId entityId = calculatedField.getEntityId(); + ToCalculatedFieldMsg msg = ToCalculatedFieldMsg.newBuilder().setComponentLifecycleMsg(proto).build(); + switch (entityId.getEntityType()) { + case ASSET, DEVICE -> { + TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); + clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, null); + } + case ASSET_PROFILE, DEVICE_PROFILE -> { + Set tpiSet = calculatedFieldCache.getEntitiesByProfile(calculatedField.getTenantId(), entityId).stream() + .map(targetEntityId -> partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, targetEntityId)) + .collect(Collectors.toSet()); + tpiSet.forEach(tpi -> clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, null)); + } + default -> throw new IllegalArgumentException("Entity type '" + calculatedField.getId().getEntityType() + + "' does not support calculated fields."); + } + } + @Override public void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto proto, TbCallback callback) { try { @@ -505,7 +527,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas CalculatedFieldTelemetryUpdateRequest request = fromProto(proto.getMsg()); if (proto.getLinksList().isEmpty()) { - onTelemetryUpdate(proto, callback); + onTelemetryUpdate(proto.getMsg(), callback); return; } @@ -803,7 +825,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas List versions = result.getVersions(); for (int i = 0; i < entries.size(); i++) { long tsVersion = versions.get(i); - TsKvProto tsProto = ProtoUtils.toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); + TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); telemetryMsg.addTsData(tsProto); } msg.setTelemetryMsg(telemetryMsg.build()); @@ -837,7 +859,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits()); telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); - if(calculatedFieldIds != null) { + if (calculatedFieldIds != null) { for (CalculatedFieldId cfId : calculatedFieldIds) { telemetryMsg.addPreviousCalculatedFields(toProto(cfId)); } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index 4703ed1606..95b340c362 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.DeviceId; @@ -87,7 +88,7 @@ public class EntityStateSourcingListener { case ASSET -> { onAssetUpdate(event.getEntity(), event.getOldEntity()); } - case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE, CALCULATED_FIELD -> { + case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE -> { tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, lifecycleEvent); } case RULE_CHAIN -> { @@ -122,6 +123,9 @@ public class EntityStateSourcingListener { ApiUsageState apiUsageState = (ApiUsageState) event.getEntity(); tbClusterService.onApiStateChange(apiUsageState, null); } + case CALCULATED_FIELD -> { + onCalculatedFieldUpdate(event.getEntity(), event.getOldEntity(), lifecycleEvent); + } default -> { } } @@ -146,7 +150,7 @@ public class EntityStateSourcingListener { Asset asset = (Asset) event.getEntity(); tbClusterService.onAssetDeleted(tenantId, asset, null); } - case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE, CALCULATED_FIELD -> { + case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> { tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED); } case NOTIFICATION_REQUEST -> { @@ -187,6 +191,11 @@ public class EntityStateSourcingListener { TbResourceInfo tbResource = (TbResourceInfo) event.getEntity(); tbClusterService.onResourceDeleted(tbResource, null); } + case CALCULATED_FIELD -> { + CalculatedField calculatedField = (CalculatedField) event.getEntity(); + ComponentLifecycleMsg lifecycleMsg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED); + tbClusterService.onCalculatedFieldDeleted(calculatedField.getTenantId(), calculatedField, lifecycleMsg); + } default -> { } } @@ -267,6 +276,15 @@ public class EntityStateSourcingListener { } } + private void onCalculatedFieldUpdate(Object entity, Object oldEntity, ComponentLifecycleEvent lifecycleEvent) { + CalculatedField calculatedField = (CalculatedField) entity; + CalculatedField oldCalculatedField = null; + if (oldEntity instanceof CalculatedField) { + oldCalculatedField = (CalculatedField) oldEntity; + } + tbClusterService.onCalculatedFieldUpdated(calculatedField, oldCalculatedField, new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), lifecycleEvent)); + } + private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) { String data = JacksonUtil.toString(JacksonUtil.valueToTree(assignedDevice)); if (data != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index fb1d6b19ea..0ccecbbeca 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -36,10 +36,12 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.queue.TbQueueConsumer; @@ -163,8 +165,15 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer log.trace("[{}] Forwarding regular telemetry message for processing {}", id, toCfMsg.getTelemetryMsg()); forwardToActorSystem(toCfMsg.getTelemetryMsg(), callback); } else if (toCfMsg.hasLinkedTelemetryMsg()) { - log.trace("[{}] Forwarding linked telemetry message for processing {}", id, toCfMsg.getLinkedTelemetryMsg()); forwardToActorSystem(toCfMsg.getLinkedTelemetryMsg(), callback); + } else if (toCfMsg.hasComponentLifecycleMsg()) { + log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfMsg.getComponentLifecycleMsg()); + /// TODO: forward to Actor system + forwardToCalculatedFieldService(toCfMsg.getComponentLifecycleMsg(), callback); + } else if (toCfMsg.hasEntityUpdateMsg()) { + log.trace("[{}] Forwarding entity update message for processing {}", id, toCfMsg.getEntityUpdateMsg()); + /// TODO: forward to Actor system + forwardToCalculatedFieldService(toCfMsg.getEntityUpdateMsg(), callback); } } catch (Throwable e) { log.warn("[{}] Failed to process message: {}", id, msg, e); @@ -214,36 +223,17 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer protected void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) { ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue(); if (toCfNotification.hasComponentLifecycle()) { + // from upstream (maybe removed since we dont need to init state for each partition) forwardToActorSystem(toCfNotification.getComponentLifecycle(), callback); + handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCfNotification.getComponentLifecycle())); } else if (toCfNotification.hasEntityUpdateMsg()) { + processEntityUpdateMsg(toCfNotification.getEntityUpdateMsg()); + // from upstream (maybe removed since we dont need to update state for each partition) forwardToActorSystem(toCfNotification.getEntityUpdateMsg(), callback); } callback.onSuccess(); } - // private void processEntityProfileUpdateMsg(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg) { -// var tenantId = toTenantId(profileUpdateMsg.getTenantIdMSB(), profileUpdateMsg.getTenantIdLSB()); -// var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB())); -// var oldProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getOldProfileIdMSB(), profileUpdateMsg.getOldProfileIdLSB())); -// var newProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getNewProfileIdMSB(), profileUpdateMsg.getNewProfileIdLSB())); -// calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId); -// calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId); -// } -// -// private void processProfileEntityMsg(TransportProtos.ProfileEntityMsgProto profileEntityMsg) { -// var tenantId = toTenantId(profileEntityMsg.getTenantIdMSB(), profileEntityMsg.getTenantIdLSB()); -// var entityId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityType(), new UUID(profileEntityMsg.getEntityIdMSB(), profileEntityMsg.getEntityIdLSB())); -// var profileId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityProfileType(), new UUID(profileEntityMsg.getProfileIdMSB(), profileEntityMsg.getProfileIdLSB())); -// boolean added = profileEntityMsg.getAdded(); -// Set entitiesByProfile = calculatedFieldCache.getEntitiesByProfile(tenantId, profileId); -// if (added) { -// entitiesByProfile.add(entityId); -// } else { -// entitiesByProfile.remove(entityId); -// } -// } -// - private void forwardToActorSystem(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); @@ -257,7 +247,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer actorContext.tell(new CalculatedFieldLinkedTelemetryMsg(tenantId, entityId, linkedMsg, callback)); } - private void forwardToActorSystem(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) { + private void forwardToCalculatedFieldService(ComponentLifecycleMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldLifecycleMsg(msg, callback)); @@ -269,7 +259,19 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer }); } - private void forwardToActorSystem(TransportProtos.CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) { + private void forwardToActorSystem(ComponentLifecycleMsgProto msg, TbCallback callback) { + var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); + var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); + ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldLifecycleMsg(msg, callback)); + DonAsynchron.withCallback(future, + __ -> callback.onSuccess(), + t -> { + log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t); + callback.onFailure(t); + }); + } + + private void forwardToCalculatedFieldService(CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityUpdateMsg(msg, callback)); @@ -281,6 +283,35 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer }); } + private void forwardToActorSystem(CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) { + var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); + var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); + ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityUpdateMsg(msg, callback)); + DonAsynchron.withCallback(future, + __ -> callback.onSuccess(), + t -> { + log.warn("[{}] Failed to process entity updated message for entity [{}]", tenantId.getId(), entityId.getId(), t); + callback.onFailure(t); + }); + } + + private void processEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto entityUpdateMsg) { + var tenantId = toTenantId(entityUpdateMsg.getTenantIdMSB(), entityUpdateMsg.getTenantIdLSB()); + var entityId = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityType(), new UUID(entityUpdateMsg.getEntityIdMSB(), entityUpdateMsg.getEntityIdLSB())); + if (entityUpdateMsg.getAdded()) { + var newProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getNewProfileIdMSB(), entityUpdateMsg.getNewProfileIdLSB())); + calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId); + } else if (entityUpdateMsg.getDeleted()) { + var oldProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getOldProfileIdMSB(), entityUpdateMsg.getOldProfileIdLSB())); + calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId); + } else if (entityUpdateMsg.getUpdated()) { + var oldProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getOldProfileIdMSB(), entityUpdateMsg.getOldProfileIdLSB())); + var newProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getNewProfileIdMSB(), entityUpdateMsg.getNewProfileIdLSB())); + calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId); + calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId); + } + } + private void throwNotHandled(Object msg, TbCallback callback) { log.warn("Message not handled: {}", msg); callback.onFailure(new RuntimeException("Message not handled!")); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 04d6a53401..b1a4b1859e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -38,10 +38,12 @@ import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; @@ -68,6 +70,7 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto; @@ -95,6 +98,7 @@ import org.thingsboard.server.queue.common.TbRuleEngineProducerService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; +import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -145,6 +149,10 @@ public class DefaultTbClusterService implements TbClusterService { @Lazy private OtaPackageStateService otaPackageStateService; + @Autowired + @Lazy + private CalculatedFieldExecutionService calculatedFieldExecutionService; + private final TopicService topicService; private final TbDeviceProfileCache deviceProfileCache; private final TbAssetProfileCache assetProfileCache; @@ -342,21 +350,21 @@ public class DefaultTbClusterService implements TbClusterService { public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); producerProvider.getCalculatedFieldsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); - toCoreMsgs.incrementAndGet(); + toRuleEngineMsgs.incrementAndGet(); } @Override public void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { log.trace("PUSHING msg: {} to:{}", msg, tpi); producerProvider.getCalculatedFieldsMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback); - toRuleEngineNfs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS + toRuleEngineMsgs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS } @Override public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); - toCoreMsgs.incrementAndGet(); + toRuleEngineNfs.incrementAndGet(); } @Override @@ -686,6 +694,20 @@ public class DefaultTbClusterService implements TbClusterService { broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); } + @Override + public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, ComponentLifecycleMsg lifecycleMsg) { + var created = oldCalculatedField == null; + calculatedFieldExecutionService.pushCalculatedFieldLifecycleMsgToQueue(calculatedField, toProto(lifecycleMsg)); + broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); + } + + @Override + public void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, ComponentLifecycleMsg lifecycleMsg) { + CalculatedFieldId calculatedFieldId = calculatedField.getId(); + calculatedFieldExecutionService.pushCalculatedFieldLifecycleMsgToQueue(calculatedField, toProto(lifecycleMsg)); + broadcastEntityStateChangeEvent(tenantId, calculatedFieldId, ComponentLifecycleEvent.DELETED); + } + @Override public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId originatorEdgeId) { if (!edgesEnabled) { @@ -827,7 +849,7 @@ public class DefaultTbClusterService implements TbClusterService { } private void handleCalculatedFieldEntityUpdateEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId, boolean added, boolean updated, boolean deleted) { - TransportProtos.CalculatedFieldEntityUpdateMsgProto.Builder builder = TransportProtos.CalculatedFieldEntityUpdateMsgProto.newBuilder(); + CalculatedFieldEntityUpdateMsgProto.Builder builder = CalculatedFieldEntityUpdateMsgProto.newBuilder(); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); builder.setEntityType(entityId.getEntityType().name()); @@ -846,9 +868,22 @@ public class DefaultTbClusterService implements TbClusterService { builder.setAdded(added); builder.setUpdated(updated); builder.setDeleted(deleted); - TransportProtos.CalculatedFieldEntityUpdateMsgProto msg = builder.build(); + CalculatedFieldEntityUpdateMsgProto msg = builder.build(); - pushNotificationToCalculatedFields(tenantId, entityId, ToCalculatedFieldNotificationMsg.newBuilder().setEntityUpdateMsg(msg).build(), null); + broadcastEntityUpdateEvent(msg); + pushMsgToCalculatedFields(tenantId, entityId, ToCalculatedFieldMsg.newBuilder().setEntityUpdateMsg(msg).build(), null); + } + + private void broadcastEntityUpdateEvent(CalculatedFieldEntityUpdateMsgProto proto) { + TbQueueProducer> toCalculatedFieldProducer = producerProvider.getCalculatedFieldsNotificationsMsgProducer(); + Set tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); + Set tbCalculatedFieldServices = new HashSet<>(tbRuleEngineServices); + for (String serviceId : tbCalculatedFieldServices) { + TopicPartitionInfo tpi = topicService.getCalculatedFieldNotificationsTopic(serviceId); + ToCalculatedFieldNotificationMsg toCfNotificationMsg = ToCalculatedFieldNotificationMsg.newBuilder().setEntityUpdateMsg(proto).build(); + toCalculatedFieldProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), toCfNotificationMsg), null); + toRuleEngineNfs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 5b1e5d7d79..dac35bfc5c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -178,16 +178,12 @@ public abstract class AbstractConsumerService> toUsageStats; private TbQueueProducer> toVersionControl; private TbQueueProducer> toHousekeeper; - private TbQueueProducer> toCalculatedFields; + private TbQueueProducer> toCalculatedFields; private TbQueueProducer> toCalculatedFieldNotifications; public TbCoreQueueProducerProvider(TbCoreQueueFactory tbQueueProvider) {