diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java index 43f057ad01..a5461c6152 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java @@ -43,7 +43,7 @@ public class CalculatedFieldEntityActor extends ContextAwareActor { log.debug("[{}][{}] CF entity actor started.", processor.tenantId, processor.entityId); } catch (Exception e) { log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.entityId, e); - throw new TbActorException("Failed to initialize device actor", e); + throw new TbActorException("Failed to initialize CF entity actor", e); } } @@ -56,6 +56,9 @@ public class CalculatedFieldEntityActor extends ContextAwareActor { case CF_ENTITY_TELEMETRY_MSG: processor.process((EntityCalculatedFieldTelemetryMsg) msg); break; + case CF_LINKED_TELEMETRY_MSG: + processor.process((EntityCalculatedFieldLinkedTelemetryMsg) msg); + break; default: return false; } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 533eeb5e31..cea3cce791 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -18,15 +18,19 @@ package org.thingsboard.server.actors.calculatedField; import com.google.common.util.concurrent.ListenableFuture; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; +import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; @@ -40,6 +44,8 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -96,7 +102,25 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } - private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Set cfIds, List cfIdList, MultipleTbCallback callback) { + public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) { + var proto = msg.getProto(); + var ctx = msg.getCtx(); + var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback()); + List cfIds = getCalculatedFieldIds(proto); + if (cfIds.contains(ctx.getCfId())) { + callback.onSuccess(CALLBACKS_PER_CF); + } else { + if (proto.getTsDataCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList())); + } else if (proto.getAttrDataCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList())); + } else { + callback.onSuccess(CALLBACKS_PER_CF); + } + } + } + + private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection cfIds, List cfIdList, MultipleTbCallback callback) { if (cfIds.contains(ctx.getCfId())) { callback.onSuccess(CALLBACKS_PER_CF); } else { @@ -120,8 +144,9 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList())); } + @SneakyThrows private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, - Map newArgValues) throws InterruptedException, ExecutionException, TimeoutException { + Map newArgValues) { if (newArgValues.isEmpty()) { callback.onSuccess(CALLBACKS_PER_CF); } @@ -159,8 +184,22 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } private Map mapToArguments(CalculatedFieldCtx ctx, List data) { + return mapToArguments(ctx.getMainEntityArguments(), data); + } + + private Map mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, List data) { + var argNames = ctx.getLinkedEntityArguments().get(entityId); + if(argNames.isEmpty()) { + return Collections.emptyMap(); + } + return mapToArguments(argNames, data); + } + + private static Map mapToArguments(Map argNames, List data) { + if (argNames.isEmpty()) { + return Collections.emptyMap(); + } Map arguments = new HashMap<>(); - var argNames = ctx.getMainEntityArguments(); for (TsKvProto item : data) { ReferencedEntityKey key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_LATEST, null); String argName = argNames.get(key); @@ -177,8 +216,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } private Map mapToArguments(CalculatedFieldCtx ctx, AttributeScopeProto scope, List attrDataList) { + return mapToArguments(ctx.getMainEntityArguments(), scope, attrDataList); + } + + private Map mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List attrDataList) { + var argNames = ctx.getLinkedEntityArguments().get(entityId); + if(argNames.isEmpty()) { + return Collections.emptyMap(); + } + return mapToArguments(argNames, scope, attrDataList); + } + + private static Map mapToArguments(Map argNames, AttributeScopeProto scope, List attrDataList) { Map arguments = new HashMap<>(); - var argNames = ctx.getMainEntityArguments(); for (AttributeValueProto item : attrDataList) { ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); String argName = argNames.get(key); @@ -196,4 +246,5 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } return cfIds; } + } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java index 87292d3206..1c198c660d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -73,6 +73,8 @@ public class CalculatedFieldManagerActor extends ContextAwareActor { processor.onTelemetryMsg((CalculatedFieldTelemetryMsg) msg); break; case CF_LINKED_TELEMETRY_MSG: + processor.onLinkedTelemetryMsg((CalculatedFieldLinkedTelemetryMsg) msg); + break; case CF_ENTITY_UPDATE_MSG: // processor.onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg); break; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index 4bc0589e95..4bce7cb322 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -16,29 +16,52 @@ package org.thingsboard.server.actors.calculatedField; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; +import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.AssetProfileId; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.queue.discovery.HashPartitionService; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -50,18 +73,22 @@ import java.util.concurrent.CopyOnWriteArrayList; @Slf4j public class CalculatedFieldManagerMessageProcessor extends AbstractContextAwareMsgProcessor { - private final Map calculatedFields = new HashMap<>(); + private final Map calculatedFields = new HashMap<>(); private final Map> entityIdCalculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>(); + private final ConcurrentMap> profileEntities = new ConcurrentHashMap<>(); + private final CalculatedFieldExecutionService cfService; private final TbAssetProfileCache assetProfileCache; private final TbDeviceProfileCache deviceProfileCache; protected TbActorCtx ctx; final TenantId tenantId; + private final static int initFetchPackSize = 1024; CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext); + this.cfService = systemContext.getCalculatedFieldExecutionService(); this.assetProfileCache = systemContext.getAssetProfileCache(); this.deviceProfileCache = systemContext.getDeviceProfileCache(); this.tenantId = tenantId; @@ -73,11 +100,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void onFieldInitMsg(CalculatedFieldInitMsg msg) { var cf = msg.getCf(); - calculatedFields.put(cf.getId(), cf); + var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()); + calculatedFields.put(cf.getId(), cfCtx); // We use copy on write lists to safely pass the reference to another actor for the iteration. // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) - entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()) - .add(new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService())); + entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); msg.getCallback().onSuccess(); } @@ -99,14 +126,70 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { EntityId entityId = msg.getEntityId(); - var proto = msg.getProto(); + // 2 = 1 for CF processing + 1 for links processing + MultipleTbCallback callback = new MultipleTbCallback(2, msg.getCallback()); // process all cfs related to entity, or it's profile; var entityIdFields = getCalculatedFieldsByEntityId(entityId); var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId)); - //TODO: Transfer only 'part' of the original callback. - getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, msg.getCallback())); + if (!entityIdFields.isEmpty() || !profileIdFields.isEmpty()) { + getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, callback)); + } else { + callback.onSuccess(); + } // process all links (if any); - var links = getCalculatedFieldLinksByEntityId(entityId); + List linkedCalculatedFields = filterCalculatedFieldLinks(msg); + var linksSize = linkedCalculatedFields.size(); + if (linksSize > 0) { + cfService.pushMsgToLinks(msg, linkedCalculatedFields, callback); + } else { + callback.onSuccess(); + } + } + + public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsg msg) { + EntityId sourceEntityId = msg.getEntityId(); + var proto = msg.getProto(); + var linksList = proto.getLinksList(); + for (var linkProto : linksList) { + var link = toCalculatedFieldEntityCtxId(linkProto); + var targetEntityId = link.entityId(); + var targetEntityType = targetEntityId.getEntityType(); + var cf = calculatedFields.get(link.cfId()); + if (EntityType.DEVICE_PROFILE.equals(targetEntityType) || EntityType.ASSET_PROFILE.equals(targetEntityType)) { + // iterate over all entities that belong to profile and push the message for corresponding CF + var entityIds = getEntitiesByProfile(targetEntityId); + if (!entityIds.isEmpty()) { + MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback()); + var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback); + entityIds.forEach(entityId -> getOrCreateActor(entityId).tell(newMsg)); + } else { + msg.getCallback().onSuccess(); + } + } else { + // push the message to specific entity; + var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback()); + getOrCreateActor(targetEntityId).tell(newMsg); + } + } + } + + private CalculatedFieldEntityCtxId toCalculatedFieldEntityCtxId(CalculatedFieldEntityCtxIdProto ctxIdProto) { + EntityId entityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB())); + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB())); + return new CalculatedFieldEntityCtxId(tenantId, calculatedFieldId, entityId); + } + + private List filterCalculatedFieldLinks(CalculatedFieldTelemetryMsg msg) { + EntityId entityId = msg.getEntityId(); + var proto = msg.getProto(); + List result = new ArrayList<>(); + for (var link : getCalculatedFieldLinksByEntityId(entityId)) { + CalculatedFieldCtx ctx = calculatedFields.get(link.getCalculatedFieldId()); + if (ctx.linkMatches(entityId, proto)) { + result.add(ctx.toCalculatedFieldEntityCtxId()); + } + } + return result; } private List getCalculatedFieldsByEntityId(EntityId entityId) { @@ -131,6 +214,30 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware return result; } + private Set getEntitiesByProfile(EntityId entityProfileId) { + Set entities = profileEntities.get(entityProfileId); + if (entities == null) { + entities = switch (entityProfileId.getEntityType()) { + case ASSET_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> { + Set assetIds = new HashSet<>(); + (new PageDataIterable<>(pageLink -> + systemContext.getAssetService().findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) profileId, pageLink), initFetchPackSize)).forEach(assetIds::add); + return assetIds; + }); + case DEVICE_PROFILE -> profileEntities.computeIfAbsent(entityProfileId, profileId -> { + Set deviceIds = new HashSet<>(); + (new PageDataIterable<>(pageLink -> + systemContext.getDeviceService().findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityProfileId, pageLink), initFetchPackSize)).forEach(deviceIds::add); + return deviceIds; + }); + default -> throw new IllegalArgumentException("Entity type should be ASSET_PROFILE or DEVICE_PROFILE."); + }; + } + log.trace("[{}] Found entities by profile in cache: {}", entityProfileId, entities); + return entities; + } + + private EntityId getProfileId(TenantId tenantId, EntityId entityId) { return switch (entityId.getEntityType()) { case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); @@ -139,7 +246,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware }; } - protected TbActorRef getOrCreateActor(EntityId entityId) { + private TbActorRef getOrCreateActor(EntityId entityId) { return ctx.getOrCreateChildActor(new TbCalculatedFieldEntityActorId(entityId), () -> DefaultActorService.CF_ENTITY_DISPATCHER_NAME, () -> new CalculatedFieldEntityActorCreator(systemContext, tenantId, entityId), diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldLinkedTelemetryMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldLinkedTelemetryMsg.java new file mode 100644 index 0000000000..47b91cbe65 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldLinkedTelemetryMsg.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; + +import java.util.List; + +@Data +public class EntityCalculatedFieldLinkedTelemetryMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final EntityId entityId; + private final CalculatedFieldTelemetryMsgProto proto; + private final CalculatedFieldCtx ctx; + private final TbCallback callback; + + @Override + public MsgType getMsgType() { + return MsgType.CF_LINKED_TELEMETRY_MSG; + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java index 18e700f38c..312cf72bed 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java @@ -15,35 +15,42 @@ */ package org.thingsboard.server.actors.calculatedField; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TbCallback; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +@Slf4j public class MultipleTbCallback implements TbCallback { - + @Getter + private final UUID id; private final AtomicInteger counter; private final TbCallback callback; public MultipleTbCallback(int count, TbCallback callback) { + id = UUID.randomUUID(); this.counter = new AtomicInteger(count); this.callback = callback; } @Override public void onSuccess() { - if (counter.decrementAndGet() <= 0) { - callback.onSuccess(); - } + onSuccess(1); } public void onSuccess(int number) { + log.trace("[{}][{}] onSuccess({})", id, callback.getId(), number); if (counter.addAndGet(-number) <= 0) { + log.trace("[{}][{}] Done.", id, callback.getId()); callback.onSuccess(); } } @Override public void onFailure(Throwable t) { + log.warn("[{}][{}] onFailure.", id, callback.getId()); callback.onFailure(t); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 806c224608..d8daae2e64 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; @@ -53,6 +54,10 @@ public interface CalculatedFieldExecutionService { ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); + void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List cfIds, TbCallback callback); + + void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List linkedCalculatedFields, TbCallback callback); + // void pushEntityUpdateMsg(TransportProtos.CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback); /* ===================================================== */ @@ -65,6 +70,5 @@ public interface CalculatedFieldExecutionService { void onEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback); - void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List cfIds, TbCallback callback); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index f333eecbd8..19561b73b9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -34,6 +34,8 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; +import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; @@ -83,6 +85,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNot import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.queue.discovery.HashPartitionService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; @@ -636,60 +639,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, calculatedFieldCallbackExecutor); } -// private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues, List previousCalculatedFieldIds) { -// CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); -// Map argumentsMap = new HashMap<>(argumentValues); -// -// CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId, entityId); -// -// states.compute(entityCtxId, (ctxId, ctx) -> { -// CalculatedFieldEntityCtx calculatedFieldEntityCtx = ctx != null ? ctx : fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType()); -// -// CompletableFuture updateFuture = new CompletableFuture<>(); -// -// Consumer performUpdateState = (state) -> { -// if (state.updateState(argumentsMap)) { -// calculatedFieldEntityCtx.setState(state); -// stateService.persistState(entityCtxId, calculatedFieldEntityCtx); -// Map arguments = state.getArguments(); -// boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && -// !arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY); -// if (allArgsPresent) { -// performCalculation(calculatedFieldCtx, state, entityId, previousCalculatedFieldIds); -// } -// log.info("Successfully updated state: calculatedFieldId=[{}], entityId=[{}]", calculatedFieldCtx.getCfId(), entityId); -// } -// updateFuture.complete(null); -// }; -// -// CalculatedFieldState state = calculatedFieldEntityCtx.getState(); -// -// boolean allKeysPresent = argumentsMap.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()); -// boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream() -// .anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getRefEntityKey().getType()) && state.getArguments().get(argument.getRefEntityKey().getKey()) == null); -// -// if (!allKeysPresent || requiresTsRollingUpdate) { -// Map missingArguments = calculatedFieldCtx.getArguments().entrySet().stream() -// .filter(entry -> !argumentsMap.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getRefEntityKey().getType()) && state.getArguments().get(entry.getKey()) == null)) -// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); -// -// fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentsMap::putAll) -// .addListener(() -> performUpdateState.accept(state), -// calculatedFieldCallbackExecutor); -// } else { -// performUpdateState.accept(state); -// } -// -// try { -// updateFuture.join(); -// } catch (Exception e) { -// log.trace("Failed to update state for ctxId [{}].", ctxId, e); -// throw new RuntimeException("Failed to update or initialize state.", e); -// } -// -// return calculatedFieldEntityCtx; -// }); -// } @Override public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List cfIds, TbCallback callback) { @@ -713,9 +662,74 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }); } catch (Exception e) { log.warn("[{}][{}] Failed to push message to rule engine. CalculatedFieldResult: {}", tenantId, entityId, calculatedFieldResult, e); + callback.onFailure(e); } } + @Override + public void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List linkedCalculatedFields, TbCallback callback) { + Map> unicasts = new HashMap<>(); + List broadcasts = new ArrayList<>(); + for (CalculatedFieldEntityCtxId link : linkedCalculatedFields) { + var linkEntityId = link.entityId(); + var linkEntityType = linkEntityId.getEntityType(); + // Let's assume number of entities in profile is N, and number of partitions is P. If N > P, we save by broadcasting to all partitions. Usually N >> P. + boolean broadcast = EntityType.DEVICE_PROFILE.equals(linkEntityType) || EntityType.ASSET_PROFILE.equals(linkEntityType); + if (broadcast) { + broadcasts.add(link); + } else { + TopicPartitionInfo tpi = partitionService.resolve(HashPartitionService.CALCULATED_FIELD_QUEUE_KEY, link.entityId()); + unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link); + } + } + MultipleTbCallback linkCallback = new MultipleTbCallback(2, callback); + if (!broadcasts.isEmpty()) { + broadcast(broadcasts, msg, linkCallback); + } else { + linkCallback.onSuccess(); + } + if (!unicasts.isEmpty()) { + unicast(unicasts, msg, linkCallback); + } else { + linkCallback.onSuccess(); + } + } + + private void unicast(Map> unicasts, CalculatedFieldTelemetryMsg msg, MultipleTbCallback mainCallback) { + TbQueueCallback callback = new TbCallbackWrapper(new MultipleTbCallback(unicasts.size(), mainCallback)); + unicasts.forEach((topicPartitionInfo, ctxIds) -> { + CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsgProto = buildLinkedTelemetryMsgProto(msg.getProto(), ctxIds); + clusterService.pushMsgToCalculatedFields(topicPartitionInfo, UUID.randomUUID(), + ToCalculatedFieldMsg.newBuilder().setLinkedTelemetryMsg(linkedTelemetryMsgProto).build(), callback); + }); + } + + private void broadcast(List broadcasts, CalculatedFieldTelemetryMsg msg, MultipleTbCallback mainCallback) { + TbQueueCallback callback = new TbCallbackWrapper(mainCallback); + CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsgProto = buildLinkedTelemetryMsgProto(msg.getProto(), broadcasts); + clusterService.broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setLinkedTelemetryMsg(linkedTelemetryMsgProto).build(), callback); + } + + private CalculatedFieldLinkedTelemetryMsgProto buildLinkedTelemetryMsgProto(CalculatedFieldTelemetryMsgProto telemetryProto, List links) { + TransportProtos.CalculatedFieldLinkedTelemetryMsgProto.Builder builder = TransportProtos.CalculatedFieldLinkedTelemetryMsgProto.newBuilder(); + builder.setMsg(telemetryProto); + for (CalculatedFieldEntityCtxId link : links) { + builder.addLinks(toProto(link)); + } + return builder.build(); + } + + //TODO: IM: move to utils; + private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) { + return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder() + .setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits()) + .setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits()) + .setEntityType(ctxId.entityId().getEntityType().name()) + .setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits()) + .setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits()) + .build(); + } + private ListenableFuture fetchArguments(TenantId tenantId, EntityId entityId, Map necessaryArguments, Consumer> onComplete) { Map argumentValues = new HashMap<>(); List> futures = new ArrayList<>(); @@ -868,25 +882,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return telemetryMsg; } - private CalculatedFieldLinkedTelemetryMsgProto buildLinkedTelemetryMsgProto(CalculatedFieldTelemetryMsgProto telemetryProto, List links) { - TransportProtos.CalculatedFieldLinkedTelemetryMsgProto.Builder builder = TransportProtos.CalculatedFieldLinkedTelemetryMsgProto.newBuilder(); - builder.setMsg(telemetryProto); - for (CalculatedFieldEntityCtxId link : links) { - builder.addLinks(toProto(link)); - } - return builder.build(); - } - - private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) { - return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder() - .setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits()) - .setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits()) - .setEntityType(ctxId.entityId().getEntityType().name()) - .setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits()) - .setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits()) - .build(); - } - private TransportProtos.CalculatedFieldIdProto toProto(CalculatedFieldId cfId) { return TransportProtos.CalculatedFieldIdProto.newBuilder() .setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits()) @@ -948,4 +943,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + private static class TbCallbackWrapper implements TbQueueCallback { + private final TbCallback callback; + + public TbCallbackWrapper(TbCallback callback) { + this.callback = callback; + } + + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + callback.onSuccess(); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 1a0a16254a..37e402fbc8 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -31,7 +31,9 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.util.TbPair; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import java.util.ArrayList; import java.util.HashMap; @@ -143,4 +145,13 @@ public class CalculatedFieldCtx { } return false; } + + public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) { + //TODO: IM - implement + return true; + } + + public CalculatedFieldEntityCtxId toCalculatedFieldEntityCtxId() { + return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 0ccecbbeca..0fcc04bd5d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -223,15 +223,16 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer protected void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) { ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue(); if (toCfNotification.hasComponentLifecycle()) { - // from upstream (maybe removed since we dont need to init state for each partition) + // from upstream (maybe removed since we don't need to init state for each partition) forwardToActorSystem(toCfNotification.getComponentLifecycle(), callback); handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCfNotification.getComponentLifecycle())); } else if (toCfNotification.hasEntityUpdateMsg()) { processEntityUpdateMsg(toCfNotification.getEntityUpdateMsg()); - // from upstream (maybe removed since we dont need to update state for each partition) + // from upstream (maybe removed since we don't need to update state for each partition) forwardToActorSystem(toCfNotification.getEntityUpdateMsg(), callback); + } else if (toCfNotification.hasLinkedTelemetryMsg()) { + forwardToActorSystem(toCfNotification.getLinkedTelemetryMsg(), callback); } - callback.onSuccess(); } private void forwardToActorSystem(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index b1a4b1859e..2783e826fe 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -194,6 +194,19 @@ public class DefaultTbClusterService implements TbClusterService { } } + @Override + public void broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg toCfMsg, TbQueueCallback callback) { + UUID msgId = UUID.randomUUID(); + TbQueueProducer> toCfProducer = producerProvider.getCalculatedFieldsNotificationsMsgProducer(); + Set tbReServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); + for (String serviceId : tbReServices) { + TopicPartitionInfo tpi = topicService.getCalculatedFieldNotificationsTopic(serviceId); + toCfProducer.send(tpi, new TbProtoQueueMsg<>(msgId, toCfMsg), null); + toRuleEngineNfs.incrementAndGet(); + } + callback.onSuccess(null); // TODO: refactor to be fair, similar to multi-value callback; + } + @Override public void pushMsgToVersionControl(TenantId tenantId, ToVersionControlServiceMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_VC_EXECUTOR, TenantId.SYS_TENANT_ID, tenantId); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbPackCallback.java index f9d15353c7..3402a357d1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbPackCallback.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.queue; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -23,9 +24,11 @@ import java.util.UUID; @Slf4j public class TbPackCallback implements TbCallback { private final TbPackProcessingContext ctx; + @Getter private final UUID id; public TbPackCallback(UUID id, TbPackProcessingContext ctx) { + log.trace("[{}] CALLBACK CREATED", id); this.id = id; this.ctx = ctx; } diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index 269db0a73a..588b135891 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -41,6 +41,7 @@ import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeMsg; @@ -62,6 +63,8 @@ public interface TbClusterService extends TbQueueClusterService { void broadcastToCore(ToCoreNotificationMsg msg); + void broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg build, TbQueueCallback callback); + void pushMsgToVersionControl(TenantId tenantId, ToVersionControlServiceMsg msg, TbQueueCallback callback); void pushNotificationToCore(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbCallback.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbCallback.java index a2d264a4ff..f8204fb0ae 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbCallback.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbCallback.java @@ -15,6 +15,10 @@ */ package org.thingsboard.server.common.msg.queue; +import org.thingsboard.server.common.data.id.EntityId; + +import java.util.UUID; + public interface TbCallback { TbCallback EMPTY = new TbCallback() { @@ -30,6 +34,10 @@ public interface TbCallback { } }; + default UUID getId(){ + return EntityId.NULL_UUID; + } + void onSuccess(); void onFailure(Throwable t); diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 9e3e842cd9..69b912120e 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -803,7 +803,7 @@ message CalculatedFieldTelemetryMsgProto { message CalculatedFieldLinkedTelemetryMsgProto { CalculatedFieldTelemetryMsgProto msg = 1; - repeated CalculatedFieldEntityCtxIdProto links = 2; + repeated CalculatedFieldEntityCtxIdProto links = 2; } message CalculatedFieldEntityCtxIdProto { @@ -1639,6 +1639,7 @@ message ToCalculatedFieldMsg { message ToCalculatedFieldNotificationMsg { ComponentLifecycleMsgProto componentLifecycle = 1; CalculatedFieldEntityUpdateMsgProto entityUpdateMsg = 2; + CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 3; } /* Messages that are handled by ThingsBoard RuleEngine Service */