From a652b31d7f880b1c13734c26c5f331e971a524e5 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Fri, 24 Jan 2025 15:41:00 +0200 Subject: [PATCH] implementation processing notification --- .../service/cf/CalculatedFieldCache.java | 5 +- .../cf/CalculatedFieldExecutionService.java | 7 +- .../cf/DefaultCalculatedFieldCache.java | 32 ++++ ...efaultCalculatedFieldExecutionService.java | 164 +++++++++++------- .../entitiy/EntityStateSourcingListener.java | 21 +-- .../DefaultSystemDataLoaderService.java | 7 +- ...faultTbCalculatedFieldConsumerService.java | 2 +- .../queue/DefaultTbClusterService.java | 78 ++++----- .../queue/DefaultTbCoreConsumerService.java | 41 ----- .../processing/AbstractConsumerService.java | 4 + .../TbRuleEngineQueueConsumerManager.java | 14 +- .../DefaultTelemetrySubscriptionService.java | 2 - .../server/cluster/TbClusterService.java | 8 +- .../server/common/util/ProtoUtils.java | 15 ++ common/proto/src/main/proto/queue.proto | 4 - .../server/dao/service/EntityServiceTest.java | 5 +- 16 files changed, 213 insertions(+), 196 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java index ea55894432..1ee1d4d562 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldCache.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.cf; -import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.id.CalculatedFieldId; @@ -42,6 +41,10 @@ public interface CalculatedFieldCache { Set getEntitiesByProfile(TenantId tenantId, EntityId entityId); + void evictProfile(TenantId tenantId, EntityId entityId); + + void evictEntity(TenantId tenantId, EntityId entityId); + void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 93a0ecc75f..836af03e5a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -20,8 +20,9 @@ import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; -import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest; import java.util.List; @@ -42,9 +43,9 @@ public interface CalculatedFieldExecutionService { void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto proto, TbCallback callback); - void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest calculatedFieldTelemetryUpdateRequest); + void onTelemetryUpdate(CalculatedFieldTelemetryMsgProto proto, TbCallback callback); -// void onTelemetryUpdateMsg(TransportProtos.TelemetryUpdateMsgProto proto); + void onTelemetryUpdate(CalculatedFieldLinkedTelemetryMsgProto proto, TbCallback callback); void onEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 7e841a0cf8..4278e74845 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -22,11 +22,16 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; +import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -172,6 +177,33 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { return entities; } + @Override + public void evictProfile(TenantId tenantId, EntityId entityId) { + log.debug("[{}] evict entity profile from cache.", entityId); + profileEntities.remove(entityId); + } + + @Override + public void evictEntity(TenantId tenantId, EntityId entityId) { + calculatedFieldFetchLock.lock(); + try { + profileEntities.forEach((profile, entityIds) -> entityIds.remove(entityId)); + if (EntityType.ASSET.equals(entityId.getEntityType())) { + Asset asset = assetService.findAssetById(tenantId, (AssetId) entityId); + if (asset != null) { + profileEntities.computeIfAbsent(asset.getAssetProfileId(), profileId -> new HashSet<>()).add(entityId); + } + } else { + Device device = deviceService.findDeviceById(tenantId, (DeviceId) entityId); + if (device != null) { + profileEntities.computeIfAbsent(device.getDeviceProfileId(), profileId -> new HashSet<>()).add(entityId); + } + } + } finally { + calculatedFieldFetchLock.unlock(); + } + } + @Override public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { calculatedFieldFetchLock.lock(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index d46a7515ef..beb347d29f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -36,6 +36,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; @@ -72,14 +73,17 @@ import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; -import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleEvent; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx; @@ -92,7 +96,9 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; +import org.thingsboard.server.service.cf.telemetry.CalculatedFieldAttributeUpdateRequest; import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest; +import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTimeSeriesUpdateRequest; import org.thingsboard.server.service.partition.AbstractPartitionBasedService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; @@ -334,7 +340,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas initializeStateForEntity(calculatedFieldCtx, entityId, callback); } case ASSET_PROFILE, DEVICE_PROFILE -> { - log.info("Initializing state for all entities in profile: tenantICalculatedFieldMsgProtod=[{}], profileId=[{}]", tenantId, entityId); + log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId); Map commonArguments = calculatedFieldCtx.getArguments().entrySet().stream() .filter(entry -> entry.getValue().getRefEntityId() != null) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); @@ -401,8 +407,9 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest request) { + public void onTelemetryUpdate(CalculatedFieldTelemetryMsgProto proto, TbCallback callback) { try { + CalculatedFieldTelemetryUpdateRequest request = fromProto(proto); EntityId entityId = request.getEntityId(); if (supportedReferencedEntities.contains(entityId.getEntityType())) { @@ -418,15 +425,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas processCalculatedFieldLinks(request, tpiStatesToUpdate); if (!tpiStatesToUpdate.isEmpty()) { tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> { - TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(request, ctxIds); - clusterService.pushMsgToRuleEngine(topicPartitionInfo, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder() - .setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null); + CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsgProto = buildLinkedTelemetryMsgProto(proto, ctxIds); + clusterService.pushMsgToCalculatedFields(topicPartitionInfo, UUID.randomUUID(), ToCalculatedFieldMsg.newBuilder().setLinkedTelemetryMsg(linkedTelemetryMsgProto).build(), null); }); } } else { - TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(request); - clusterService.pushMsgToRuleEngine(tpi, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder() - .setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null); + clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), ToCalculatedFieldMsg.newBuilder().setTelemetryMsg(proto).build(), null); } } } catch (Exception e) { @@ -479,30 +483,30 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } -// @Override -// public void onTelemetryUpdateMsg(TransportProtos.TelemetryUpdateMsgProto proto) { -// try { -// CalculatedFieldTelemetryUpdateRequest request = fromProto(proto); -// -// if (proto.getLinksList().isEmpty()) { -// onTelemetryUpdate(request); -// return; -// } -// -// proto.getLinksList().forEach(ctxIdProto -> { -// CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB())); -// CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService); -// -// Map updatedTelemetry = request.getMappedTelemetry(ctx, request.getEntityId()); -// if (!updatedTelemetry.isEmpty()) { -// EntityId targetEntityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB())); -// executeTelemetryUpdate(ctx, targetEntityId, request.getPreviousCalculatedFieldIds(), updatedTelemetry); -// } -// }); -// } catch (Exception e) { -// log.trace("Failed to process telemetry update msg: [{}]", proto, e); -// } -// } + @Override + public void onTelemetryUpdate(CalculatedFieldLinkedTelemetryMsgProto proto, TbCallback callback) { + try { + CalculatedFieldTelemetryUpdateRequest request = fromProto(proto.getMsg()); + + if (proto.getLinksList().isEmpty()) { + onTelemetryUpdate(proto, callback); + return; + } + + proto.getLinksList().forEach(ctxIdProto -> { + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB())); + CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId); + + Map updatedTelemetry = request.getMappedTelemetry(ctx, request.getEntityId()); + if (!updatedTelemetry.isEmpty()) { + EntityId targetEntityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB())); + executeTelemetryUpdate(ctx, targetEntityId, request.getPreviousCalculatedFieldIds(), updatedTelemetry); + } + }); + } catch (Exception e) { + log.trace("Failed to process telemetry update msg: [{}]", proto, e); + } + } private void executeTelemetryUpdate(CalculatedFieldCtx cfCtx, EntityId entityId, List previousCalculatedFieldIds, Map updatedTelemetry) { log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", cfCtx.getTenantId(), entityId, cfCtx.getCfId()); @@ -753,23 +757,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? TsRollingArgumentEntry.EMPTY : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor); } -// private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) { -// return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder() -// .setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits()) -// .setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits()) -// .setEntityType(ctxId.entityId().getEntityType().name()) -// .setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits()) -// .setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits()) -// .build(); -// } -// -// private TransportProtos.CalculatedFieldIdProto toProto(CalculatedFieldId cfId) { -// return TransportProtos.CalculatedFieldIdProto.newBuilder() -// .setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits()) -// .setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits()) -// .build(); -// } - private KvEntry createDefaultKvEntry(Argument argument) { String key = argument.getRefEntityKey().getKey(); String defaultValue = argument.getDefaultValue(); @@ -821,22 +808,28 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds()); - for (TsKvEntry entry : request.getEntries()) { - telemetryMsg.addTsData(ProtoUtils.toTsKvProto(entry)); + List entries = request.getEntries(); + List versions = result.getVersions(); + for (int i = 0; i < entries.size(); i++) { + long tsVersion = versions.get(i); + TsKvProto tsProto = ProtoUtils.toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); + telemetryMsg.addTsData(tsProto); } msg.setTelemetryMsg(telemetryMsg.build()); return msg.build(); } - private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List result) { - //TODO: IM Use result in both methods to update the versions of telemetry/attributes. + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds()); telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); - for (AttributeKvEntry entry : request.getEntries()) { - telemetryMsg.addAttrData(ProtoUtils.toProto(entry)); + List entries = request.getEntries(); + for (int i = 0; i < entries.size(); i++) { + long attrVersion = versions.get(i); + AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build(); + telemetryMsg.addAttrData(attrProto); } msg.setTelemetryMsg(telemetryMsg.build()); @@ -854,15 +847,66 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); for (CalculatedFieldId cfId : calculatedFieldIds) { - CalculatedFieldIdProto.Builder calculatedFieldIdProto = CalculatedFieldIdProto.newBuilder(); - calculatedFieldIdProto.setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits()); - calculatedFieldIdProto.setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits()); - telemetryMsg.addPreviousCalculatedFields(calculatedFieldIdProto.build()); + telemetryMsg.addPreviousCalculatedFields(toProto(cfId)); } return telemetryMsg; } + private CalculatedFieldLinkedTelemetryMsgProto buildLinkedTelemetryMsgProto(CalculatedFieldTelemetryMsgProto telemetryProto, List links) { + TransportProtos.CalculatedFieldLinkedTelemetryMsgProto.Builder builder = TransportProtos.CalculatedFieldLinkedTelemetryMsgProto.newBuilder(); + builder.setMsg(telemetryProto); + for (CalculatedFieldEntityCtxId link : links) { + builder.addLinks(toProto(link)); + } + return builder.build(); + } + + private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) { + return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder() + .setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits()) + .setEntityType(ctxId.entityId().getEntityType().name()) + .setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits()) + .setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits()) + .build(); + } + + private TransportProtos.CalculatedFieldIdProto toProto(CalculatedFieldId cfId) { + return TransportProtos.CalculatedFieldIdProto.newBuilder() + .setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits()) + .build(); + } + + private CalculatedFieldTelemetryUpdateRequest fromProto(CalculatedFieldTelemetryMsgProto proto) { + TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + + if (!proto.getTsDataList().isEmpty()) { + List updatedTelemetry = proto.getTsDataList().stream() + .map(ProtoUtils::fromProto) + .toList(); + return new CalculatedFieldTimeSeriesUpdateRequest( + tenantId, entityId, updatedTelemetry, + proto.getPreviousCalculatedFieldsList().stream() + .map(cfIdProto -> new CalculatedFieldId( + new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) + .toList()); + } else { + AttributeScope scope = AttributeScope.valueOf(proto.getScope().name()); + List updatedTelemetry = proto.getAttrDataList().stream() + .map(ProtoUtils::fromProto) + .toList(); + return new CalculatedFieldAttributeUpdateRequest( + tenantId, entityId, scope, updatedTelemetry, + proto.getPreviousCalculatedFieldsList().stream() + .map(cfIdProto -> new CalculatedFieldId( + new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB()))) + .toList()); + } + } + private static TbQueueCallback wrap(FutureCallback callback) { if (callback != null) { return new FutureCallbackWrapper(callback); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index ab8246ccf5..4703ed1606 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -33,7 +33,6 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.audit.ActionType; -import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.DeviceId; @@ -88,7 +87,7 @@ public class EntityStateSourcingListener { case ASSET -> { onAssetUpdate(event.getEntity(), event.getOldEntity()); } - case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE -> { + case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE, CALCULATED_FIELD -> { tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, lifecycleEvent); } case RULE_CHAIN -> { @@ -123,9 +122,6 @@ public class EntityStateSourcingListener { ApiUsageState apiUsageState = (ApiUsageState) event.getEntity(); tbClusterService.onApiStateChange(apiUsageState, null); } - case CALCULATED_FIELD -> { - onCalculatedFieldUpdate(event.getEntity(), event.getOldEntity()); - } default -> { } } @@ -150,7 +146,7 @@ public class EntityStateSourcingListener { Asset asset = (Asset) event.getEntity(); tbClusterService.onAssetDeleted(tenantId, asset, null); } - case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> { + case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE, CALCULATED_FIELD -> { tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED); } case NOTIFICATION_REQUEST -> { @@ -191,10 +187,6 @@ public class EntityStateSourcingListener { TbResourceInfo tbResource = (TbResourceInfo) event.getEntity(); tbClusterService.onResourceDeleted(tbResource, null); } - case CALCULATED_FIELD -> { - CalculatedField calculatedField = (CalculatedField) event.getEntity(); - tbClusterService.onCalculatedFieldDeleted(tenantId, calculatedField, null); - } default -> { } } @@ -275,15 +267,6 @@ public class EntityStateSourcingListener { } } - private void onCalculatedFieldUpdate(Object entity, Object oldEntity) { - CalculatedField calculatedField = (CalculatedField) entity; - CalculatedField oldCalculatedField = null; - if (oldEntity instanceof CalculatedField) { - oldCalculatedField = (CalculatedField) oldEntity; - } - tbClusterService.onCalculatedFieldUpdated(calculatedField, oldCalculatedField); - } - private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) { String data = JacksonUtil.toString(JacksonUtil.valueToTree(assignedDevice)); if (data != null) { diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java index fd8a59c392..5b0c790215 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java @@ -69,6 +69,7 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.mobile.app.MobileApp; import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.page.PageLink; @@ -98,9 +99,9 @@ import org.thingsboard.server.dao.device.DeviceCredentialsService; import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.exception.DataValidationException; +import org.thingsboard.server.dao.mobile.MobileAppDao; import org.thingsboard.server.dao.notification.NotificationSettingsService; import org.thingsboard.server.dao.notification.NotificationTargetService; -import org.thingsboard.server.dao.mobile.MobileAppDao; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.settings.AdminSettingsService; @@ -308,7 +309,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { jwtSettingsService.saveJwtSettings(jwtSettings); } - List mobiles = mobileAppDao.findByTenantId(TenantId.SYS_TENANT_ID, null, new PageLink(Integer.MAX_VALUE,0)).getData(); + List mobiles = mobileAppDao.findByTenantId(TenantId.SYS_TENANT_ID, null, new PageLink(Integer.MAX_VALUE, 0)).getData(); if (CollectionUtils.isNotEmpty(mobiles)) { mobiles.stream() .filter(mobileApp -> !validateKeyLength(mobileApp.getAppSecret())) @@ -571,7 +572,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { private void save(DeviceId deviceId, String key, boolean value) { if (persistActivityToTelemetry) { - ListenableFuture saveFuture = tsService.save( + ListenableFuture saveFuture = tsService.save( TenantId.SYS_TENANT_ID, deviceId, Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 19d6e295b5..3df2653df6 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -171,7 +171,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer callback.onSuccess(); } -// private void processEntityProfileUpdateMsg(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg) { + // private void processEntityProfileUpdateMsg(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg) { // var tenantId = toTenantId(profileUpdateMsg.getTenantIdMSB(), profileUpdateMsg.getTenantIdLSB()); // var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB())); // var oldProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getOldProfileIdMSB(), profileUpdateMsg.getOldProfileIdLSB())); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 3064277bf4..04d6a53401 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -38,12 +38,10 @@ import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.asset.Asset; -import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; -import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; @@ -347,6 +345,13 @@ public class DefaultTbClusterService implements TbClusterService { toCoreMsgs.incrementAndGet(); } + @Override + public void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { + log.trace("PUSHING msg: {} to:{}", msg, tpi); + producerProvider.getCalculatedFieldsMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback); + toRuleEngineNfs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS + } + @Override public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId); @@ -409,7 +414,7 @@ public class DefaultTbClusterService implements TbClusterService { public void onDeviceDeleted(TenantId tenantId, Device device, TbQueueCallback callback) { DeviceId deviceId = device.getId(); gatewayNotificationsService.onDeviceDeleted(device); - handleProfileEntityEvent(tenantId, deviceId, device.getDeviceProfileId(), false, true); + handleCalculatedFieldEntityDeleted(tenantId, deviceId, device.getDeviceProfileId()); broadcastEntityDeleteToTransport(tenantId, deviceId, device.getName(), callback); sendDeviceStateServiceEvent(tenantId, deviceId, false, false, true); broadcastEntityStateChangeEvent(tenantId, deviceId, ComponentLifecycleEvent.DELETED); @@ -418,7 +423,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback) { AssetId assetId = asset.getId(); - handleProfileEntityEvent(tenantId, assetId, asset.getAssetProfileId(), true, true); + handleCalculatedFieldEntityDeleted(tenantId, assetId, asset.getAssetProfileId()); broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED); } @@ -653,13 +658,13 @@ public class DefaultTbClusterService implements TbClusterService { } boolean deviceProfileChanged = !device.getDeviceProfileId().equals(old.getDeviceProfileId()); if (deviceProfileChanged) { - handleEntityProfileUpdatedEvent(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId()); + handleCalculatedFieldEntityUpdated(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId()); } if (deviceNameChanged || deviceProfileChanged) { pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), device.getId(), device.getName(), device.getType()), null); } } else { - handleProfileEntityEvent(device.getTenantId(), device.getId(), device.getDeviceProfileId(), true, false); + handleCalculatedFieldEntityAdded(device.getTenantId(), device.getId(), device.getDeviceProfileId()); } broadcastEntityStateChangeEvent(device.getTenantId(), device.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); sendDeviceStateServiceEvent(device.getTenantId(), device.getId(), created, !created, false); @@ -673,10 +678,10 @@ public class DefaultTbClusterService implements TbClusterService { if (old != null) { boolean assetTypeChanged = !asset.getType().equals(old.getType()); if (assetTypeChanged) { - handleEntityProfileUpdatedEvent(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId()); + handleCalculatedFieldEntityUpdated(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId()); } } else { - handleProfileEntityEvent(asset.getTenantId(), asset.getId(), asset.getAssetProfileId(), true, false); + handleCalculatedFieldEntityAdded(asset.getTenantId(), asset.getId(), asset.getAssetProfileId()); } broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); } @@ -809,52 +814,41 @@ public class DefaultTbClusterService implements TbClusterService { } } - @Override - public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField) { - var created = oldCalculatedField == null; - broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); + private void handleCalculatedFieldEntityAdded(TenantId tenantId, EntityId entityId, EntityId newProfileId) { + handleCalculatedFieldEntityUpdateEvent(tenantId, entityId, null, newProfileId, true, false, false); } - @Override - public void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, TbQueueCallback callback) { - CalculatedFieldId calculatedFieldId = calculatedField.getId(); - broadcastEntityStateChangeEvent(tenantId, calculatedFieldId, ComponentLifecycleEvent.DELETED); + private void handleCalculatedFieldEntityUpdated(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) { + handleCalculatedFieldEntityUpdateEvent(tenantId, entityId, oldProfileId, newProfileId, false, true, true); } - private void handleEntityProfileUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) { - TransportProtos.EntityProfileUpdateMsgProto.Builder builder = TransportProtos.EntityProfileUpdateMsgProto.newBuilder(); + private void handleCalculatedFieldEntityDeleted(TenantId tenantId, EntityId entityId, EntityId oldProfileId) { + handleCalculatedFieldEntityUpdateEvent(tenantId, entityId, oldProfileId, null, false, false, true); + } + + private void handleCalculatedFieldEntityUpdateEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId, boolean added, boolean updated, boolean deleted) { + TransportProtos.CalculatedFieldEntityUpdateMsgProto.Builder builder = TransportProtos.CalculatedFieldEntityUpdateMsgProto.newBuilder(); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); builder.setEntityType(entityId.getEntityType().name()); builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); - builder.setEntityProfileType(newProfileId.getEntityType().name()); - builder.setOldProfileIdMSB(oldProfileId.getId().getMostSignificantBits()); - builder.setOldProfileIdLSB(oldProfileId.getId().getLeastSignificantBits()); - builder.setNewProfileIdMSB(newProfileId.getId().getMostSignificantBits()); - builder.setNewProfileIdLSB(newProfileId.getId().getLeastSignificantBits()); - TransportProtos.EntityProfileUpdateMsgProto msg = builder.build(); - - broadcastToCore(ToCoreNotificationMsg.newBuilder().setEntityProfileUpdateMsg(msg).build()); - pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityProfileUpdateMsg(msg).build(), null); - } - - private void handleProfileEntityEvent(TenantId tenantId, EntityId entityId, EntityId profileId, boolean added, boolean deleted) { - TransportProtos.ProfileEntityMsgProto.Builder builder = TransportProtos.ProfileEntityMsgProto.newBuilder(); - builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); - builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); - builder.setEntityType(entityId.getEntityType().name()); - builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); - builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); - builder.setEntityProfileType(profileId.getEntityType().name()); - builder.setProfileIdMSB(profileId.getId().getMostSignificantBits()); - builder.setProfileIdLSB(profileId.getId().getLeastSignificantBits()); + if (oldProfileId != null) { + builder.setEntityProfileType(oldProfileId.getEntityType().name()); + builder.setOldProfileIdMSB(oldProfileId.getId().getMostSignificantBits()); + builder.setOldProfileIdLSB(oldProfileId.getId().getLeastSignificantBits()); + } + if (newProfileId != null) { + builder.setEntityProfileType(newProfileId.getEntityType().name()); + builder.setNewProfileIdMSB(newProfileId.getId().getMostSignificantBits()); + builder.setNewProfileIdLSB(newProfileId.getId().getLeastSignificantBits()); + } builder.setAdded(added); + builder.setUpdated(updated); builder.setDeleted(deleted); - TransportProtos.ProfileEntityMsgProto msg = builder.build(); + TransportProtos.CalculatedFieldEntityUpdateMsgProto msg = builder.build(); - broadcastToCore(ToCoreNotificationMsg.newBuilder().setProfileEntityMsg(msg).build()); - pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setProfileEntityMsg(msg).build(), null); + pushNotificationToCalculatedFields(tenantId, entityId, ToCalculatedFieldNotificationMsg.newBuilder().setEntityUpdateMsg(msg).build(), null); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index e3a1ca22d3..e7bde845f6 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -39,8 +39,6 @@ import org.thingsboard.server.common.data.event.ErrorEvent; import org.thingsboard.server.common.data.event.Event; import org.thingsboard.server.common.data.event.LifecycleEvent; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.NotificationRequestId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; @@ -88,7 +86,6 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldCache; -import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.notification.NotificationSchedulerService; import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -110,7 +107,6 @@ import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpd import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -153,14 +149,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService, CoreQueueConfig> mainConsumer; private QueueConsumerManager> usageStatsConsumer; private QueueConsumerManager> firmwareStatesConsumer; private volatile ListeningExecutorService deviceActivityEventsExecutor; - private volatile ListeningExecutorService calculatedFieldsExecutor; public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext, @@ -183,7 +177,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService, CoreQueueConfig>builder() .queueKey(new QueueKey(ServiceType.TB_CORE)) @@ -319,12 +310,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService entitiesByProfile = calculatedFieldCache.getEntitiesByProfile(tenantId, profileId); - if (added) { - entitiesByProfile.add(entityId); - } else { - entitiesByProfile.remove(entityId); - } - } - private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback callback) { if (msg.hasSubEvent()) { TbEntitySubEventProto subEvent = msg.getSubEvent(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index dac35bfc5c..5b1e5d7d79 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -178,12 +178,16 @@ public abstract class AbstractConsumerService onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice())); - addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request)), tsCallBackExecutor); } @Override diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index b28bf73c89..d2fbcddfdf 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.asset.Asset; -import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EdgeId; @@ -40,6 +39,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeMsg; @@ -79,6 +79,8 @@ public interface TbClusterService extends TbQueueClusterService { void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, TransportProtos.ToCalculatedFieldMsg msg, TbQueueCallback callback); + void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback); + void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, TransportProtos.ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback); void broadcastEntityStateChangeEvent(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state); @@ -125,8 +127,4 @@ public interface TbClusterService extends TbQueueClusterService { void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId); - void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField); - - void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, TbQueueCallback callback); - } diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 3bace4d91f..6a40f13688 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -58,6 +58,7 @@ import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.JsonDataEntry; @@ -628,6 +629,20 @@ public class ProtoUtils { return new BaseAttributeKvEntry(entry, proto.getLastUpdateTs(), proto.hasVersion() ? proto.getVersion() : null); } + public static TsKvEntry fromProto(TransportProtos.TsKvProto proto) { + TransportProtos.KeyValueProto kvProto = proto.getKv(); + String key = kvProto.getKey(); + KvEntry entry = switch (kvProto.getType()) { + case BOOLEAN_V -> new BooleanDataEntry(key, kvProto.getBoolV()); + case LONG_V -> new LongDataEntry(key, kvProto.getLongV()); + case DOUBLE_V -> new DoubleDataEntry(key, kvProto.getDoubleV()); + case STRING_V -> new StringDataEntry(key, kvProto.getStringV()); + case JSON_V -> new JsonDataEntry(key, kvProto.getJsonV()); + default -> null; + }; + return new BasicTsKvEntry(proto.getTs(), entry, proto.hasVersion() ? proto.getVersion() : null); + } + public static TransportProtos.TsKvProto toTsKvProto(TsKvEntry tsKvEntry) { return TransportProtos.TsKvProto.newBuilder() .setTs(tsKvEntry.getTs()) diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 288c923aaa..ede796b12b 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1591,8 +1591,6 @@ message ToCoreMsg { DeviceConnectProto deviceConnectMsg = 50; DeviceDisconnectProto deviceDisconnectMsg = 51; DeviceInactivityProto deviceInactivityMsg = 52; -// CalculatedFieldMsgProto calculatedFieldMsg = 53; -// EntityProfileUpdateMsgProto entityProfileUpdateMsg = 54; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */ @@ -1612,8 +1610,6 @@ message ToCoreNotificationMsg { FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12 [deprecated = true]; ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13; RestApiCallResponseMsgProto restApiCallResponseMsg = 50; -// EntityProfileUpdateMsgProto entityProfileUpdateMsg = 51; -// ProfileEntityMsgProto profileEntityMsg = 52; } /* Messages to Edge queue that are handled by ThingsBoard Core Service */ diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/EntityServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/EntityServiceTest.java index a26f345fb7..8c93245837 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/EntityServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/EntityServiceTest.java @@ -46,6 +46,7 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.ApiUsageStateFilter; import org.thingsboard.server.common.data.query.AssetSearchQueryFilter; @@ -1824,7 +1825,7 @@ public class EntityServiceTest extends AbstractServiceTest { } } - List> timeseriesFutures = new ArrayList<>(); + List> timeseriesFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); timeseriesFutures.add(saveLongTimeseries(device.getId(), "temperature", temperatures.get(i))); @@ -2430,7 +2431,7 @@ public class EntityServiceTest extends AbstractServiceTest { return attributesService.save(SYSTEM_TENANT_ID, entityId, scope, Collections.singletonList(attr)); } - private ListenableFuture saveLongTimeseries(EntityId entityId, String key, Double value) { + private ListenableFuture saveLongTimeseries(EntityId entityId, String key, Double value) { TsKvEntity tsKv = new TsKvEntity(); tsKv.setStrKey(key); tsKv.setDoubleValue(value);