WIP: Cluster mode refactoring

This commit is contained in:
Andrii Shvaika 2025-01-27 17:34:35 +02:00
parent e09ef84f43
commit d3278f05bb
39 changed files with 1364 additions and 222 deletions

View File

@ -104,6 +104,7 @@ import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
@ -507,6 +508,11 @@ public class ActorSystemContext {
@Getter
private EntityService entityService;
@Lazy
@Autowired(required = false)
@Getter
private CalculatedFieldExecutionService calculatedFieldExecutionService;
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
@Getter
private long maxConcurrentSessionsPerDevice;

View File

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
@ -111,6 +112,17 @@ public class AppActor extends ContextAwareActor {
case SESSION_TIMEOUT_MSG:
ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
break;
case CF_INIT_MSG:
case CF_LINK_INIT_MSG:
case CF_STATE_RESTORE_MSG:
case CF_UPDATE_MSG:
onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true);
break;
case CF_TELEMETRY_MSG:
case CF_LINKED_TELEMETRY_MSG:
case CF_ENTITY_UPDATE_MSG:
onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, false);
break;
default:
return false;
}
@ -175,6 +187,19 @@ public class AppActor extends ContextAwareActor {
}
}
private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> {
if (priority) {
tenantActor.tellWithHighPriority(msg);
} else {
tenantActor.tell(msg);
}
}, () -> {
msg.getCallback().onSuccess();
});
}
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> {
if (priority) {

View File

@ -0,0 +1,65 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
@Slf4j
public class CalculatedFieldEntityActor extends ContextAwareActor {
private final CalculatedFieldEntityMessageProcessor processor;
CalculatedFieldEntityActor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) {
super(systemContext);
this.processor = new CalculatedFieldEntityMessageProcessor(systemContext, tenantId, entityId);
}
@Override
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
log.debug("[{}][{}] Starting CF entity actor.", processor.tenantId, processor.entityId);
try {
processor.init(ctx);
log.debug("[{}][{}] CF entity actor started.", processor.tenantId, processor.entityId);
} catch (Exception e) {
log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.entityId, e);
throw new TbActorException("Failed to initialize device actor", e);
}
}
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case CF_STATE_RESTORE_MSG:
processor.process((CalculatedFieldStateRestoreMsg) msg);
break;
case CF_ENTITY_TELEMETRY_MSG:
processor.process((EntityCalculatedFieldTelemetryMsg) msg);
break;
default:
return false;
}
return true;
}
}

View File

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.device.DeviceActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
public class CalculatedFieldEntityActorCreator extends ContextBasedCreator {
private final TenantId tenantId;
private final EntityId entityId;
public CalculatedFieldEntityActorCreator(ActorSystemContext context, TenantId tenantId, EntityId entityId) {
super(context);
this.tenantId = tenantId;
this.entityId = entityId;
}
@Override
public TbActorId createActorId() {
return new TbEntityActorId(entityId);
}
@Override
public TbActor createActor() {
return new CalculatedFieldEntityActor(context, tenantId, entityId);
}
}

View File

@ -0,0 +1,199 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author Andrew Shvayka
*/
@Slf4j
public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareMsgProcessor {
// (1 for result persistence + 1 for the state persistence )
public static final int CALLBACKS_PER_CF = 2;
final TenantId tenantId;
final EntityId entityId;
final CalculatedFieldExecutionService cfService;
TbActorCtx ctx;
Map<CalculatedFieldId, CalculatedFieldState> states = new HashMap<>();
CalculatedFieldEntityMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) {
super(systemContext);
this.tenantId = tenantId;
this.entityId = entityId;
this.cfService = systemContext.getCalculatedFieldExecutionService();
}
void init(TbActorCtx ctx) {
this.ctx = ctx;
}
public void process(EntityCalculatedFieldTelemetryMsg msg) {
var proto = msg.getProto();
var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size());
MultipleTbCallback callback = new MultipleTbCallback(numberOfCallbacks, msg.getCallback());
List<CalculatedFieldId> cfIdList = getCalculatedFieldIds(proto);
Set<CalculatedFieldId> cfIdSet = new HashSet<>(cfIdList);
for (var ctx : msg.getEntityIdFields()) {
process(ctx, proto, cfIdSet, cfIdList, callback);
}
for (var ctx : msg.getProfileIdFields()) {
process(ctx, proto, cfIdSet, cfIdList, callback);
}
}
private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Set<CalculatedFieldId> cfIds, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
if (cfIds.contains(ctx.getCfId())) {
callback.onSuccess(CALLBACKS_PER_CF);
} else {
if (proto.getTsDataCount() > 0) {
processTelemetry(ctx, proto, cfIdList, callback);
} else if (proto.getAttrDataCount() > 0) {
processAttributes(ctx, proto, cfIdList, callback);
} else {
callback.onSuccess(CALLBACKS_PER_CF);
}
}
}
@SneakyThrows
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()));
}
@SneakyThrows
private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()));
}
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback,
Map<String, ArgumentEntry> newArgValues) throws InterruptedException, ExecutionException, TimeoutException {
if (newArgValues.isEmpty()) {
callback.onSuccess(CALLBACKS_PER_CF);
}
CalculatedFieldState state = getOrInitState(ctx);
if (state.updateState(newArgValues)) {
if (state.isReady()) {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
cfIdList = new ArrayList<>(cfIdList);
cfIdList.add(ctx.getCfId());
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
} else {
callback.onSuccess(); // State was updated but no calculation performed;
}
cfService.pushStateToStorage(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), state, callback);
} else {
callback.onSuccess(CALLBACKS_PER_CF);
}
}
@SneakyThrows
private CalculatedFieldState getOrInitState(CalculatedFieldCtx ctx) {
CalculatedFieldState state = states.get(ctx.getCfId());
if (state != null) {
return state;
} else {
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldExecutionService().fetchStateFromDb(ctx, entityId);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
state = stateFuture.get(1, TimeUnit.MINUTES);
states.put(ctx.getCfId(), state);
}
return state;
}
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, List<TsKvProto> data) {
Map<String, ArgumentEntry> arguments = new HashMap<>();
var argNames = ctx.getMainEntityArguments();
for (TsKvProto item : data) {
ReferencedEntityKey key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_LATEST, null);
String argName = argNames.get(key);
if (argName != null) {
arguments.put(argName, new SingleValueArgumentEntry(item));
}
key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_ROLLING, null);
argName = argNames.get(key);
if (argName != null) {
arguments.put(argName, new SingleValueArgumentEntry(item));
}
}
return arguments;
}
private Map<String, ArgumentEntry> mapToArguments(CalculatedFieldCtx ctx, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) {
Map<String, ArgumentEntry> arguments = new HashMap<>();
var argNames = ctx.getMainEntityArguments();
for (AttributeValueProto item : attrDataList) {
ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
String argName = argNames.get(key);
if (argName != null) {
arguments.put(argName, new SingleValueArgumentEntry(item));
}
}
return arguments;
}
private static List<CalculatedFieldId> getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) {
List<CalculatedFieldId> cfIds = new LinkedList<>();
for (var cfId : proto.getPreviousCalculatedFieldsList()) {
cfIds.add(new CalculatedFieldId(new UUID(cfId.getCalculatedFieldIdMSB(), cfId.getCalculatedFieldIdLSB())));
}
return cfIds;
}
public void process(CalculatedFieldStateRestoreMsg msg) {
states.put(msg.getId().cfId(), msg.getState());
}
}

View File

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
@Data
public class CalculatedFieldLinkedTelemetryMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final EntityId entityId;
private final CalculatedFieldLinkedTelemetryMsgProto proto;
private final TbCallback callback;
@Override
public MsgType getMsgType() {
return MsgType.CF_LINKED_TELEMETRY_MSG;
}
}

View File

@ -0,0 +1,84 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
/**
* Created by ashvayka on 15.03.18.
*/
@Slf4j
public class CalculatedFieldManagerActor extends ContextAwareActor {
private final CalculatedFieldManagerMessageProcessor processor;
public CalculatedFieldManagerActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
this.processor = new CalculatedFieldManagerMessageProcessor(systemContext, tenantId);
}
@Override
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
log.debug("[{}] Starting CF manager actor.", processor.tenantId);
try {
processor.init(ctx);
log.debug("[{}] CF manager actor started.", processor.tenantId);
} catch (Exception e) {
log.warn("[{}] Unknown failure", processor.tenantId, e);
throw new TbActorException("Failed to initialize manager actor", e);
}
}
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case PARTITION_CHANGE_MSG:
ctx.broadcastToChildren(msg, true); // TODO
break;
case CF_INIT_MSG:
processor.onFieldInitMsg((CalculatedFieldInitMsg) msg);
break;
case CF_LINK_INIT_MSG:
processor.onLinkInitMsg((CalculatedFieldLinkInitMsg) msg);
break;
case CF_STATE_RESTORE_MSG:
processor.onStateRestoreMsg((CalculatedFieldStateRestoreMsg) msg);
break;
case CF_UPDATE_MSG:
// processor.onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg);
break;
case CF_TELEMETRY_MSG:
processor.onTelemetryMsg((CalculatedFieldTelemetryMsg) msg);
break;
case CF_LINKED_TELEMETRY_MSG:
case CF_ENTITY_UPDATE_MSG:
// processor.onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg);
break;
default:
return false;
}
return true;
}
}

View File

@ -0,0 +1,46 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.TbStringActorId;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
public class CalculatedFieldManagerActorCreator extends ContextBasedCreator {
private final TenantId tenantId;
public CalculatedFieldManagerActorCreator(ActorSystemContext context, TenantId tenantId) {
super(context);
this.tenantId = tenantId;
}
@Override
public TbActorId createActorId() {
return new TbStringActorId("CFM|" + tenantId);
}
@Override
public TbActor createActor() {
return new CalculatedFieldManagerActor(context, tenantId);
}
}

View File

@ -0,0 +1,149 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author Andrew Shvayka
*/
@Slf4j
public class CalculatedFieldManagerMessageProcessor extends AbstractContextAwareMsgProcessor {
private final Map<CalculatedFieldId, CalculatedField> calculatedFields = new HashMap<>();
private final Map<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFields = new ConcurrentHashMap<>();
private final ConcurrentMap<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new ConcurrentHashMap<>();
private final TbAssetProfileCache assetProfileCache;
private final TbDeviceProfileCache deviceProfileCache;
protected TbActorCtx ctx;
final TenantId tenantId;
CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
this.assetProfileCache = systemContext.getAssetProfileCache();
this.deviceProfileCache = systemContext.getDeviceProfileCache();
this.tenantId = tenantId;
}
void init(TbActorCtx ctx) {
this.ctx = ctx;
}
public void onFieldInitMsg(CalculatedFieldInitMsg msg) {
var cf = msg.getCf();
calculatedFields.put(cf.getId(), cf);
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>())
.add(new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService()));
msg.getCallback().onSuccess();
}
public void onLinkInitMsg(CalculatedFieldLinkInitMsg msg) {
var link = msg.getLink();
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(link);
msg.getCallback().onSuccess();
}
public void onStateRestoreMsg(CalculatedFieldStateRestoreMsg msg) {
if (calculatedFields.containsKey(msg.getId().cfId())) {
getOrCreateActor(msg.getId().entityId()).tell(msg);
} else {
// TODO: remove state from storage
}
}
public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) {
EntityId entityId = msg.getEntityId();
var proto = msg.getProto();
// process all cfs related to entity, or it's profile;
var entityIdFields = getCalculatedFieldsByEntityId(entityId);
var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId));
//TODO: Transfer only 'part' of the original callback.
getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, msg.getCallback()));
// process all links (if any);
var links = getCalculatedFieldLinksByEntityId(entityId);
}
private List<CalculatedFieldCtx> getCalculatedFieldsByEntityId(EntityId entityId) {
if (entityId == null) {
return Collections.emptyList();
}
var result = entityIdCalculatedFields.get(entityId);
if (result == null) {
result = Collections.emptyList();
}
return result;
}
private List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(EntityId entityId) {
if (entityId == null) {
return Collections.emptyList();
}
var result = entityIdCalculatedFieldLinks.get(entityId);
if (result == null) {
result = Collections.emptyList();
}
return result;
}
private EntityId getProfileId(TenantId tenantId, EntityId entityId) {
return switch (entityId.getEntityType()) {
case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId();
case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId();
default -> null;
};
}
protected TbActorRef getOrCreateActor(EntityId entityId) {
return ctx.getOrCreateChildActor(new TbCalculatedFieldEntityActorId(entityId),
() -> DefaultActorService.CF_ENTITY_DISPATCHER_NAME,
() -> new CalculatedFieldEntityActorCreator(systemContext, tenantId, entityId),
() -> true);
}
}

View File

@ -0,0 +1,40 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
@Data
public class CalculatedFieldStateRestoreMsg implements ToCalculatedFieldSystemMsg {
private final CalculatedFieldEntityCtxId id;
private final CalculatedFieldState state;
@Override
public MsgType getMsgType() {
return MsgType.CF_STATE_RESTORE_MSG;
}
@Override
public TenantId getTenantId() {
return id.tenantId();
}
}

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
@Data
public class CalculatedFieldTelemetryMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final EntityId entityId;
private final CalculatedFieldTelemetryMsgProto proto;
private final TbCallback callback;
@Override
public MsgType getMsgType() {
return MsgType.CF_TELEMETRY_MSG;
}
}

View File

@ -0,0 +1,55 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.List;
@Data
public class EntityCalculatedFieldTelemetryMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final EntityId entityId;
private final CalculatedFieldTelemetryMsgProto proto;
private final List<CalculatedFieldCtx> entityIdFields;
private final List<CalculatedFieldCtx> profileIdFields;
private final TbCallback callback;
public EntityCalculatedFieldTelemetryMsg(CalculatedFieldTelemetryMsg msg,
List<CalculatedFieldCtx> entityIdFields,
List<CalculatedFieldCtx> profileIdFields,
TbCallback callback) {
this.tenantId = msg.getTenantId();
this.entityId = msg.getEntityId();
this.proto = msg.getProto();
this.entityIdFields = entityIdFields;
this.profileIdFields = profileIdFields;
this.callback = callback;
}
@Override
public MsgType getMsgType() {
return MsgType.CF_ENTITY_TELEMETRY_MSG;
}
}

View File

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.util.concurrent.atomic.AtomicInteger;
public class MultipleTbCallback implements TbCallback {
private final AtomicInteger counter;
private final TbCallback callback;
public MultipleTbCallback(int count, TbCallback callback) {
this.counter = new AtomicInteger(count);
this.callback = callback;
}
@Override
public void onSuccess() {
if (counter.decrementAndGet() <= 0) {
callback.onSuccess();
}
}
public void onSuccess(int number) {
if (counter.addAndGet(-number) <= 0) {
callback.onSuccess();
}
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}

View File

@ -49,6 +49,8 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
public static final String TENANT_DISPATCHER_NAME = "tenant-dispatcher";
public static final String DEVICE_DISPATCHER_NAME = "device-dispatcher";
public static final String RULE_DISPATCHER_NAME = "rule-dispatcher";
public static final String CF_MANAGER_DISPATCHER_NAME = "cf-manager-dispatcher";
public static final String CF_ENTITY_DISPATCHER_NAME = "cf-entity-dispatcher";
@Autowired
private ActorSystemContext actorContext;
@ -78,6 +80,13 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
@Value("${actors.system.rule_dispatcher_pool_size:8}")
private int ruleDispatcherSize;
@Value("${actors.system.cfm_dispatcher_pool_size:2}")
private int calculatedFieldManagerDispatcherSize;
@Value("${actors.system.cfe_dispatcher_pool_size:8}")
private int calculatedFieldEntityDispatcherSize;
@PostConstruct
public void initActorSystem() {
log.info("Initializing actor system.");
@ -89,6 +98,8 @@ public class DefaultActorService extends TbApplicationEventListener<PartitionCha
system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
system.createDispatcher(CF_MANAGER_DISPATCHER_NAME, initDispatcherExecutor(CF_MANAGER_DISPATCHER_NAME, calculatedFieldManagerDispatcherSize));
system.createDispatcher(CF_ENTITY_DISPATCHER_NAME, initDispatcherExecutor(CF_ENTITY_DISPATCHER_NAME, calculatedFieldEntityDispatcherSize));
actorContext.setActorSystem(system);

View File

@ -26,6 +26,8 @@ import org.thingsboard.server.actors.TbActorNotRegisteredException;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate;
import org.thingsboard.server.actors.TbStringActorId;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldManagerActorCreator;
import org.thingsboard.server.actors.device.DeviceActorCreator;
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@ -44,6 +46,7 @@ import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
@ -64,8 +67,8 @@ public class TenantActor extends RuleChainManagerActor {
private boolean isRuleEngine;
private boolean isCore;
private ApiUsageState apiUsageState;
private Set<DeviceId> deletedDevices;
private TbActorRef cfActor;
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext, tenantId);
@ -95,6 +98,11 @@ public class TenantActor extends RuleChainManagerActor {
} else {
log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
}
//TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0;
cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId),
() -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME,
() -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
() -> true);
} catch (Exception e) {
log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
cantFindTenant = true;
@ -159,12 +167,31 @@ public class TenantActor extends RuleChainManagerActor {
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
onRuleChainMsg((RuleChainAwareMsg) msg);
break;
case CF_INIT_MSG:
case CF_LINK_INIT_MSG:
case CF_STATE_RESTORE_MSG:
case CF_UPDATE_MSG:
onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true);
break;
case CF_TELEMETRY_MSG:
case CF_LINKED_TELEMETRY_MSG:
case CF_ENTITY_UPDATE_MSG:
onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, false);
break;
default:
return false;
}
return true;
}
private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) {
if (priority) {
cfActor.tellWithHighPriority(msg);
} else {
cfActor.tell(msg);
}
}
private boolean isMyPartition(EntityId entityId) {
return systemContext.resolve(ServiceType.TB_CORE, tenantId, entityId).isMyPartition();
}

View File

@ -15,14 +15,22 @@
*/
package org.thingsboard.server.service.cf;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import java.util.List;
@ -30,16 +38,17 @@ public interface CalculatedFieldExecutionService {
/**
* Filter CFs based on the request entity. Push to the queue if any matching CF exist;
* @param request - telemetry save request;
* @param request - telemetry save result;
*
* @param request - telemetry save request;
* @param callback
*/
void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result);
void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result);
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback);
void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback);
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);
// void pushEntityUpdateMsg(TransportProtos.CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback);
@ -53,4 +62,6 @@ public interface CalculatedFieldExecutionService {
void onEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback);
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List<CalculatedFieldId> cfIds, TbCallback callback);
}

View File

@ -0,0 +1,19 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf;
public interface CalculatedFieldInitService {
}

View File

@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.asset.Asset;
@ -36,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.device.DeviceService;

View File

@ -17,7 +17,6 @@ package org.thingsboard.server.service.cf;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -42,7 +41,6 @@ import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.OutputType;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
@ -63,7 +61,6 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
@ -108,12 +105,13 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -167,7 +165,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field"));
calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field-callback"));
scheduledExecutor.submit(() -> states.putAll(stateService.restoreStates()));
}
@PreDestroy
@ -192,22 +189,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
@Override
public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result) {
public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
//TODO: 1. check that request entity has calculated fields for entity or profile. If yes - push to corresponding partitions;
//TODO: 2. check that request entity has calculated field links. If yes - push to corresponding partitions;
//TODO: in 1 and 2 we should do the check as quick as possible. Should we also check the field/link keys?;
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()),
() -> toCalculatedFieldTelemetryMsgProto(request, result), request.getCallback());
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
}
@Override
public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result) {
public void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()),
() -> toCalculatedFieldTelemetryMsgProto(request, result), request.getCallback());
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
}
private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId,
@ -241,77 +238,84 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
@Override
public void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) {
callback.onSuccess();
public ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId) {
Map<String, ListenableFuture<ArgumentEntry>> argFutures = new HashMap<>();
for (var entry : ctx.getArguments().entrySet()) {
var argEntityId = entry.getValue().getRefEntityId() != null ? entry.getValue().getRefEntityId() : entityId;
var argValueFuture = fetchKvEntry(ctx.getTenantId(), argEntityId, entry.getValue());
argFutures.put(entry.getKey(), argValueFuture);
}
return Futures.whenAllComplete(argFutures.values()).call(() -> {
var result = createStateByType(ctx.getCfType());
result.updateState(argFutures.entrySet().stream()
.collect(Collectors.toMap(
Entry::getKey, // Keep the key as is
entry -> {
try {
// Resolve the future to get the value
return entry.getValue().get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Error getting future result for key: " + entry.getKey(), e);
}
}
)));
return result;
}, calculatedFieldCallbackExecutor);
}
@Override
public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) {
callback.onSuccess();
public void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback) {
stateService.persistState(stateId, state, callback);
}
@Override
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiTargetEntityMap = new HashMap<>();
for (CalculatedField cf : cfs) {
Consumer<EntityId> resolvePartition = entityId -> {
TopicPartitionInfo tpi;
try {
tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, cf.getTenantId(), entityId);
if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId()))) {
tpiTargetEntityMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(new CalculatedFieldEntityCtxId(cf.getId(), entityId));
}
} catch (Exception e) {
log.warn("Failed to resolve partition for CalculatedFieldEntityCtxId: entityId=[{}], tenantId=[{}]. Reason: {}",
entityId, cf.getTenantId(), e.getMessage());
}
};
EntityId cfEntityId = cf.getEntityId();
if (isProfileEntity(cfEntityId)) {
calculatedFieldCache.getEntitiesByProfile(cf.getTenantId(), cfEntityId).forEach(resolvePartition);
} else {
resolvePartition.accept(cfEntityId);
}
}
for (var entry : tpiTargetEntityMap.entrySet()) {
for (List<CalculatedFieldEntityCtxId> partition : Lists.partition(entry.getValue(), 1000)) {
log.info("[{}] Submit task for CalculatedFields: {}", entry.getKey(), partition.size());
var future = calculatedFieldExecutor.submit(() -> {
try {
for (CalculatedFieldEntityCtxId ctxId : partition) {
restoreState(ctxId.cfId(), ctxId.entityId());
}
} catch (Throwable t) {
log.error("Unexpected exception while restoring CalculatedField states", t);
throw t;
}
});
result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(future);
}
}
// PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
// Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiTargetEntityMap = new HashMap<>();
//
// for (CalculatedField cf : cfs) {
//
// Consumer<EntityId> resolvePartition = entityId -> {
// TopicPartitionInfo tpi;
// try {
// tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, cf.getTenantId(), entityId);
// if (addedPartitions.contains(tpi) && states.keySet().stream().noneMatch(ctxId -> ctxId.cfId().equals(cf.getId()))) {
// tpiTargetEntityMap.computeIfAbsent(tpi, k -> new ArrayList<>()).add(new CalculatedFieldEntityCtxId(cf.getId(), entityId));
// }
// } catch (Exception e) {
// log.warn("Failed to resolve partition for CalculatedFieldEntityCtxId: entityId=[{}], tenantId=[{}]. Reason: {}",
// entityId, cf.getTenantId(), e.getMessage());
// }
// };
//
// EntityId cfEntityId = cf.getEntityId();
// if (isProfileEntity(cfEntityId)) {
// calculatedFieldCache.getEntitiesByProfile(cf.getTenantId(), cfEntityId).forEach(resolvePartition);
// } else {
// resolvePartition.accept(cfEntityId);
// }
// }
//
// for (var entry : tpiTargetEntityMap.entrySet()) {
// for (List<CalculatedFieldEntityCtxId> partition : Lists.partition(entry.getValue(), 1000)) {
// log.info("[{}] Submit task for CalculatedFields: {}", entry.getKey(), partition.size());
// var future = calculatedFieldExecutor.submit(() -> {
// try {
// for (CalculatedFieldEntityCtxId ctxId : partition) {
// restoreState(ctxId.cfId(), ctxId.entityId());
// }
// } catch (Throwable t) {
// log.error("Unexpected exception while restoring CalculatedField states", t);
// throw t;
// }
// });
// result.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(future);
// }
// }
return result;
}
private void restoreState(CalculatedFieldId calculatedFieldId, EntityId entityId) {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId, entityId);
CalculatedFieldEntityCtx restoredCtx = stateService.restoreState(ctxId);
if (restoredCtx != null) {
states.put(ctxId, restoredCtx);
log.info("Restored state for CalculatedField [{}]", calculatedFieldId);
} else {
log.warn("No state found for CalculatedField [{}], entity [{}].", calculatedFieldId, entityId);
}
}
@Override
protected void cleanupEntityOnPartitionRemoval(CalculatedFieldId entityId) {
cleanupEntity(entityId);
@ -491,7 +495,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
} else {
List<CalculatedFieldEntityCtxId> ctxIds = tpiStates.computeIfAbsent(targetEntityTpi, k -> new ArrayList<>());
ctxIds.add(new CalculatedFieldEntityCtxId(ctx.getCfId(), targetEntity));
ctxIds.add(new CalculatedFieldEntityCtxId(ctx.getTenantId(), ctx.getCfId(), targetEntity));
}
}
@ -525,7 +529,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue())));
updateOrInitializeState(cfCtx, entityId, argumentValues, previousCalculatedFieldIds);
// updateOrInitializeState(cfCtx, entityId, argumentValues, previousCalculatedFieldIds);
}
@Override
@ -569,9 +573,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
private void clearState(CalculatedFieldId calculatedFieldId, EntityId entityId) {
log.warn("Executing clearState, calculatedFieldId=[{}], entityId=[{}]", calculatedFieldId, entityId);
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(calculatedFieldId, entityId);
states.remove(ctxId);
stateService.removeState(ctxId);
}
private void initializeStateForEntityByProfile(EntityId entityId, EntityId profileId, TbCallback callback) {
@ -601,7 +602,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(List<ArgumentEntry> results) {
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, new ArrayList<>());
// updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues, new ArrayList<>());
callback.onSuccess();
}
@ -613,96 +614,83 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}, calculatedFieldCallbackExecutor);
}
private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues, List<CalculatedFieldId> previousCalculatedFieldIds) {
CalculatedFieldId cfId = calculatedFieldCtx.getCfId();
Map<String, ArgumentEntry> argumentsMap = new HashMap<>(argumentValues);
// private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues, List<CalculatedFieldId> previousCalculatedFieldIds) {
// CalculatedFieldId cfId = calculatedFieldCtx.getCfId();
// Map<String, ArgumentEntry> argumentsMap = new HashMap<>(argumentValues);
//
// CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId, entityId);
//
// states.compute(entityCtxId, (ctxId, ctx) -> {
// CalculatedFieldEntityCtx calculatedFieldEntityCtx = ctx != null ? ctx : fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType());
//
// CompletableFuture<Void> updateFuture = new CompletableFuture<>();
//
// Consumer<CalculatedFieldState> performUpdateState = (state) -> {
// if (state.updateState(argumentsMap)) {
// calculatedFieldEntityCtx.setState(state);
// stateService.persistState(entityCtxId, calculatedFieldEntityCtx);
// Map<String, ArgumentEntry> arguments = state.getArguments();
// boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) &&
// !arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY);
// if (allArgsPresent) {
// performCalculation(calculatedFieldCtx, state, entityId, previousCalculatedFieldIds);
// }
// log.info("Successfully updated state: calculatedFieldId=[{}], entityId=[{}]", calculatedFieldCtx.getCfId(), entityId);
// }
// updateFuture.complete(null);
// };
//
// CalculatedFieldState state = calculatedFieldEntityCtx.getState();
//
// boolean allKeysPresent = argumentsMap.keySet().containsAll(calculatedFieldCtx.getArguments().keySet());
// boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream()
// .anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getRefEntityKey().getType()) && state.getArguments().get(argument.getRefEntityKey().getKey()) == null);
//
// if (!allKeysPresent || requiresTsRollingUpdate) {
// Map<String, Argument> missingArguments = calculatedFieldCtx.getArguments().entrySet().stream()
// .filter(entry -> !argumentsMap.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getRefEntityKey().getType()) && state.getArguments().get(entry.getKey()) == null))
// .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
//
// fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentsMap::putAll)
// .addListener(() -> performUpdateState.accept(state),
// calculatedFieldCallbackExecutor);
// } else {
// performUpdateState.accept(state);
// }
//
// try {
// updateFuture.join();
// } catch (Exception e) {
// log.trace("Failed to update state for ctxId [{}].", ctxId, e);
// throw new RuntimeException("Failed to update or initialize state.", e);
// }
//
// return calculatedFieldEntityCtx;
// });
// }
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId, entityId);
states.compute(entityCtxId, (ctxId, ctx) -> {
CalculatedFieldEntityCtx calculatedFieldEntityCtx = ctx != null ? ctx : fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType());
CompletableFuture<Void> updateFuture = new CompletableFuture<>();
Consumer<CalculatedFieldState> performUpdateState = (state) -> {
if (state.updateState(argumentsMap)) {
calculatedFieldEntityCtx.setState(state);
stateService.persistState(entityCtxId, calculatedFieldEntityCtx);
Map<String, ArgumentEntry> arguments = state.getArguments();
boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) &&
!arguments.containsValue(SingleValueArgumentEntry.EMPTY) && !arguments.containsValue(TsRollingArgumentEntry.EMPTY);
if (allArgsPresent) {
performCalculation(calculatedFieldCtx, state, entityId, previousCalculatedFieldIds);
}
log.info("Successfully updated state: calculatedFieldId=[{}], entityId=[{}]", calculatedFieldCtx.getCfId(), entityId);
}
updateFuture.complete(null);
};
CalculatedFieldState state = calculatedFieldEntityCtx.getState();
boolean allKeysPresent = argumentsMap.keySet().containsAll(calculatedFieldCtx.getArguments().keySet());
boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream()
.anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getRefEntityKey().getType()) && state.getArguments().get(argument.getRefEntityKey().getKey()) == null);
if (!allKeysPresent || requiresTsRollingUpdate) {
Map<String, Argument> missingArguments = calculatedFieldCtx.getArguments().entrySet().stream()
.filter(entry -> !argumentsMap.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getRefEntityKey().getType()) && state.getArguments().get(entry.getKey()) == null))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentsMap::putAll)
.addListener(() -> performUpdateState.accept(state),
calculatedFieldCallbackExecutor);
} else {
performUpdateState.accept(state);
}
try {
updateFuture.join();
} catch (Exception e) {
log.trace("Failed to update state for ctxId [{}].", ctxId, e);
throw new RuntimeException("Failed to update or initialize state.", e);
}
return calculatedFieldEntityCtx;
});
}
private void performCalculation(CalculatedFieldCtx calculatedFieldCtx, CalculatedFieldState state, EntityId entityId, List<CalculatedFieldId> previousCalculatedFieldIds) {
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(calculatedFieldCtx);
Futures.addCallback(resultFuture, new FutureCallback<>() {
@Override
public void onSuccess(CalculatedFieldResult result) {
if (result != null) {
pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), calculatedFieldCtx.getCfId(), entityId, result, previousCalculatedFieldIds);
}
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedFieldCtx.getCfId(), entityId, t);
}
}, MoreExecutors.directExecutor());
}
private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> previousCalculatedFieldIds) {
@Override
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> cfIds, TbCallback callback) {
try {
OutputType type = calculatedFieldResult.getType();
TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST;
TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY;
ObjectNode payload = createJsonPayload(calculatedFieldResult);
if (previousCalculatedFieldIds != null && previousCalculatedFieldIds.contains(calculatedFieldId)) {
throw new IllegalArgumentException("Calculated field [" + calculatedFieldId.getId() + "] refers to itself, causing an infinite loop.");
}
List<CalculatedFieldId> calculatedFieldIds = previousCalculatedFieldIds != null
? new ArrayList<>(previousCalculatedFieldIds)
: new ArrayList<>();
calculatedFieldIds.add(calculatedFieldId);
TbMsg msg = TbMsg.newMsg().type(msgType).originator(originatorId).previousCalculatedFieldIds(calculatedFieldIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build();
clusterService.pushMsgToRuleEngine(tenantId, originatorId, msg, null);
log.info("Pushed message to rule engine: originatorId=[{}]", originatorId);
TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(JacksonUtil.writeValueAsString(payload)).build();
clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
callback.onSuccess();
log.trace("[{}][{}] Pushed message to rule engine: {} ", tenantId, entityId, msg);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
});
} catch (Exception e) {
log.warn("[{}] Failed to push message to rule engine. CalculatedFieldResult: {}", originatorId, calculatedFieldResult, e);
log.warn("[{}][{}] Failed to push message to rule engine. CalculatedFieldResult: {}", tenantId, entityId, calculatedFieldResult, e);
}
}
@ -781,15 +769,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return new StringDataEntry(key, defaultValue);
}
private CalculatedFieldEntityCtx fetchCalculatedFieldEntityState(CalculatedFieldEntityCtxId entityCtxId, CalculatedFieldType cfType) {
CalculatedFieldEntityCtx state = stateService.restoreState(entityCtxId);
if (state == null) {
return new CalculatedFieldEntityCtx(entityCtxId, createStateByType(cfType));
}
return state;
}
private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) {
ObjectNode payload = JacksonUtil.newObjectNode();
Map<String, Object> resultMap = calculatedFieldResult.getResultMap();

View File

@ -0,0 +1,64 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService;
@Service
@TbRuleEngineComponent
@RequiredArgsConstructor
public class DefaultCalculatedFieldInitService implements CalculatedFieldInitService {
private final CalculatedFieldService calculatedFieldService;
private final CalculatedFieldStateService stateService;
private final ActorSystemContext actorSystemContext;
@Value("${calculated_fields.init_fetch_pack_size:50000}")
@Getter
private int initFetchPackSize;
@AfterStartUp(order = AfterStartUp.CF_INIT_SERVICE)
public void initCalculatedFieldDefinitions() {
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
cfs.forEach(cf -> actorSystemContext.tell(new CalculatedFieldInitMsg(cf.getTenantId(), cf)));
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize);
cfls.forEach(link -> actorSystemContext.tell(new CalculatedFieldLinkInitMsg(link.getTenantId(), link)));
//TODO: combine with the DefaultCalculatedFieldCache.
}
@AfterStartUp(order = AfterStartUp.CF_STATE_RESTORE_SERVICE)
public void initCalculatedFieldStates() {
stateService.restoreStates().forEach((k, v) -> actorSystemContext.tell(new CalculatedFieldStateRestoreMsg(k, v)));
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.cf.ctx;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
public record CalculatedFieldEntityCtxId(CalculatedFieldId cfId, EntityId entityId) {
public record CalculatedFieldEntityCtxId(TenantId tenantId, CalculatedFieldId cfId, EntityId entityId) {
}

View File

@ -15,15 +15,18 @@
*/
package org.thingsboard.server.service.cf.ctx;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import java.util.Map;
public interface CalculatedFieldStateService {
Map<CalculatedFieldEntityCtxId, CalculatedFieldEntityCtx> restoreStates();
Map<CalculatedFieldEntityCtxId, CalculatedFieldState> restoreStates();
CalculatedFieldEntityCtx restoreState(CalculatedFieldEntityCtxId ctxId);
CalculatedFieldState restoreState(CalculatedFieldEntityCtxId ctxId);
void persistState(CalculatedFieldEntityCtxId ctxId, CalculatedFieldEntityCtx state);
void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
void removeState(CalculatedFieldEntityCtxId ctxId);

View File

@ -66,4 +66,9 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
return stateUpdated;
}
@Override
public boolean isReady() {
//TODO: IM
return true;
}
}

View File

@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import java.util.ArrayList;
import java.util.HashMap;

View File

@ -44,4 +44,5 @@ public interface CalculatedFieldState {
ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx);
boolean isReady();
}

View File

@ -19,6 +19,7 @@ import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.cf.RocksDBService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
@ -36,24 +37,25 @@ public class RocksDBStateService implements CalculatedFieldStateService {
private final RocksDBService rocksDBService;
@Override
public Map<CalculatedFieldEntityCtxId, CalculatedFieldEntityCtx> restoreStates() {
public Map<CalculatedFieldEntityCtxId, CalculatedFieldState> restoreStates() {
return rocksDBService.getAll().entrySet().stream()
.collect(Collectors.toMap(
entry -> JacksonUtil.fromString(entry.getKey(), CalculatedFieldEntityCtxId.class),
entry -> JacksonUtil.fromString(entry.getValue(), CalculatedFieldEntityCtx.class)
entry -> JacksonUtil.fromString(entry.getValue(), CalculatedFieldState.class)
));
}
@Override
public CalculatedFieldEntityCtx restoreState(CalculatedFieldEntityCtxId ctxId) {
public CalculatedFieldState restoreState(CalculatedFieldEntityCtxId ctxId) {
return Optional.ofNullable(rocksDBService.get(JacksonUtil.writeValueAsString(ctxId)))
.map(storedState -> JacksonUtil.fromString(storedState, CalculatedFieldEntityCtx.class))
.map(storedState -> JacksonUtil.fromString(storedState, CalculatedFieldState.class))
.orElse(null);
}
@Override
public void persistState(CalculatedFieldEntityCtxId ctxId, CalculatedFieldEntityCtx state) {
rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(state));
public void persistState(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback){
rocksDBService.put(JacksonUtil.writeValueAsString(stateId), JacksonUtil.writeValueAsString(state));
callback.onSuccess();
}
@Override

View File

@ -21,6 +21,10 @@ import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.util.KvProtoUtil;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
@Data
@NoArgsConstructor
@ -34,6 +38,18 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
private Long version;
public SingleValueArgumentEntry(TsKvProto entry) {
this.ts = entry.getTs();
this.version = entry.getVersion();
this.value = ProtoUtils.fromProto(entry).getValue();
}
public SingleValueArgumentEntry(AttributeValueProto entry) {
this.ts = entry.getLastUpdateTs();
this.version = entry.getVersion();
this.value = ProtoUtils.fromProto(entry).getValue();
}
public SingleValueArgumentEntry(KvEntry entry) {
if (entry instanceof TsKvEntry tsKvEntry) {
this.ts = tsKvEntry.getTs();

View File

@ -21,8 +21,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
@ -30,23 +28,20 @@ import org.springframework.stereotype.Service;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
@ -166,10 +161,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
pendingMsgHolder.setMsg(toCfMsg);
if (toCfMsg.hasTelemetryMsg()) {
log.trace("[{}] Forwarding regular telemetry message for processing {}", id, toCfMsg.getTelemetryMsg());
forwardToCalculatedFieldService(toCfMsg.getTelemetryMsg(), callback);
forwardToActorSystem(toCfMsg.getTelemetryMsg(), callback);
} else if (toCfMsg.hasLinkedTelemetryMsg()) {
log.trace("[{}] Forwarding linked telemetry message for processing {}", id, toCfMsg.getLinkedTelemetryMsg());
forwardToCalculatedFieldService(toCfMsg.getLinkedTelemetryMsg(), callback);
forwardToActorSystem(toCfMsg.getLinkedTelemetryMsg(), callback);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
@ -219,9 +214,9 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
protected void handleNotification(UUID id, TbProtoQueueMsg<ToCalculatedFieldNotificationMsg> msg, TbCallback callback) {
ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue();
if (toCfNotification.hasComponentLifecycle()) {
forwardToCalculatedFieldService(toCfNotification.getComponentLifecycle(), callback);
forwardToActorSystem(toCfNotification.getComponentLifecycle(), callback);
} else if (toCfNotification.hasEntityUpdateMsg()) {
forwardToCalculatedFieldService(toCfNotification.getEntityUpdateMsg(), callback);
forwardToActorSystem(toCfNotification.getEntityUpdateMsg(), callback);
}
callback.onSuccess();
}
@ -249,33 +244,20 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
// }
//
private void forwardToCalculatedFieldService(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) {
private void forwardToActorSystem(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) {
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
actorContext.tell(new CalculatedFieldTelemetryMsg(tenantId, entityId, msg, callback));
}
private void forwardToActorSystem(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) {
var msg = linkedMsg.getMsg();
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onLinkedTelemetryMsg(linkedMsg, callback));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t);
callback.onFailure(t);
});
var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
actorContext.tell(new CalculatedFieldLinkedTelemetryMsg(tenantId, entityId, linkedMsg, callback));
}
private void forwardToCalculatedFieldService(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) {
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onTelemetryMsg(msg, callback));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t);
callback.onFailure(t);
});
}
private void forwardToCalculatedFieldService(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) {
private void forwardToActorSystem(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) {
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldLifecycleMsg(msg, callback));
@ -287,7 +269,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
});
}
private void forwardToCalculatedFieldService(TransportProtos.CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) {
private void forwardToActorSystem(TransportProtos.CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) {
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityUpdateMsg(msg, callback));

View File

@ -147,7 +147,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
resultFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
}
DonAsynchron.withCallback(resultFuture, result -> {
calculatedFieldExecutionService.pushRequestToQueue(request, result);
calculatedFieldExecutionService.pushRequestToQueue(request, result, request.getCallback());
}, safeCallback(request.getCallback()), tsCallBackExecutor);
addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
if (request.isSaveLatest() && !request.isOnlyLatest()) {
@ -167,7 +167,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
log.trace("Executing saveInternal [{}]", request);
ListenableFuture<List<Long>> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries());
DonAsynchron.withCallback(saveFuture, result -> {
calculatedFieldExecutionService.pushRequestToQueue(request, result);
calculatedFieldExecutionService.pushRequestToQueue(request, result, request.getCallback());
}, safeCallback(request.getCallback()), tsCallBackExecutor);
addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice()));
}

View File

@ -441,6 +441,8 @@ actors:
device_dispatcher_pool_size: "${ACTORS_SYSTEM_DEVICE_DISPATCHER_POOL_SIZE:4}" # Thread pool size for actor system dispatcher that process messages for device actors
rule_dispatcher_pool_size: "${ACTORS_SYSTEM_RULE_DISPATCHER_POOL_SIZE:8}" # Thread pool size for actor system dispatcher that process messages for rule engine (chain/node) actors
edge_dispatcher_pool_size: "${ACTORS_SYSTEM_EDGE_DISPATCHER_POOL_SIZE:4}" # Thread pool size for actor system dispatcher that process messages for edge actors
cfm_dispatcher_pool_size: "${ACTORS_SYSTEM_CFM_DISPATCHER_POOL_SIZE:2}" # Thread pool size for actor system dispatcher that process messages for CalculatedField manager actors
cfe_dispatcher_pool_size: "${ACTORS_SYSTEM_CFE_DISPATCHER_POOL_SIZE:8}" # Thread pool size for actor system dispatcher that process messages for CalculatedField entity actors
tenant:
create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}" # Create components in initialization
session:

View File

@ -35,6 +35,8 @@ sql.events.batch_threads=2
actors.system.tenant_dispatcher_pool_size=4
actors.system.device_dispatcher_pool_size=8
actors.system.rule_dispatcher_pool_size=12
actors.system.cfm_dispatcher_pool_size=2
actors.system.cfe_dispatcher_pool_size=2
transport.sessions.report_timeout=10000
queue.transport_api.request_poll_interval=5
queue.transport_api.response_poll_interval=5

View File

@ -0,0 +1,56 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors;
import lombok.Getter;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import java.util.Objects;
public class TbCalculatedFieldEntityActorId implements TbActorId {
@Getter
private final EntityId entityId;
public TbCalculatedFieldEntityActorId(EntityId entityId) {
this.entityId = entityId;
}
@Override
public String toString() {
return entityId.getEntityType() + "|" + entityId.getId();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TbCalculatedFieldEntityActorId that = (TbCalculatedFieldEntityActorId) o;
return entityId.equals(that.entityId);
}
@Override
public int hashCode() {
// Magic number to ensure that the hash does not match with the hash of other actor id - (TbEntityActorId)
return 42 + Objects.hash(entityId);
}
@Override
public EntityType getEntityType() {
return entityId.getEntityType();
}
}

View File

@ -27,4 +27,6 @@ public class ReferencedEntityKey {
private ArgumentType type;
private AttributeScope scope;
}

View File

@ -133,7 +133,16 @@ public enum MsgType {
* Messages that are sent to and from edge session to start edge synchronization process
*/
EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG,
EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG,
CF_INIT_MSG, // Sent to init particular calculated field;
CF_LINK_INIT_MSG, // Sent to init particular calculated field;
CF_STATE_RESTORE_MSG,// Sent to init particular calculated field entity state;
CF_TELEMETRY_MSG,
CF_ENTITY_TELEMETRY_MSG,
CF_LINKED_TELEMETRY_MSG,
CF_UPDATE_MSG,
CF_ENTITY_UPDATE_MSG;
@Getter
private final boolean ignoreOnStart;

View File

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
public interface ToCalculatedFieldSystemMsg extends TenantAwareMsg {
default TbCallback getCallback() {
return TbCallback.EMPTY;
}
}

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.cf;
import lombok.Data;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
@Data
public class CalculatedFieldInitMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final CalculatedField cf;
@Override
public MsgType getMsgType() {
return MsgType.CF_INIT_MSG;
}
}

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.cf;
import lombok.Data;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
@Data
public class CalculatedFieldLinkInitMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final CalculatedFieldLink link;
@Override
public MsgType getMsgType() {
return MsgType.CF_LINK_INIT_MSG;
}
}

View File

@ -38,6 +38,10 @@ public @interface AfterStartUp {
int ACTOR_SYSTEM = 9;
int REGULAR_SERVICE = 10;
int CF_INIT_SERVICE = 10;
int CF_STATE_RESTORE_SERVICE = 11;
int CF_CONSUMER_SERVICE = 12;
int BEFORE_TRANSPORT_SERVICE = Integer.MAX_VALUE - 1001;
int TRANSPORT_SERVICE = Integer.MAX_VALUE - 1000;
int AFTER_TRANSPORT_SERVICE = Integer.MAX_VALUE - 999;

View File

@ -145,6 +145,8 @@ sql.events.batch_threads=2
actors.system.tenant_dispatcher_pool_size=4
actors.system.device_dispatcher_pool_size=8
actors.system.rule_dispatcher_pool_size=12
actors.system.cfm_dispatcher_pool_size=2
actors.system.cfe_dispatcher_pool_size=2
transport.sessions.report_timeout=10000
queue.transport_api.request_poll_interval=5
queue.transport_api.response_poll_interval=5