diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index e6f541090c..09488bfe4e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -104,6 +104,7 @@ import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; +import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; @@ -507,6 +508,11 @@ public class ActorSystemContext { @Getter private EntityService entityService; + @Lazy + @Autowired(required = false) + @Getter + private CalculatedFieldExecutionService calculatedFieldExecutionService; + @Value("${actors.session.max_concurrent_sessions_per_device:1}") @Getter private long maxConcurrentSessionsPerDevice; diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 622812b327..9124dc0a9a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; import org.thingsboard.server.common.msg.aware.TenantAwareMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; @@ -111,6 +112,17 @@ public class AppActor extends ContextAwareActor { case SESSION_TIMEOUT_MSG: ctx.broadcastToChildrenByType(msg, EntityType.TENANT); break; + case CF_INIT_MSG: + case CF_LINK_INIT_MSG: + case CF_STATE_RESTORE_MSG: + case CF_UPDATE_MSG: + onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true); + break; + case CF_TELEMETRY_MSG: + case CF_LINKED_TELEMETRY_MSG: + case CF_ENTITY_UPDATE_MSG: + onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, false); + break; default: return false; } @@ -175,6 +187,19 @@ public class AppActor extends ContextAwareActor { } } + private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) { + getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> { + if (priority) { + tenantActor.tellWithHighPriority(msg); + } else { + tenantActor.tell(msg); + } + }, () -> { + msg.getCallback().onSuccess(); + }); + } + + private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) { getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> { if (priority) { diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java new file mode 100644 index 0000000000..43f057ad01 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java @@ -0,0 +1,65 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorException; +import org.thingsboard.server.actors.service.ContextAwareActor; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbActorMsg; + +@Slf4j +public class CalculatedFieldEntityActor extends ContextAwareActor { + + private final CalculatedFieldEntityMessageProcessor processor; + + CalculatedFieldEntityActor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) { + super(systemContext); + this.processor = new CalculatedFieldEntityMessageProcessor(systemContext, tenantId, entityId); + } + + @Override + public void init(TbActorCtx ctx) throws TbActorException { + super.init(ctx); + log.debug("[{}][{}] Starting CF entity actor.", processor.tenantId, processor.entityId); + try { + processor.init(ctx); + log.debug("[{}][{}] CF entity actor started.", processor.tenantId, processor.entityId); + } catch (Exception e) { + log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.entityId, e); + throw new TbActorException("Failed to initialize device actor", e); + } + } + + @Override + protected boolean doProcess(TbActorMsg msg) { + switch (msg.getMsgType()) { + case CF_STATE_RESTORE_MSG: + processor.process((CalculatedFieldStateRestoreMsg) msg); + break; + case CF_ENTITY_TELEMETRY_MSG: + processor.process((EntityCalculatedFieldTelemetryMsg) msg); + break; + default: + return false; + } + return true; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java new file mode 100644 index 0000000000..c5f8ecf046 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbEntityActorId; +import org.thingsboard.server.actors.device.DeviceActor; +import org.thingsboard.server.actors.service.ContextBasedCreator; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; + +public class CalculatedFieldEntityActorCreator extends ContextBasedCreator { + + private final TenantId tenantId; + private final EntityId entityId; + + public CalculatedFieldEntityActorCreator(ActorSystemContext context, TenantId tenantId, EntityId entityId) { + super(context); + this.tenantId = tenantId; + this.entityId = entityId; + } + + @Override + public TbActorId createActorId() { + return new TbEntityActorId(entityId); + } + + @Override + public TbActor createActor() { + return new CalculatedFieldEntityActor(context, tenantId, entityId); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java new file mode 100644 index 0000000000..6bb6256563 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -0,0 +1,199 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import com.google.common.util.concurrent.ListenableFuture; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.cf.configuration.ArgumentType; +import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; +import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.CalculatedFieldResult; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +/** + * @author Andrew Shvayka + */ +@Slf4j +public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareMsgProcessor { + // (1 for result persistence + 1 for the state persistence ) + public static final int CALLBACKS_PER_CF = 2; + + final TenantId tenantId; + final EntityId entityId; + final CalculatedFieldExecutionService cfService; + + TbActorCtx ctx; + Map states = new HashMap<>(); + + CalculatedFieldEntityMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) { + super(systemContext); + this.tenantId = tenantId; + this.entityId = entityId; + this.cfService = systemContext.getCalculatedFieldExecutionService(); + } + + void init(TbActorCtx ctx) { + this.ctx = ctx; + } + + public void process(EntityCalculatedFieldTelemetryMsg msg) { + var proto = msg.getProto(); + var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size()); + MultipleTbCallback callback = new MultipleTbCallback(numberOfCallbacks, msg.getCallback()); + List cfIdList = getCalculatedFieldIds(proto); + Set cfIdSet = new HashSet<>(cfIdList); + for (var ctx : msg.getEntityIdFields()) { + process(ctx, proto, cfIdSet, cfIdList, callback); + } + for (var ctx : msg.getProfileIdFields()) { + process(ctx, proto, cfIdSet, cfIdList, callback); + } + } + + private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Set cfIds, List cfIdList, MultipleTbCallback callback) { + if (cfIds.contains(ctx.getCfId())) { + callback.onSuccess(CALLBACKS_PER_CF); + } else { + if (proto.getTsDataCount() > 0) { + processTelemetry(ctx, proto, cfIdList, callback); + } else if (proto.getAttrDataCount() > 0) { + processAttributes(ctx, proto, cfIdList, callback); + } else { + callback.onSuccess(CALLBACKS_PER_CF); + } + } + } + + @SneakyThrows + private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) { + processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList())); + } + + @SneakyThrows + private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) { + processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList())); + } + + private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, + Map newArgValues) throws InterruptedException, ExecutionException, TimeoutException { + if (newArgValues.isEmpty()) { + callback.onSuccess(CALLBACKS_PER_CF); + } + CalculatedFieldState state = getOrInitState(ctx); + if (state.updateState(newArgValues)) { + if (state.isReady()) { + CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); + cfIdList = new ArrayList<>(cfIdList); + cfIdList.add(ctx.getCfId()); + cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); + } else { + callback.onSuccess(); // State was updated but no calculation performed; + } + cfService.pushStateToStorage(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback); + } else { + callback.onSuccess(CALLBACKS_PER_CF); + } + } + + @SneakyThrows + private CalculatedFieldState getOrInitState(CalculatedFieldCtx ctx) { + CalculatedFieldState state = states.get(ctx.getCfId()); + if (state != null) { + return state; + } else { + ListenableFuture stateFuture = systemContext.getCalculatedFieldExecutionService().fetchStateFromDb(ctx, entityId); + // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. + // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. + // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, + // but this will significantly complicate the code. + state = stateFuture.get(1, TimeUnit.MINUTES); + states.put(ctx.getCfId(), state); + } + return state; + } + + private Map mapToArguments(CalculatedFieldCtx ctx, List data) { + Map arguments = new HashMap<>(); + var argNames = ctx.getMainEntityArguments(); + for (TsKvProto item : data) { + ReferencedEntityKey key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_LATEST, null); + String argName = argNames.get(key); + if (argName != null) { + arguments.put(argName, new SingleValueArgumentEntry(item)); + } + key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_ROLLING, null); + argName = argNames.get(key); + if (argName != null) { + arguments.put(argName, new SingleValueArgumentEntry(item)); + } + } + return arguments; + } + + private Map mapToArguments(CalculatedFieldCtx ctx, AttributeScopeProto scope, List attrDataList) { + Map arguments = new HashMap<>(); + var argNames = ctx.getMainEntityArguments(); + for (AttributeValueProto item : attrDataList) { + ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); + String argName = argNames.get(key); + if (argName != null) { + arguments.put(argName, new SingleValueArgumentEntry(item)); + } + } + return arguments; + } + + private static List getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) { + List cfIds = new LinkedList<>(); + for (var cfId : proto.getPreviousCalculatedFieldsList()) { + cfIds.add(new CalculatedFieldId(new UUID(cfId.getCalculatedFieldIdMSB(), cfId.getCalculatedFieldIdLSB()))); + } + return cfIds; + } + + public void process(CalculatedFieldStateRestoreMsg msg) { + states.put(msg.getId().cfId(), msg.getState()); + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldLinkedTelemetryMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldLinkedTelemetryMsg.java new file mode 100644 index 0000000000..0c352f5072 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldLinkedTelemetryMsg.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; + +@Data +public class CalculatedFieldLinkedTelemetryMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final EntityId entityId; + private final CalculatedFieldLinkedTelemetryMsgProto proto; + private final TbCallback callback; + + + @Override + public MsgType getMsgType() { + return MsgType.CF_LINKED_TELEMETRY_MSG; + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java new file mode 100644 index 0000000000..87292d3206 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -0,0 +1,84 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorException; +import org.thingsboard.server.actors.service.ContextAwareActor; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; + +/** + * Created by ashvayka on 15.03.18. + */ +@Slf4j +public class CalculatedFieldManagerActor extends ContextAwareActor { + + private final CalculatedFieldManagerMessageProcessor processor; + + public CalculatedFieldManagerActor(ActorSystemContext systemContext, TenantId tenantId) { + super(systemContext); + this.processor = new CalculatedFieldManagerMessageProcessor(systemContext, tenantId); + } + + @Override + public void init(TbActorCtx ctx) throws TbActorException { + super.init(ctx); + log.debug("[{}] Starting CF manager actor.", processor.tenantId); + try { + processor.init(ctx); + log.debug("[{}] CF manager actor started.", processor.tenantId); + } catch (Exception e) { + log.warn("[{}] Unknown failure", processor.tenantId, e); + throw new TbActorException("Failed to initialize manager actor", e); + } + } + + @Override + protected boolean doProcess(TbActorMsg msg) { + switch (msg.getMsgType()) { + case PARTITION_CHANGE_MSG: + ctx.broadcastToChildren(msg, true); // TODO + break; + case CF_INIT_MSG: + processor.onFieldInitMsg((CalculatedFieldInitMsg) msg); + break; + case CF_LINK_INIT_MSG: + processor.onLinkInitMsg((CalculatedFieldLinkInitMsg) msg); + break; + case CF_STATE_RESTORE_MSG: + processor.onStateRestoreMsg((CalculatedFieldStateRestoreMsg) msg); + break; + case CF_UPDATE_MSG: +// processor.onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg); + break; + case CF_TELEMETRY_MSG: + processor.onTelemetryMsg((CalculatedFieldTelemetryMsg) msg); + break; + case CF_LINKED_TELEMETRY_MSG: + case CF_ENTITY_UPDATE_MSG: +// processor.onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg); + break; + default: + return false; + } + return true; + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActorCreator.java new file mode 100644 index 0000000000..2a53089bed --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActorCreator.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActor; +import org.thingsboard.server.actors.TbActorId; +import org.thingsboard.server.actors.TbEntityActorId; +import org.thingsboard.server.actors.TbStringActorId; +import org.thingsboard.server.actors.service.ContextBasedCreator; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; + +public class CalculatedFieldManagerActorCreator extends ContextBasedCreator { + + private final TenantId tenantId; + + public CalculatedFieldManagerActorCreator(ActorSystemContext context, TenantId tenantId) { + super(context); + this.tenantId = tenantId; + } + + @Override + public TbActorId createActorId() { + return new TbStringActorId("CFM|" + tenantId); + } + + @Override + public TbActor createActor() { + return new CalculatedFieldManagerActor(context, tenantId); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java new file mode 100644 index 0000000000..4bc0589e95 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -0,0 +1,149 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId; +import org.thingsboard.server.actors.service.DefaultActorService; +import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; +import org.thingsboard.server.service.profile.TbAssetProfileCache; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; + + +/** + * @author Andrew Shvayka + */ +@Slf4j +public class CalculatedFieldManagerMessageProcessor extends AbstractContextAwareMsgProcessor { + + private final Map calculatedFields = new HashMap<>(); + private final Map> entityIdCalculatedFields = new ConcurrentHashMap<>(); + private final ConcurrentMap> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>(); + + private final TbAssetProfileCache assetProfileCache; + private final TbDeviceProfileCache deviceProfileCache; + + protected TbActorCtx ctx; + final TenantId tenantId; + + CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) { + super(systemContext); + this.assetProfileCache = systemContext.getAssetProfileCache(); + this.deviceProfileCache = systemContext.getDeviceProfileCache(); + this.tenantId = tenantId; + } + + void init(TbActorCtx ctx) { + this.ctx = ctx; + } + + public void onFieldInitMsg(CalculatedFieldInitMsg msg) { + var cf = msg.getCf(); + calculatedFields.put(cf.getId(), cf); + // We use copy on write lists to safely pass the reference to another actor for the iteration. + // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) + entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()) + .add(new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService())); + msg.getCallback().onSuccess(); + } + + public void onLinkInitMsg(CalculatedFieldLinkInitMsg msg) { + var link = msg.getLink(); + // We use copy on write lists to safely pass the reference to another actor for the iteration. + // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) + entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(link); + msg.getCallback().onSuccess(); + } + + public void onStateRestoreMsg(CalculatedFieldStateRestoreMsg msg) { + if (calculatedFields.containsKey(msg.getId().cfId())) { + getOrCreateActor(msg.getId().entityId()).tell(msg); + } else { + // TODO: remove state from storage + } + } + + public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { + EntityId entityId = msg.getEntityId(); + var proto = msg.getProto(); + // process all cfs related to entity, or it's profile; + var entityIdFields = getCalculatedFieldsByEntityId(entityId); + var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId)); + //TODO: Transfer only 'part' of the original callback. + getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, msg.getCallback())); + // process all links (if any); + var links = getCalculatedFieldLinksByEntityId(entityId); + } + + private List getCalculatedFieldsByEntityId(EntityId entityId) { + if (entityId == null) { + return Collections.emptyList(); + } + var result = entityIdCalculatedFields.get(entityId); + if (result == null) { + result = Collections.emptyList(); + } + return result; + } + + private List getCalculatedFieldLinksByEntityId(EntityId entityId) { + if (entityId == null) { + return Collections.emptyList(); + } + var result = entityIdCalculatedFieldLinks.get(entityId); + if (result == null) { + result = Collections.emptyList(); + } + return result; + } + + private EntityId getProfileId(TenantId tenantId, EntityId entityId) { + return switch (entityId.getEntityType()) { + case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); + case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); + default -> null; + }; + } + + protected TbActorRef getOrCreateActor(EntityId entityId) { + return ctx.getOrCreateChildActor(new TbCalculatedFieldEntityActorId(entityId), + () -> DefaultActorService.CF_ENTITY_DISPATCHER_NAME, + () -> new CalculatedFieldEntityActorCreator(systemContext, tenantId, entityId), + () -> true); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java new file mode 100644 index 0000000000..7c0e67f0ec --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java @@ -0,0 +1,40 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; + +@Data +public class CalculatedFieldStateRestoreMsg implements ToCalculatedFieldSystemMsg { + + private final CalculatedFieldEntityCtxId id; + private final CalculatedFieldState state; + + @Override + public MsgType getMsgType() { + return MsgType.CF_STATE_RESTORE_MSG; + } + + @Override + public TenantId getTenantId() { + return id.tenantId(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldTelemetryMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldTelemetryMsg.java new file mode 100644 index 0000000000..5cce83b00b --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldTelemetryMsg.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; + +@Data +public class CalculatedFieldTelemetryMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final EntityId entityId; + private final CalculatedFieldTelemetryMsgProto proto; + private final TbCallback callback; + + + @Override + public MsgType getMsgType() { + return MsgType.CF_TELEMETRY_MSG; + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldTelemetryMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldTelemetryMsg.java new file mode 100644 index 0000000000..8465bd8fcc --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldTelemetryMsg.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.Data; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; + +import java.util.List; + +@Data +public class EntityCalculatedFieldTelemetryMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final EntityId entityId; + private final CalculatedFieldTelemetryMsgProto proto; + private final List entityIdFields; + private final List profileIdFields; + private final TbCallback callback; + + public EntityCalculatedFieldTelemetryMsg(CalculatedFieldTelemetryMsg msg, + List entityIdFields, + List profileIdFields, + TbCallback callback) { + this.tenantId = msg.getTenantId(); + this.entityId = msg.getEntityId(); + this.proto = msg.getProto(); + this.entityIdFields = entityIdFields; + this.profileIdFields = profileIdFields; + this.callback = callback; + } + + @Override + public MsgType getMsgType() { + return MsgType.CF_ENTITY_TELEMETRY_MSG; + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java new file mode 100644 index 0000000000..18e700f38c --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import org.thingsboard.server.common.msg.queue.TbCallback; + +import java.util.concurrent.atomic.AtomicInteger; + +public class MultipleTbCallback implements TbCallback { + + private final AtomicInteger counter; + private final TbCallback callback; + + public MultipleTbCallback(int count, TbCallback callback) { + this.counter = new AtomicInteger(count); + this.callback = callback; + } + + @Override + public void onSuccess() { + if (counter.decrementAndGet() <= 0) { + callback.onSuccess(); + } + } + + public void onSuccess(int number) { + if (counter.addAndGet(-number) <= 0) { + callback.onSuccess(); + } + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } +} diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 492940a274..7e0d1b4447 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -49,6 +49,8 @@ public class DefaultActorService extends TbApplicationEventListener deletedDevices; + private TbActorRef cfActor; private TenantActor(ActorSystemContext systemContext, TenantId tenantId) { super(systemContext, tenantId); @@ -95,6 +98,11 @@ public class TenantActor extends RuleChainManagerActor { } else { log.info("[{}] Skip init of the rule chains due to API limits", tenantId); } + //TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0; + cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId), + () -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME, + () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId), + () -> true); } catch (Exception e) { log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); cantFindTenant = true; @@ -159,12 +167,31 @@ public class TenantActor extends RuleChainManagerActor { case RULE_CHAIN_TO_RULE_CHAIN_MSG: onRuleChainMsg((RuleChainAwareMsg) msg); break; + case CF_INIT_MSG: + case CF_LINK_INIT_MSG: + case CF_STATE_RESTORE_MSG: + case CF_UPDATE_MSG: + onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true); + break; + case CF_TELEMETRY_MSG: + case CF_LINKED_TELEMETRY_MSG: + case CF_ENTITY_UPDATE_MSG: + onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, false); + break; default: return false; } return true; } + private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) { + if (priority) { + cfActor.tellWithHighPriority(msg); + } else { + cfActor.tell(msg); + } + } + private boolean isMyPartition(EntityId entityId) { return systemContext.resolve(ServiceType.TB_CORE, tenantId, entityId).isMyPartition(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 62e234943b..d38c1c58ba 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -15,14 +15,22 @@ */ package org.thingsboard.server.service.cf; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import java.util.List; @@ -30,16 +38,17 @@ public interface CalculatedFieldExecutionService { /** * Filter CFs based on the request entity. Push to the queue if any matching CF exist; - * @param request - telemetry save request; - * @param request - telemetry save result; + * + * @param request - telemetry save request; + * @param callback */ - void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result); + void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); - void pushRequestToQueue(AttributesSaveRequest request, List result); + void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); - void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback); + void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); - void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback); + ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); // void pushEntityUpdateMsg(TransportProtos.CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback); @@ -53,4 +62,6 @@ public interface CalculatedFieldExecutionService { void onEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback); + void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List cfIds, TbCallback callback); + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldInitService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldInitService.java new file mode 100644 index 0000000000..25f92e797f --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldInitService.java @@ -0,0 +1,19 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf; + +public interface CalculatedFieldInitService { +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 4278e74845..b0c9f6adde 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.asset.Asset; @@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.device.DeviceService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index f92b9819ed..bb8ba8bb63 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.cf; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -42,7 +41,6 @@ import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; -import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CalculatedFieldId; @@ -63,7 +61,6 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceType; @@ -108,12 +105,13 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; @@ -167,7 +165,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field")); calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool( Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field-callback")); - scheduledExecutor.submit(() -> states.putAll(stateService.restoreStates())); } @PreDestroy @@ -192,22 +189,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result) { + public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); //TODO: 1. check that request entity has calculated fields for entity or profile. If yes - push to corresponding partitions; //TODO: 2. check that request entity has calculated field links. If yes - push to corresponding partitions; //TODO: in 1 and 2 we should do the check as quick as possible. Should we also check the field/link keys?; checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), - () -> toCalculatedFieldTelemetryMsgProto(request, result), request.getCallback()); + () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @Override - public void pushRequestToQueue(AttributesSaveRequest request, List result) { + public void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), - () -> toCalculatedFieldTelemetryMsgProto(request, result), request.getCallback()); + () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId, @@ -241,77 +238,84 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { - - callback.onSuccess(); + public ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId) { + Map> argFutures = new HashMap<>(); + for (var entry : ctx.getArguments().entrySet()) { + var argEntityId = entry.getValue().getRefEntityId() != null ? entry.getValue().getRefEntityId() : entityId; + var argValueFuture = fetchKvEntry(ctx.getTenantId(), argEntityId, entry.getValue()); + argFutures.put(entry.getKey(), argValueFuture); + } + return Futures.whenAllComplete(argFutures.values()).call(() -> { + var result = createStateByType(ctx.getCfType()); + result.updateState(argFutures.entrySet().stream() + .collect(Collectors.toMap( + Entry::getKey, // Keep the key as is + entry -> { + try { + // Resolve the future to get the value + return entry.getValue().get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException("Error getting future result for key: " + entry.getKey(), e); + } + } + ))); + return result; + }, calculatedFieldCallbackExecutor); } @Override - public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) { - - callback.onSuccess(); + public void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) { + stateService.persistState(stateId, state, callback); } @Override protected Map>> onAddedPartitions(Set addedPartitions) { var result = new HashMap>>(); - PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); - Map> tpiTargetEntityMap = new HashMap<>(); - - for (CalculatedField cf : cfs) { - - Consumer resolvePartition = entityId -> { - TopicPartitionInfo tpi; - try { - tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, cf.getTenantId(), entityId); - if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId()))) { - tpiTargetEntityMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(new CalculatedFieldEntityCtxId(cf.getId(), entityId)); - } - } catch (Exception e) { - log.warn("Failed to resolve partition for CalculatedFieldEntityCtxId: entityId=[{}], tenantId=[{}]. Reason: {}", - entityId, cf.getTenantId(), e.getMessage()); - } - }; - - EntityId cfEntityId = cf.getEntityId(); - if (isProfileEntity(cfEntityId)) { - calculatedFieldCache.getEntitiesByProfile(cf.getTenantId(), cfEntityId).forEach(resolvePartition); - } else { - resolvePartition.accept(cfEntityId); - } - } - - for (var entry : tpiTargetEntityMap.entrySet()) { - for (List partition : Lists.partition(entry.getValue(), 1000)) { - log.info("[{}] Submit task for CalculatedFields: {}", entry.getKey(), partition.size()); - var future = calculatedFieldExecutor.submit(() -> { - try { - for (CalculatedFieldEntityCtxId ctxId : partition) { - restoreState(ctxId.cfId(), ctxId.entityId()); - } - } catch (Throwable t) { - log.error("Unexpected exception while restoring CalculatedField states", t); - throw t; - } - }); - result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(future); - } - } +// PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); +// Map> tpiTargetEntityMap = new HashMap<>(); +// +// for (CalculatedField cf : cfs) { +// +// Consumer resolvePartition = entityId -> { +// TopicPartitionInfo tpi; +// try { +// tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, cf.getTenantId(), entityId); +// if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId()))) { +// tpiTargetEntityMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(new CalculatedFieldEntityCtxId(cf.getId(), entityId)); +// } +// } catch (Exception e) { +// log.warn("Failed to resolve partition for CalculatedFieldEntityCtxId: entityId=[{}], tenantId=[{}]. Reason: {}", +// entityId, cf.getTenantId(), e.getMessage()); +// } +// }; +// +// EntityId cfEntityId = cf.getEntityId(); +// if (isProfileEntity(cfEntityId)) { +// calculatedFieldCache.getEntitiesByProfile(cf.getTenantId(), cfEntityId).forEach(resolvePartition); +// } else { +// resolvePartition.accept(cfEntityId); +// } +// } +// +// for (var entry : tpiTargetEntityMap.entrySet()) { +// for (List partition : Lists.partition(entry.getValue(), 1000)) { +// log.info("[{}] Submit task for CalculatedFields: {}", entry.getKey(), partition.size()); +// var future = calculatedFieldExecutor.submit(() -> { +// try { +// for (CalculatedFieldEntityCtxId ctxId : partition) { +// restoreState(ctxId.cfId(), ctxId.entityId()); +// } +// } catch (Throwable t) { +// log.error("Unexpected exception while restoring CalculatedField states", t); +// throw t; +// } +// }); +// result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(future); +// } +// } return result; } - private void restoreState(CalculatedFieldId calculatedFieldId, EntityId entityId) { - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId, entityId); - CalculatedFieldEntityCtx restoredCtx = stateService.restoreState(ctxId); - - if (restoredCtx != null) { - states.put(ctxId, restoredCtx); - log.info("Restored state for CalculatedField [{}]", calculatedFieldId); - } else { - log.warn("No state found for CalculatedField [{}], entity [{}].", calculatedFieldId, entityId); - } - } - @Override protected void cleanupEntityOnPartitionRemoval(CalculatedFieldId entityId) { cleanupEntity(entityId); @@ -491,7 +495,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } else { List ctxIds = tpiStates.computeIfAbsent(targetEntityTpi, k -> new ArrayList<>()); - ctxIds.add(new CalculatedFieldEntityCtxId(ctx.getCfId(), targetEntity)); + ctxIds.add(new CalculatedFieldEntityCtxId(ctx.getTenantId(), ctx.getCfId(), targetEntity)); } } @@ -525,7 +529,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Map argumentValues = updatedTelemetry.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue()))); - updateOrInitializeState(cfCtx, entityId, argumentValues, previousCalculatedFieldIds); +// updateOrInitializeState(cfCtx, entityId, argumentValues, previousCalculatedFieldIds); } @Override @@ -569,9 +573,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void clearState(CalculatedFieldId calculatedFieldId, EntityId entityId) { log.warn("Executing clearState, calculatedFieldId=[{}], entityId=[{}]", calculatedFieldId, entityId); - CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId, entityId); - states.remove(ctxId); - stateService.removeState(ctxId); } private void initializeStateForEntityByProfile(EntityId entityId, EntityId profileId, TbCallback callback) { @@ -601,7 +602,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { @Override public void onSuccess(List results) { - updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, new ArrayList<>()); +// updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, new ArrayList<>()); callback.onSuccess(); } @@ -613,96 +614,83 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, calculatedFieldCallbackExecutor); } - private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues, List previousCalculatedFieldIds) { - CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); - Map argumentsMap = new HashMap<>(argumentValues); +// private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues, List previousCalculatedFieldIds) { +// CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); +// Map argumentsMap = new HashMap<>(argumentValues); +// +// CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId, entityId); +// +// states.compute(entityCtxId, (ctxId, ctx) -> { +// CalculatedFieldEntityCtx calculatedFieldEntityCtx = ctx != null ? ctx : fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType()); +// +// CompletableFuture updateFuture = new CompletableFuture<>(); +// +// Consumer performUpdateState = (state) -> { +// if (state.updateState(argumentsMap)) { +// calculatedFieldEntityCtx.setState(state); +// stateService.persistState(entityCtxId, calculatedFieldEntityCtx); +// Map arguments = state.getArguments(); +// boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && +// !arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY); +// if (allArgsPresent) { +// performCalculation(calculatedFieldCtx, state, entityId, previousCalculatedFieldIds); +// } +// log.info("Successfully updated state: calculatedFieldId=[{}], entityId=[{}]", calculatedFieldCtx.getCfId(), entityId); +// } +// updateFuture.complete(null); +// }; +// +// CalculatedFieldState state = calculatedFieldEntityCtx.getState(); +// +// boolean allKeysPresent = argumentsMap.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()); +// boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream() +// .anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getRefEntityKey().getType()) && state.getArguments().get(argument.getRefEntityKey().getKey()) == null); +// +// if (!allKeysPresent || requiresTsRollingUpdate) { +// Map missingArguments = calculatedFieldCtx.getArguments().entrySet().stream() +// .filter(entry -> !argumentsMap.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getRefEntityKey().getType()) && state.getArguments().get(entry.getKey()) == null)) +// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); +// +// fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentsMap::putAll) +// .addListener(() -> performUpdateState.accept(state), +// calculatedFieldCallbackExecutor); +// } else { +// performUpdateState.accept(state); +// } +// +// try { +// updateFuture.join(); +// } catch (Exception e) { +// log.trace("Failed to update state for ctxId [{}].", ctxId, e); +// throw new RuntimeException("Failed to update or initialize state.", e); +// } +// +// return calculatedFieldEntityCtx; +// }); +// } - CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId, entityId); - - states.compute(entityCtxId, (ctxId, ctx) -> { - CalculatedFieldEntityCtx calculatedFieldEntityCtx = ctx != null ? ctx : fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType()); - - CompletableFuture updateFuture = new CompletableFuture<>(); - - Consumer performUpdateState = (state) -> { - if (state.updateState(argumentsMap)) { - calculatedFieldEntityCtx.setState(state); - stateService.persistState(entityCtxId, calculatedFieldEntityCtx); - Map arguments = state.getArguments(); - boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && - !arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY); - if (allArgsPresent) { - performCalculation(calculatedFieldCtx, state, entityId, previousCalculatedFieldIds); - } - log.info("Successfully updated state: calculatedFieldId=[{}], entityId=[{}]", calculatedFieldCtx.getCfId(), entityId); - } - updateFuture.complete(null); - }; - - CalculatedFieldState state = calculatedFieldEntityCtx.getState(); - - boolean allKeysPresent = argumentsMap.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()); - boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream() - .anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getRefEntityKey().getType()) && state.getArguments().get(argument.getRefEntityKey().getKey()) == null); - - if (!allKeysPresent || requiresTsRollingUpdate) { - Map missingArguments = calculatedFieldCtx.getArguments().entrySet().stream() - .filter(entry -> !argumentsMap.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getRefEntityKey().getType()) && state.getArguments().get(entry.getKey()) == null)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentsMap::putAll) - .addListener(() -> performUpdateState.accept(state), - calculatedFieldCallbackExecutor); - } else { - performUpdateState.accept(state); - } - - try { - updateFuture.join(); - } catch (Exception e) { - log.trace("Failed to update state for ctxId [{}].", ctxId, e); - throw new RuntimeException("Failed to update or initialize state.", e); - } - - return calculatedFieldEntityCtx; - }); - } - - private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId, List previousCalculatedFieldIds) { - ListenableFuture resultFuture = state.performCalculation(calculatedFieldCtx); - Futures.addCallback(resultFuture, new FutureCallback<>() { - @Override - public void onSuccess(CalculatedFieldResult result) { - if (result != null) { - pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), calculatedFieldCtx.getCfId(), entityId, result, previousCalculatedFieldIds); - } - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedFieldCtx.getCfId(), entityId, t); - } - }, MoreExecutors.directExecutor()); - } - - private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List previousCalculatedFieldIds) { + @Override + public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List cfIds, TbCallback callback) { try { OutputType type = calculatedFieldResult.getType(); TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; ObjectNode payload = createJsonPayload(calculatedFieldResult); - if (previousCalculatedFieldIds != null && previousCalculatedFieldIds.contains(calculatedFieldId)) { - throw new IllegalArgumentException("Calculated field [" + calculatedFieldId.getId() + "] refers to itself, causing an infinite loop."); - } - List calculatedFieldIds = previousCalculatedFieldIds != null - ? new ArrayList<>(previousCalculatedFieldIds) - : new ArrayList<>(); - calculatedFieldIds.add(calculatedFieldId); - TbMsg msg = TbMsg.newMsg().type(msgType).originator(originatorId).previousCalculatedFieldIds(calculatedFieldIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build(); - clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null); - log.info("Pushed message to rule engine: originatorId=[{}]", originatorId); + TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build(); + clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + callback.onSuccess(); + log.trace("[{}][{}] Pushed message to rule engine: {} ", tenantId, entityId, msg); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }); } catch (Exception e) { - log.warn("[{}] Failed to push message to rule engine. CalculatedFieldResult: {}", originatorId, calculatedFieldResult, e); + log.warn("[{}][{}] Failed to push message to rule engine. CalculatedFieldResult: {}", tenantId, entityId, calculatedFieldResult, e); } } @@ -781,15 +769,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return new StringDataEntry(key, defaultValue); } - private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) { - CalculatedFieldEntityCtx state = stateService.restoreState(entityCtxId); - - if (state == null) { - return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType)); - } - return state; - } - private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) { ObjectNode payload = JacksonUtil.newObjectNode(); Map resultMap = calculatedFieldResult.getResultMap(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java new file mode 100644 index 0000000000..87dda7013f --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldInitService.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; +import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg; +import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg; +import org.thingsboard.server.dao.cf.CalculatedFieldService; +import org.thingsboard.server.queue.util.AfterStartUp; +import org.thingsboard.server.queue.util.TbRuleEngineComponent; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; + +@Service +@TbRuleEngineComponent +@RequiredArgsConstructor +public class DefaultCalculatedFieldInitService implements CalculatedFieldInitService { + + private final CalculatedFieldService calculatedFieldService; + private final CalculatedFieldStateService stateService; + private final ActorSystemContext actorSystemContext; + + @Value("${calculated_fields.init_fetch_pack_size:50000}") + @Getter + private int initFetchPackSize; + + @AfterStartUp(order = AfterStartUp.CF_INIT_SERVICE) + public void initCalculatedFieldDefinitions() { + PageDataIterable cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize); + cfs.forEach(cf -> actorSystemContext.tell(new CalculatedFieldInitMsg(cf.getTenantId(), cf))); + PageDataIterable cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize); + cfls.forEach(link -> actorSystemContext.tell(new CalculatedFieldLinkInitMsg(link.getTenantId(), link))); + //TODO: combine with the DefaultCalculatedFieldCache. + + } + + @AfterStartUp(order = AfterStartUp.CF_STATE_RESTORE_SERVICE) + public void initCalculatedFieldStates() { + stateService.restoreStates().forEach((k, v) -> actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(k, v))); + } + + + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java index 5fb90a3e46..6ce0a11ade 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf.ctx; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; -public record CalculatedFieldEntityCtxId(CalculatedFieldId cfId, EntityId entityId) { +public record CalculatedFieldEntityCtxId(TenantId tenantId, CalculatedFieldId cfId, EntityId entityId) { } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java index 8bc5756f4e..2bf6b7e0f2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldStateService.java @@ -15,15 +15,18 @@ */ package org.thingsboard.server.service.cf.ctx; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; + import java.util.Map; public interface CalculatedFieldStateService { - Map restoreStates(); + Map restoreStates(); - CalculatedFieldEntityCtx restoreState(CalculatedFieldEntityCtxId ctxId); + CalculatedFieldState restoreState(CalculatedFieldEntityCtxId ctxId); - void persistState(CalculatedFieldEntityCtxId ctxId, CalculatedFieldEntityCtx state); + void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback); void removeState(CalculatedFieldEntityCtxId ctxId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index bc9a421e47..ca243e25b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -66,4 +66,9 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { return stateUpdated; } + @Override + public boolean isReady() { + //TODO: IM + return true; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index e4abbee4cd..1a0a16254a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.util.TbPair; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import java.util.ArrayList; import java.util.HashMap; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index 3c4a680df1..4b7918cc03 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -44,4 +44,5 @@ public interface CalculatedFieldState { ListenableFuture performCalculation(CalculatedFieldCtx ctx); + boolean isReady(); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java index db8950804c..aa82f2fc57 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java @@ -19,6 +19,7 @@ import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.service.cf.RocksDBService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; @@ -36,24 +37,25 @@ public class RocksDBStateService implements CalculatedFieldStateService { private final RocksDBService rocksDBService; @Override - public Map restoreStates() { + public Map restoreStates() { return rocksDBService.getAll().entrySet().stream() .collect(Collectors.toMap( entry -> JacksonUtil.fromString(entry.getKey(), CalculatedFieldEntityCtxId.class), - entry -> JacksonUtil.fromString(entry.getValue(), CalculatedFieldEntityCtx.class) + entry -> JacksonUtil.fromString(entry.getValue(), CalculatedFieldState.class) )); } @Override - public CalculatedFieldEntityCtx restoreState(CalculatedFieldEntityCtxId ctxId) { + public CalculatedFieldState restoreState(CalculatedFieldEntityCtxId ctxId) { return Optional.ofNullable(rocksDBService.get(JacksonUtil.writeValueAsString(ctxId))) - .map(storedState -> JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class)) + .map(storedState -> JacksonUtil.fromString(storedState, CalculatedFieldState.class)) .orElse(null); } @Override - public void persistState(CalculatedFieldEntityCtxId ctxId, CalculatedFieldEntityCtx state) { - rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(state)); + public void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback){ + rocksDBService.put(JacksonUtil.writeValueAsString(stateId), JacksonUtil.writeValueAsString(state)); + callback.onSuccess(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index 8d5080e90f..d5f7b1aee6 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -21,6 +21,10 @@ import lombok.NoArgsConstructor; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.util.KvProtoUtil; +import org.thingsboard.server.common.util.ProtoUtils; +import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; @Data @NoArgsConstructor @@ -34,6 +38,18 @@ public class SingleValueArgumentEntry implements ArgumentEntry { private Long version; + public SingleValueArgumentEntry(TsKvProto entry) { + this.ts = entry.getTs(); + this.version = entry.getVersion(); + this.value = ProtoUtils.fromProto(entry).getValue(); + } + + public SingleValueArgumentEntry(AttributeValueProto entry) { + this.ts = entry.getLastUpdateTs(); + this.version = entry.getVersion(); + this.value = ProtoUtils.fromProto(entry).getValue(); + } + public SingleValueArgumentEntry(KvEntry entry) { if (entry instanceof TsKvEntry tsKvEntry) { this.ts = tsKvEntry.getTs(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index e550a1154e..fb1d6b19ea 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -21,8 +21,6 @@ import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Data; -import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; @@ -30,23 +28,20 @@ import org.springframework.stereotype.Service; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg; +import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.QueueConfig; -import org.thingsboard.server.common.msg.MsgType; -import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; -import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -166,10 +161,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer pendingMsgHolder.setMsg(toCfMsg); if (toCfMsg.hasTelemetryMsg()) { log.trace("[{}] Forwarding regular telemetry message for processing {}", id, toCfMsg.getTelemetryMsg()); - forwardToCalculatedFieldService(toCfMsg.getTelemetryMsg(), callback); + forwardToActorSystem(toCfMsg.getTelemetryMsg(), callback); } else if (toCfMsg.hasLinkedTelemetryMsg()) { log.trace("[{}] Forwarding linked telemetry message for processing {}", id, toCfMsg.getLinkedTelemetryMsg()); - forwardToCalculatedFieldService(toCfMsg.getLinkedTelemetryMsg(), callback); + forwardToActorSystem(toCfMsg.getLinkedTelemetryMsg(), callback); } } catch (Throwable e) { log.warn("[{}] Failed to process message: {}", id, msg, e); @@ -219,9 +214,9 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer protected void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) { ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue(); if (toCfNotification.hasComponentLifecycle()) { - forwardToCalculatedFieldService(toCfNotification.getComponentLifecycle(), callback); + forwardToActorSystem(toCfNotification.getComponentLifecycle(), callback); } else if (toCfNotification.hasEntityUpdateMsg()) { - forwardToCalculatedFieldService(toCfNotification.getEntityUpdateMsg(), callback); + forwardToActorSystem(toCfNotification.getEntityUpdateMsg(), callback); } callback.onSuccess(); } @@ -249,33 +244,20 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer // } // - private void forwardToCalculatedFieldService(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) { + private void forwardToActorSystem(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { + var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); + var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); + actorContext.tell(new CalculatedFieldTelemetryMsg(tenantId, entityId, msg, callback)); + } + + private void forwardToActorSystem(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) { var msg = linkedMsg.getMsg(); var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); - var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); - ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onLinkedTelemetryMsg(linkedMsg, callback)); - DonAsynchron.withCallback(future, - __ -> callback.onSuccess(), - t -> { - log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t); - callback.onFailure(t); - }); - + var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); + actorContext.tell(new CalculatedFieldLinkedTelemetryMsg(tenantId, entityId, linkedMsg, callback)); } - private void forwardToCalculatedFieldService(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { - var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); - var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); - ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onTelemetryMsg(msg, callback)); - DonAsynchron.withCallback(future, - __ -> callback.onSuccess(), - t -> { - log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t); - callback.onFailure(t); - }); - } - - private void forwardToCalculatedFieldService(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) { + private void forwardToActorSystem(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldLifecycleMsg(msg, callback)); @@ -287,7 +269,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer }); } - private void forwardToCalculatedFieldService(TransportProtos.CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) { + private void forwardToActorSystem(TransportProtos.CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityUpdateMsg(msg, callback)); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index df18640359..3f7f9c0b7d 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -147,7 +147,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer resultFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl()); } DonAsynchron.withCallback(resultFuture, result -> { - calculatedFieldExecutionService.pushRequestToQueue(request, result); + calculatedFieldExecutionService.pushRequestToQueue(request, result, request.getCallback()); }, safeCallback(request.getCallback()), tsCallBackExecutor); addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); if (request.isSaveLatest() && !request.isOnlyLatest()) { @@ -167,7 +167,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer log.trace("Executing saveInternal [{}]", request); ListenableFuture> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries()); DonAsynchron.withCallback(saveFuture, result -> { - calculatedFieldExecutionService.pushRequestToQueue(request, result); + calculatedFieldExecutionService.pushRequestToQueue(request, result, request.getCallback()); }, safeCallback(request.getCallback()), tsCallBackExecutor); addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice())); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index d62aac5e6c..30a3e51d8c 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -441,6 +441,8 @@ actors: device_dispatcher_pool_size: "${ACTORS_SYSTEM_DEVICE_DISPATCHER_POOL_SIZE:4}" # Thread pool size for actor system dispatcher that process messages for device actors rule_dispatcher_pool_size: "${ACTORS_SYSTEM_RULE_DISPATCHER_POOL_SIZE:8}" # Thread pool size for actor system dispatcher that process messages for rule engine (chain/node) actors edge_dispatcher_pool_size: "${ACTORS_SYSTEM_EDGE_DISPATCHER_POOL_SIZE:4}" # Thread pool size for actor system dispatcher that process messages for edge actors + cfm_dispatcher_pool_size: "${ACTORS_SYSTEM_CFM_DISPATCHER_POOL_SIZE:2}" # Thread pool size for actor system dispatcher that process messages for CalculatedField manager actors + cfe_dispatcher_pool_size: "${ACTORS_SYSTEM_CFE_DISPATCHER_POOL_SIZE:8}" # Thread pool size for actor system dispatcher that process messages for CalculatedField entity actors tenant: create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}" # Create components in initialization session: diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index 9951caa876..a1315cc8bd 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -35,6 +35,8 @@ sql.events.batch_threads=2 actors.system.tenant_dispatcher_pool_size=4 actors.system.device_dispatcher_pool_size=8 actors.system.rule_dispatcher_pool_size=12 +actors.system.cfm_dispatcher_pool_size=2 +actors.system.cfe_dispatcher_pool_size=2 transport.sessions.report_timeout=10000 queue.transport_api.request_poll_interval=5 queue.transport_api.response_poll_interval=5 diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbCalculatedFieldEntityActorId.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbCalculatedFieldEntityActorId.java new file mode 100644 index 0000000000..66522cd08d --- /dev/null +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbCalculatedFieldEntityActorId.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors; + +import lombok.Getter; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.EntityId; + +import java.util.Objects; + +public class TbCalculatedFieldEntityActorId implements TbActorId { + + @Getter + private final EntityId entityId; + + public TbCalculatedFieldEntityActorId(EntityId entityId) { + this.entityId = entityId; + } + + @Override + public String toString() { + return entityId.getEntityType() + "|" + entityId.getId(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TbCalculatedFieldEntityActorId that = (TbCalculatedFieldEntityActorId) o; + return entityId.equals(that.entityId); + } + + @Override + public int hashCode() { + // Magic number to ensure that the hash does not match with the hash of other actor id - (TbEntityActorId) + return 42 + Objects.hash(entityId); + } + + @Override + public EntityType getEntityType() { + return entityId.getEntityType(); + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java index b49495d959..b4bcc77a17 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ReferencedEntityKey.java @@ -27,4 +27,6 @@ public class ReferencedEntityKey { private ArgumentType type; private AttributeScope scope; + + } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index b05bb75032..91419bee9c 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -133,7 +133,16 @@ public enum MsgType { * Messages that are sent to and from edge session to start edge synchronization process */ EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG, - EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG; + EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG, + + CF_INIT_MSG, // Sent to init particular calculated field; + CF_LINK_INIT_MSG, // Sent to init particular calculated field; + CF_STATE_RESTORE_MSG,// Sent to init particular calculated field entity state; + CF_TELEMETRY_MSG, + CF_ENTITY_TELEMETRY_MSG, + CF_LINKED_TELEMETRY_MSG, + CF_UPDATE_MSG, + CF_ENTITY_UPDATE_MSG; @Getter private final boolean ignoreOnStart; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/ToCalculatedFieldSystemMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/ToCalculatedFieldSystemMsg.java new file mode 100644 index 0000000000..2da959ec7d --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/ToCalculatedFieldSystemMsg.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg; + +import org.thingsboard.server.common.msg.aware.TenantAwareMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; + +public interface ToCalculatedFieldSystemMsg extends TenantAwareMsg { + + default TbCallback getCallback() { + return TbCallback.EMPTY; + } + +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldInitMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldInitMsg.java new file mode 100644 index 0000000000..4d9f4ce414 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldInitMsg.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.cf; + +import lombok.Data; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; + +@Data +public class CalculatedFieldInitMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final CalculatedField cf; + + @Override + public MsgType getMsgType() { + return MsgType.CF_INIT_MSG; + } +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldLinkInitMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldLinkInitMsg.java new file mode 100644 index 0000000000..7f582e6ff8 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/cf/CalculatedFieldLinkInitMsg.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.cf; + +import lombok.Data; +import org.thingsboard.server.common.data.cf.CalculatedFieldLink; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; + +@Data +public class CalculatedFieldLinkInitMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final CalculatedFieldLink link; + + @Override + public MsgType getMsgType() { + return MsgType.CF_LINK_INIT_MSG; + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java index 53ff839c49..70ae853e1e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/AfterStartUp.java @@ -38,6 +38,10 @@ public @interface AfterStartUp { int ACTOR_SYSTEM = 9; int REGULAR_SERVICE = 10; + int CF_INIT_SERVICE = 10; + int CF_STATE_RESTORE_SERVICE = 11; + int CF_CONSUMER_SERVICE = 12; + int BEFORE_TRANSPORT_SERVICE = Integer.MAX_VALUE - 1001; int TRANSPORT_SERVICE = Integer.MAX_VALUE - 1000; int AFTER_TRANSPORT_SERVICE = Integer.MAX_VALUE - 999; diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index 1e11c77f9a..a1bb335ad0 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -145,6 +145,8 @@ sql.events.batch_threads=2 actors.system.tenant_dispatcher_pool_size=4 actors.system.device_dispatcher_pool_size=8 actors.system.rule_dispatcher_pool_size=12 +actors.system.cfm_dispatcher_pool_size=2 +actors.system.cfe_dispatcher_pool_size=2 transport.sessions.report_timeout=10000 queue.transport_api.request_poll_interval=5 queue.transport_api.response_poll_interval=5