From 0e1cd69e34645ce1acd3401d85d45ef5f6fe43a2 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Mon, 27 Jan 2025 15:55:40 +0200 Subject: [PATCH 1/2] updated cf consumer --- .../service/cf/CalculatedFieldCache.java | 4 - .../cf/CalculatedFieldExecutionService.java | 8 +- .../cf/DefaultCalculatedFieldCache.java | 32 ---- ...efaultCalculatedFieldExecutionService.java | 143 ++++++++++-------- .../entitiy/EntityStateSourcingListener.java | 22 ++- ...faultTbCalculatedFieldConsumerService.java | 63 ++++---- .../queue/DefaultTbClusterService.java | 47 +++++- .../processing/AbstractConsumerService.java | 4 - .../server/cluster/TbClusterService.java | 5 + .../server/common/util/ProtoUtils.java | 13 +- common/proto/src/main/proto/queue.proto | 2 + .../provider/TbCoreQueueProducerProvider.java | 3 +- 12 files changed, 188 insertions(+), 158 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java index 1ee1d4d562..ff3bda5da5 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java @@ -41,10 +41,6 @@ public interface CalculatedFieldCache { Set getEntitiesByProfile(TenantId tenantId, EntityId entityId); - void evictProfile(TenantId tenantId, EntityId entityId); - - void evictEntity(TenantId tenantId, EntityId entityId); - void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 62e234943b..7668acba4e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; @@ -30,6 +31,7 @@ public interface CalculatedFieldExecutionService { /** * Filter CFs based on the request entity. Push to the queue if any matching CF exist; + * * @param request - telemetry save request; * @param request - telemetry save result; */ @@ -37,6 +39,8 @@ public interface CalculatedFieldExecutionService { void pushRequestToQueue(AttributesSaveRequest request, List result); + void pushCalculatedFieldLifecycleMsgToQueue(CalculatedField calculatedField, ComponentLifecycleMsgProto proto); + void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback); void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback); @@ -47,10 +51,6 @@ public interface CalculatedFieldExecutionService { void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto proto, TbCallback callback); - void onTelemetryUpdate(CalculatedFieldTelemetryMsgProto proto, TbCallback callback); - - void onTelemetryUpdate(CalculatedFieldLinkedTelemetryMsgProto proto, TbCallback callback); - void onEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 4278e74845..7e841a0cf8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -22,16 +22,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.script.api.tbel.TbelInvokeService; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; -import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -177,33 +172,6 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { return entities; } - @Override - public void evictProfile(TenantId tenantId, EntityId entityId) { - log.debug("[{}] evict entity profile from cache.", entityId); - profileEntities.remove(entityId); - } - - @Override - public void evictEntity(TenantId tenantId, EntityId entityId) { - calculatedFieldFetchLock.lock(); - try { - profileEntities.forEach((profile, entityIds) -> entityIds.remove(entityId)); - if (EntityType.ASSET.equals(entityId.getEntityType())) { - Asset asset = assetService.findAssetById(tenantId, (AssetId) entityId); - if (asset != null) { - profileEntities.computeIfAbsent(asset.getAssetProfileId(), profileId -> new HashSet<>()).add(entityId); - } - } else { - Device device = deviceService.findDeviceById(tenantId, (DeviceId) entityId); - if (device != null) { - profileEntities.computeIfAbsent(device.getDeviceProfileId(), profileId -> new HashSet<>()).add(entityId); - } - } - } finally { - calculatedFieldFetchLock.unlock(); - } - } - @Override public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { calculatedFieldFetchLock.lock(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index f92b9819ed..2a4d42ddea 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -66,7 +66,6 @@ import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.util.ProtoUtils; @@ -120,6 +119,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.SCOPE; +import static org.thingsboard.server.common.util.ProtoUtils.toTsKvProto; +import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY; @Service @Slf4j @@ -242,14 +243,59 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas @Override public void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { + try { + CalculatedFieldTelemetryUpdateRequest request = fromProto(msg); + EntityId entityId = request.getEntityId(); - callback.onSuccess(); + if (supportedReferencedEntities.contains(entityId.getEntityType())) { + TenantId tenantId = request.getTenantId(); + TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); + + if (tpi.isMyPartition()) { + + processCalculatedFields(request, entityId); + processCalculatedFields(request, getProfileId(tenantId, entityId)); + + Map> tpiStatesToUpdate = new HashMap<>(); + processCalculatedFieldLinks(request, tpiStatesToUpdate); + if (!tpiStatesToUpdate.isEmpty()) { + tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> { + CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsgProto = buildLinkedTelemetryMsgProto(msg, ctxIds); + clusterService.pushMsgToCalculatedFields(topicPartitionInfo, UUID.randomUUID(), ToCalculatedFieldMsg.newBuilder().setLinkedTelemetryMsg(linkedTelemetryMsgProto).build(), null); + }); + } + } else { + clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), ToCalculatedFieldMsg.newBuilder().setTelemetryMsg(msg).build(), null); + } + } + } catch (Exception e) { + log.trace("Failed to update telemetry.", e); + } } @Override public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) { + try { + CalculatedFieldTelemetryUpdateRequest request = fromProto(linkedMsg.getMsg()); - callback.onSuccess(); + if (linkedMsg.getLinksList().isEmpty()) { + onTelemetryMsg(linkedMsg.getMsg(), callback); + return; + } + + linkedMsg.getLinksList().forEach(ctxIdProto -> { + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB())); + CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId); + + Map updatedTelemetry = request.getMappedTelemetry(ctx, request.getEntityId()); + if (!updatedTelemetry.isEmpty()) { + EntityId targetEntityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB())); + executeTelemetryUpdate(ctx, targetEntityId, request.getPreviousCalculatedFieldIds(), updatedTelemetry); + } + }); + } catch (Exception e) { + log.trace("Failed to process telemetry update msg: [{}]", linkedMsg, e); + } } @Override @@ -263,7 +309,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Consumer resolvePartition = entityId -> { TopicPartitionInfo tpi; try { - tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, cf.getTenantId(), entityId); + tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId()))) { tpiTargetEntityMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(new CalculatedFieldEntityCtxId(cf.getId(), entityId)); } @@ -378,6 +424,26 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + @Override + public void pushCalculatedFieldLifecycleMsgToQueue(CalculatedField calculatedField, ComponentLifecycleMsgProto proto) { + EntityId entityId = calculatedField.getEntityId(); + ToCalculatedFieldMsg msg = ToCalculatedFieldMsg.newBuilder().setComponentLifecycleMsg(proto).build(); + switch (entityId.getEntityType()) { + case ASSET, DEVICE -> { + TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); + clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, null); + } + case ASSET_PROFILE, DEVICE_PROFILE -> { + Set tpiSet = calculatedFieldCache.getEntitiesByProfile(calculatedField.getTenantId(), entityId).stream() + .map(targetEntityId -> partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, targetEntityId)) + .collect(Collectors.toSet()); + tpiSet.forEach(tpi -> clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, null)); + } + default -> throw new IllegalArgumentException("Entity type '" + calculatedField.getId().getEntityType() + + "' does not support calculated fields."); + } + } + private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) { CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getId()); boolean shouldReinit = true; @@ -418,38 +484,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return entityIdChanged || typeChanged || argumentsChanged; } - @Override - public void onTelemetryUpdate(CalculatedFieldTelemetryMsgProto proto, TbCallback callback) { - try { - CalculatedFieldTelemetryUpdateRequest request = fromProto(proto); - EntityId entityId = request.getEntityId(); - - if (supportedReferencedEntities.contains(entityId.getEntityType())) { - TenantId tenantId = request.getTenantId(); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); - - if (tpi.isMyPartition()) { - - processCalculatedFields(request, entityId); - processCalculatedFields(request, getProfileId(tenantId, entityId)); - - Map> tpiStatesToUpdate = new HashMap<>(); - processCalculatedFieldLinks(request, tpiStatesToUpdate); - if (!tpiStatesToUpdate.isEmpty()) { - tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> { - CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsgProto = buildLinkedTelemetryMsgProto(proto, ctxIds); - clusterService.pushMsgToCalculatedFields(topicPartitionInfo, UUID.randomUUID(), ToCalculatedFieldMsg.newBuilder().setLinkedTelemetryMsg(linkedTelemetryMsgProto).build(), null); - }); - } - } else { - clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), ToCalculatedFieldMsg.newBuilder().setTelemetryMsg(proto).build(), null); - } - } - } catch (Exception e) { - log.trace("Failed to update telemetry.", e); - } - } - private void processCalculatedFields(CalculatedFieldTelemetryUpdateRequest request, EntityId cfTargetEntityId) { if (cfTargetEntityId != null) { calculatedFieldCache.getCalculatedFieldCtxsByEntityId(cfTargetEntityId).forEach(ctx -> { @@ -483,7 +517,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } private void processCalculatedFieldLink(CalculatedFieldTelemetryUpdateRequest request, EntityId targetEntity, CalculatedFieldCtx ctx, Map> tpiStates) { - TopicPartitionInfo targetEntityTpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, request.getTenantId(), targetEntity); + TopicPartitionInfo targetEntityTpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, targetEntity); if (targetEntityTpi.isMyPartition()) { Map updatedTelemetry = request.getMappedTelemetry(ctx, request.getEntityId()); if (!updatedTelemetry.isEmpty()) { @@ -495,31 +529,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - @Override - public void onTelemetryUpdate(CalculatedFieldLinkedTelemetryMsgProto proto, TbCallback callback) { - try { - CalculatedFieldTelemetryUpdateRequest request = fromProto(proto.getMsg()); - - if (proto.getLinksList().isEmpty()) { - onTelemetryUpdate(proto, callback); - return; - } - - proto.getLinksList().forEach(ctxIdProto -> { - CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB())); - CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId); - - Map updatedTelemetry = request.getMappedTelemetry(ctx, request.getEntityId()); - if (!updatedTelemetry.isEmpty()) { - EntityId targetEntityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB())); - executeTelemetryUpdate(ctx, targetEntityId, request.getPreviousCalculatedFieldIds(), updatedTelemetry); - } - }); - } catch (Exception e) { - log.trace("Failed to process telemetry update msg: [{}]", proto, e); - } - } - private void executeTelemetryUpdate(CalculatedFieldCtx cfCtx, EntityId entityId, List previousCalculatedFieldIds, Map updatedTelemetry) { log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", cfCtx.getTenantId(), entityId, cfCtx.getCfId()); Map argumentValues = updatedTelemetry.entrySet().stream() @@ -534,7 +543,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); + TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); if (tpi.isMyPartition()) { log.info("Received CalculatedFieldEntityUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); if (proto.getDeleted()) { @@ -824,7 +833,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas List versions = result.getVersions(); for (int i = 0; i < entries.size(); i++) { long tsVersion = versions.get(i); - TsKvProto tsProto = ProtoUtils.toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); + TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); telemetryMsg.addTsData(tsProto); } msg.setTelemetryMsg(telemetryMsg.build()); @@ -858,8 +867,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits()); telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); - for (CalculatedFieldId cfId : calculatedFieldIds) { - telemetryMsg.addPreviousCalculatedFields(toProto(cfId)); + if (calculatedFieldIds != null) { + for (CalculatedFieldId cfId : calculatedFieldIds) { + telemetryMsg.addPreviousCalculatedFields(toProto(cfId)); + } } return telemetryMsg; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index 4703ed1606..95b340c362 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.DeviceId; @@ -87,7 +88,7 @@ public class EntityStateSourcingListener { case ASSET -> { onAssetUpdate(event.getEntity(), event.getOldEntity()); } - case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE, CALCULATED_FIELD -> { + case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE -> { tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, lifecycleEvent); } case RULE_CHAIN -> { @@ -122,6 +123,9 @@ public class EntityStateSourcingListener { ApiUsageState apiUsageState = (ApiUsageState) event.getEntity(); tbClusterService.onApiStateChange(apiUsageState, null); } + case CALCULATED_FIELD -> { + onCalculatedFieldUpdate(event.getEntity(), event.getOldEntity(), lifecycleEvent); + } default -> { } } @@ -146,7 +150,7 @@ public class EntityStateSourcingListener { Asset asset = (Asset) event.getEntity(); tbClusterService.onAssetDeleted(tenantId, asset, null); } - case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE, CALCULATED_FIELD -> { + case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> { tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED); } case NOTIFICATION_REQUEST -> { @@ -187,6 +191,11 @@ public class EntityStateSourcingListener { TbResourceInfo tbResource = (TbResourceInfo) event.getEntity(); tbClusterService.onResourceDeleted(tbResource, null); } + case CALCULATED_FIELD -> { + CalculatedField calculatedField = (CalculatedField) event.getEntity(); + ComponentLifecycleMsg lifecycleMsg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED); + tbClusterService.onCalculatedFieldDeleted(calculatedField.getTenantId(), calculatedField, lifecycleMsg); + } default -> { } } @@ -267,6 +276,15 @@ public class EntityStateSourcingListener { } } + private void onCalculatedFieldUpdate(Object entity, Object oldEntity, ComponentLifecycleEvent lifecycleEvent) { + CalculatedField calculatedField = (CalculatedField) entity; + CalculatedField oldCalculatedField = null; + if (oldEntity instanceof CalculatedField) { + oldCalculatedField = (CalculatedField) oldEntity; + } + tbClusterService.onCalculatedFieldUpdated(calculatedField, oldCalculatedField, new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), lifecycleEvent)); + } + private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) { String data = JacksonUtil.toString(JacksonUtil.valueToTree(assignedDevice)); if (data != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index e550a1154e..bd40784e8c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -21,8 +21,6 @@ import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Data; -import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; @@ -34,19 +32,16 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.QueueConfig; -import org.thingsboard.server.common.msg.MsgType; -import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -170,6 +165,12 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer } else if (toCfMsg.hasLinkedTelemetryMsg()) { log.trace("[{}] Forwarding linked telemetry message for processing {}", id, toCfMsg.getLinkedTelemetryMsg()); forwardToCalculatedFieldService(toCfMsg.getLinkedTelemetryMsg(), callback); + } else if (toCfMsg.hasComponentLifecycleMsg()) { + log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfMsg.getComponentLifecycleMsg()); + forwardToCalculatedFieldService(toCfMsg.getComponentLifecycleMsg(), callback); + } else if (toCfMsg.hasEntityUpdateMsg()) { + log.trace("[{}] Forwarding entity update message for processing {}", id, toCfMsg.getEntityUpdateMsg()); + forwardToCalculatedFieldService(toCfMsg.getEntityUpdateMsg(), callback); } } catch (Throwable e) { log.warn("[{}] Failed to process message: {}", id, msg, e); @@ -219,36 +220,13 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer protected void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) { ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue(); if (toCfNotification.hasComponentLifecycle()) { - forwardToCalculatedFieldService(toCfNotification.getComponentLifecycle(), callback); + handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCfNotification.getComponentLifecycle())); } else if (toCfNotification.hasEntityUpdateMsg()) { - forwardToCalculatedFieldService(toCfNotification.getEntityUpdateMsg(), callback); + processEntityUpdateMsg(toCfNotification.getEntityUpdateMsg()); } callback.onSuccess(); } - // private void processEntityProfileUpdateMsg(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg) { -// var tenantId = toTenantId(profileUpdateMsg.getTenantIdMSB(), profileUpdateMsg.getTenantIdLSB()); -// var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB())); -// var oldProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getOldProfileIdMSB(), profileUpdateMsg.getOldProfileIdLSB())); -// var newProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getNewProfileIdMSB(), profileUpdateMsg.getNewProfileIdLSB())); -// calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId); -// calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId); -// } -// -// private void processProfileEntityMsg(TransportProtos.ProfileEntityMsgProto profileEntityMsg) { -// var tenantId = toTenantId(profileEntityMsg.getTenantIdMSB(), profileEntityMsg.getTenantIdLSB()); -// var entityId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityType(), new UUID(profileEntityMsg.getEntityIdMSB(), profileEntityMsg.getEntityIdLSB())); -// var profileId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityProfileType(), new UUID(profileEntityMsg.getProfileIdMSB(), profileEntityMsg.getProfileIdLSB())); -// boolean added = profileEntityMsg.getAdded(); -// Set entitiesByProfile = calculatedFieldCache.getEntitiesByProfile(tenantId, profileId); -// if (added) { -// entitiesByProfile.add(entityId); -// } else { -// entitiesByProfile.remove(entityId); -// } -// } -// - private void forwardToCalculatedFieldService(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) { var msg = linkedMsg.getMsg(); var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); @@ -275,7 +253,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer }); } - private void forwardToCalculatedFieldService(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) { + private void forwardToCalculatedFieldService(ComponentLifecycleMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldLifecycleMsg(msg, callback)); @@ -287,7 +265,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer }); } - private void forwardToCalculatedFieldService(TransportProtos.CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) { + private void forwardToCalculatedFieldService(CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityUpdateMsg(msg, callback)); @@ -299,6 +277,23 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer }); } + private void processEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto entityUpdateMsg) { + var tenantId = toTenantId(entityUpdateMsg.getTenantIdMSB(), entityUpdateMsg.getTenantIdLSB()); + var entityId = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityType(), new UUID(entityUpdateMsg.getEntityIdMSB(), entityUpdateMsg.getEntityIdLSB())); + if (entityUpdateMsg.getAdded()) { + var newProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getNewProfileIdMSB(), entityUpdateMsg.getNewProfileIdLSB())); + calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId); + } else if (entityUpdateMsg.getDeleted()) { + var oldProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getOldProfileIdMSB(), entityUpdateMsg.getOldProfileIdLSB())); + calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId); + } else if (entityUpdateMsg.getUpdated()) { + var oldProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getOldProfileIdMSB(), entityUpdateMsg.getOldProfileIdLSB())); + var newProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getNewProfileIdMSB(), entityUpdateMsg.getNewProfileIdLSB())); + calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId); + calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId); + } + } + private void throwNotHandled(Object msg, TbCallback callback) { log.warn("Message not handled: {}", msg); callback.onFailure(new RuntimeException("Message not handled!")); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 04d6a53401..b1a4b1859e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -38,10 +38,12 @@ import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; @@ -68,6 +70,7 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto; @@ -95,6 +98,7 @@ import org.thingsboard.server.queue.common.TbRuleEngineProducerService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; +import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -145,6 +149,10 @@ public class DefaultTbClusterService implements TbClusterService { @Lazy private OtaPackageStateService otaPackageStateService; + @Autowired + @Lazy + private CalculatedFieldExecutionService calculatedFieldExecutionService; + private final TopicService topicService; private final TbDeviceProfileCache deviceProfileCache; private final TbAssetProfileCache assetProfileCache; @@ -342,21 +350,21 @@ public class DefaultTbClusterService implements TbClusterService { public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); producerProvider.getCalculatedFieldsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); - toCoreMsgs.incrementAndGet(); + toRuleEngineMsgs.incrementAndGet(); } @Override public void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { log.trace("PUSHING msg: {} to:{}", msg, tpi); producerProvider.getCalculatedFieldsMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback); - toRuleEngineNfs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS + toRuleEngineMsgs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS } @Override public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); - toCoreMsgs.incrementAndGet(); + toRuleEngineNfs.incrementAndGet(); } @Override @@ -686,6 +694,20 @@ public class DefaultTbClusterService implements TbClusterService { broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); } + @Override + public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, ComponentLifecycleMsg lifecycleMsg) { + var created = oldCalculatedField == null; + calculatedFieldExecutionService.pushCalculatedFieldLifecycleMsgToQueue(calculatedField, toProto(lifecycleMsg)); + broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); + } + + @Override + public void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, ComponentLifecycleMsg lifecycleMsg) { + CalculatedFieldId calculatedFieldId = calculatedField.getId(); + calculatedFieldExecutionService.pushCalculatedFieldLifecycleMsgToQueue(calculatedField, toProto(lifecycleMsg)); + broadcastEntityStateChangeEvent(tenantId, calculatedFieldId, ComponentLifecycleEvent.DELETED); + } + @Override public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId originatorEdgeId) { if (!edgesEnabled) { @@ -827,7 +849,7 @@ public class DefaultTbClusterService implements TbClusterService { } private void handleCalculatedFieldEntityUpdateEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId, boolean added, boolean updated, boolean deleted) { - TransportProtos.CalculatedFieldEntityUpdateMsgProto.Builder builder = TransportProtos.CalculatedFieldEntityUpdateMsgProto.newBuilder(); + CalculatedFieldEntityUpdateMsgProto.Builder builder = CalculatedFieldEntityUpdateMsgProto.newBuilder(); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); builder.setEntityType(entityId.getEntityType().name()); @@ -846,9 +868,22 @@ public class DefaultTbClusterService implements TbClusterService { builder.setAdded(added); builder.setUpdated(updated); builder.setDeleted(deleted); - TransportProtos.CalculatedFieldEntityUpdateMsgProto msg = builder.build(); + CalculatedFieldEntityUpdateMsgProto msg = builder.build(); - pushNotificationToCalculatedFields(tenantId, entityId, ToCalculatedFieldNotificationMsg.newBuilder().setEntityUpdateMsg(msg).build(), null); + broadcastEntityUpdateEvent(msg); + pushMsgToCalculatedFields(tenantId, entityId, ToCalculatedFieldMsg.newBuilder().setEntityUpdateMsg(msg).build(), null); + } + + private void broadcastEntityUpdateEvent(CalculatedFieldEntityUpdateMsgProto proto) { + TbQueueProducer> toCalculatedFieldProducer = producerProvider.getCalculatedFieldsNotificationsMsgProducer(); + Set tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); + Set tbCalculatedFieldServices = new HashSet<>(tbRuleEngineServices); + for (String serviceId : tbCalculatedFieldServices) { + TopicPartitionInfo tpi = topicService.getCalculatedFieldNotificationsTopic(serviceId); + ToCalculatedFieldNotificationMsg toCfNotificationMsg = ToCalculatedFieldNotificationMsg.newBuilder().setEntityUpdateMsg(proto).build(); + toCalculatedFieldProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), toCfNotificationMsg), null); + toRuleEngineNfs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 5b1e5d7d79..dac35bfc5c 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -178,16 +178,12 @@ public abstract class AbstractConsumerService> toUsageStats; private TbQueueProducer> toVersionControl; private TbQueueProducer> toHousekeeper; - private TbQueueProducer> toCalculatedFields; + private TbQueueProducer> toCalculatedFields; private TbQueueProducer> toCalculatedFieldNotifications; public TbCoreQueueProducerProvider(TbCoreQueueFactory tbQueueProvider) { From d54cb300d42757b929bbe94936e6d90b1f5db874 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 28 Jan 2025 08:43:29 +0200 Subject: [PATCH 2/2] added new endpoint --- .../controller/CalculatedFieldController.java | 30 +++++++++++++++++++ .../controller/ControllerConstants.java | 1 + 2 files changed, 31 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java index 4bf243034b..efe25e3883 100644 --- a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java +++ b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java @@ -16,6 +16,7 @@ package org.thingsboard.server.controller; import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Schema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; @@ -24,17 +25,26 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.config.annotations.ApiOperation; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.cf.TbCalculatedFieldService; import org.thingsboard.server.service.security.permission.Operation; +import static org.thingsboard.server.controller.ControllerConstants.CF_TEXT_SEARCH_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.PAGE_DATA_PARAMETERS; +import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH; import static org.thingsboard.server.controller.ControllerConstants.UUID_WIKI_LINK; @@ -81,6 +91,26 @@ public class CalculatedFieldController extends BaseController { return calculatedField; } + @ApiOperation(value = "Get Calculated Fields (getCalculatedFields)", + notes = "Returns a page of calculated fields. " + PAGE_DATA_PARAMETERS + ) + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + @RequestMapping(value = "/calculatedFields", params = {"pageSize", "page"}, method = RequestMethod.GET) + @ResponseBody + public PageData getCalculatedFields( + @Parameter(description = PAGE_SIZE_DESCRIPTION, required = true) + @RequestParam int pageSize, + @Parameter(description = PAGE_NUMBER_DESCRIPTION, required = true) + @RequestParam int page, + @Parameter(description = CF_TEXT_SEARCH_DESCRIPTION) + @RequestParam(required = false) String textSearch, + @Parameter(description = SORT_PROPERTY_DESCRIPTION, schema = @Schema(allowableValues = {"createdTime", "name"})) + @RequestParam(required = false) String sortProperty, + @Parameter(description = SORT_ORDER_DESCRIPTION, schema = @Schema(allowableValues = {"ASC", "DESC"})) + @RequestParam(required = false) String sortOrder) throws ThingsboardException { + PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); + return checkNotNull(calculatedFieldService.findAllCalculatedFields(pageLink)); + } @ApiOperation(value = "Delete Calculated Field (deleteCalculatedField)", notes = "Deletes the calculated field. Referencing non-existing Calculated Field Id will cause an error." + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH) diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index c7a59cfd83..29625941bd 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -96,6 +96,7 @@ public class ControllerConstants { protected static final String EDGE_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the edge name."; protected static final String EVENT_TEXT_SEARCH_DESCRIPTION = "The value is not used in searching."; protected static final String AUDIT_LOG_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on one of the next properties: entityType, entityName, userName, actionType, actionStatus."; + protected static final String CF_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the calculated field name."; protected static final String SORT_PROPERTY_DESCRIPTION = "Property of entity to sort by"; protected static final String SORT_ORDER_DESCRIPTION = "Sort order. ASC (ASCENDING) or DESC (DESCENDING)";