Bug fixes for monolith

This commit is contained in:
Andrii Shvaika 2025-02-05 16:39:48 +02:00
parent 0d57d69964
commit 0332e0f0c9
10 changed files with 70 additions and 29 deletions

View File

@ -314,6 +314,7 @@ public class ActorSystemContext {
@Getter
private TbEntityViewService tbEntityViewService;
@Lazy
@Autowired
@Getter
private TelemetrySubscriptionService tsSubService;

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.actors.calculatedField;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.device.DeviceActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
@ -38,7 +39,7 @@ public class CalculatedFieldEntityActorCreator extends ContextBasedCreator {
@Override
public TbActorId createActorId() {
return new TbEntityActorId(entityId);
return new TbCalculatedFieldEntityActorId(entityId);
}
@Override

View File

@ -24,6 +24,7 @@ import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
@ -50,7 +51,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -175,7 +175,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private void onEntityCreated(ComponentLifecycleMsg msg, TbCallback callback) {
EntityId entityId = msg.getEntityId();
EntityId profileId = getProfileId(tenantId, entityId);
cfEntityCache.add(tenantId, entityId, profileId);
cfEntityCache.add(tenantId, profileId, entityId);
var entityIdFields = getCalculatedFieldsByEntityId(entityId);
var profileIdFields = getCalculatedFieldsByEntityId(profileId);
var fieldsCount = entityIdFields.size() + profileIdFields.size();
@ -233,6 +233,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
addLinks(cf);
initCf(cfCtx, callback, false);
}
}
@ -251,7 +252,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
} else {
var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
calculatedFields.put(newCf.getId(), newCfCtx);
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getId());
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
List<CalculatedFieldCtx> newCfList = new ArrayList<>(oldCfList.size());
boolean found = false;
for (CalculatedFieldCtx oldCtx : oldCfList) {
@ -265,10 +266,15 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
if (!found) {
newCfList.add(newCfCtx);
}
entityIdCalculatedFields.put(newCf.getId(), newCfList);
entityIdCalculatedFields.put(newCf.getEntityId(), newCfList);
deleteLinks(oldCfCtx);
addLinks(newCf);
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
if (newCfCtx.hasSignificantChanges(oldCfCtx)) {
var stateChanges = newCfCtx.hasStateChanges(oldCfCtx);
if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) {
try {
newCfCtx.init();
} catch (Exception e) {
@ -276,11 +282,12 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
systemContext.persistCalculatedFieldDebugEvent(newCf.getTenantId(), newCf.getId(), newCf.getEntityId(), null, null, null, null, e);
}
}
initCf(newCfCtx, callback, true);
initCf(newCfCtx, callback, stateChanges);
} else {
callback.onSuccess();
}
}
}
}
private void onCfDeleted(ComponentLifecycleMsg msg, TbCallback callback) {
@ -290,6 +297,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.warn("[{}] CF was already deleted [{}]", tenantId, cfId);
callback.onSuccess();
} else {
deleteLinks(cfCtx);
EntityId entityId = cfCtx.getEntityId();
EntityType entityType = cfCtx.getEntityId().getEntityType();
if (isProfileEntity(entityType)) {
@ -440,4 +449,16 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
() -> true);
}
private void addLinks(CalculatedField newCf) {
var newLinks = newCf.getConfiguration().buildCalculatedFieldLinks(tenantId, newCf.getEntityId(), newCf.getId());
newLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new ArrayList<>()).add(link));
}
private void deleteLinks(CalculatedFieldCtx cfCtx) {
var oldCf = cfCtx.getCalculatedField();
var oldLinks = oldCf.getConfiguration().buildCalculatedFieldLinks(tenantId, oldCf.getEntityId(), oldCf.getId());
oldLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new ArrayList<>()).remove(link));
}
}

View File

@ -70,20 +70,25 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
@AfterStartUp(order = AfterStartUp.CF_READ_CF_SERVICE)
public void init() {
//TODO: move to separate place to avoid circular references with the ActorSystemContext (@Lazy for tsSubService)
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf));
calculatedFields.values().forEach(cf ->
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf)
);
cfs.forEach(cf -> actorSystemContext.tell(new CalculatedFieldInitMsg(cf.getTenantId(), cf)));
cfs.forEach(cf -> {
calculatedFields.putIfAbsent(cf.getId(), cf);
actorSystemContext.tell(new CalculatedFieldInitMsg(cf.getTenantId(), cf));
});
calculatedFields.values().forEach(cf -> {
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf);
});
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize);
cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new CopyOnWriteArrayList<>()).add(link));
cfls.forEach(link -> {
calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new CopyOnWriteArrayList<>()).add(link);
actorSystemContext.tell(new CalculatedFieldLinkInitMsg(link.getTenantId(), link));
});
calculatedFieldLinks.values().stream()
.flatMap(List::stream)
.forEach(link ->
entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(link)
);
cfls.forEach(link -> actorSystemContext.tell(new CalculatedFieldLinkInitMsg(link.getTenantId(), link)));
}
@Override

View File

@ -136,7 +136,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
};
private final CalculatedFieldService calculatedFieldService;
private final TbAssetProfileCache assetProfileCache;
private final TbDeviceProfileCache deviceProfileCache;
private final CalculatedFieldCache calculatedFieldCache;

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.cf.cache;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import java.util.ArrayList;
@ -88,6 +89,9 @@ public class TenantEntityProfileCache {
public void add(EntityId profileId, EntityId entityId, Integer partition, boolean mine) {
lock.writeLock().lock();
try {
if(EntityType.DEVICE.equals(profileId.getEntityType())){
throw new RuntimeException("WTF?");
}
if (mine) {
myEntities.computeIfAbsent(profileId, k -> new HashSet<>()).add(entityId);
}

View File

@ -212,12 +212,16 @@ public class CalculatedFieldCtx {
return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId);
}
public boolean hasSignificantChanges(CalculatedFieldCtx other) {
boolean entityIdChanged = !entityId.equals(other.entityId);
public boolean hasOtherSignificantChanges(CalculatedFieldCtx other) {
boolean expressionChanged = !expression.equals(other.expression);
boolean outputChanged = !output.equals(other.output);
return expressionChanged || outputChanged;
}
public boolean hasStateChanges(CalculatedFieldCtx other) {
boolean typeChanged = !cfType.equals(other.cfType);
boolean argumentsChanged = !arguments.equals(other.arguments);
boolean expressionChanged = !expression.equals(other.expression);
return entityIdChanged || typeChanged || argumentsChanged || expressionChanged;
return typeChanged || argumentsChanged;
}
}

View File

@ -178,11 +178,11 @@ public class EntityStateSourcingListener {
}
case TENANT_PROFILE -> {
TenantProfile tenantProfile = (TenantProfile) event.getEntity();
tbClusterService.onTenantProfileDelete(tenantProfile, null);
tbClusterService.onTenantProfileDelete(tenantProfile, TbQueueCallback.EMPTY);
}
case DEVICE -> {
Device device = (Device) event.getEntity();
tbClusterService.onDeviceDeleted(tenantId, device, null);
tbClusterService.onDeviceDeleted(tenantId, device, TbQueueCallback.EMPTY);
}
case DEVICE_PROFILE -> {
DeviceProfile deviceProfile = (DeviceProfile) event.getEntity();
@ -190,11 +190,11 @@ public class EntityStateSourcingListener {
}
case TB_RESOURCE -> {
TbResourceInfo tbResource = (TbResourceInfo) event.getEntity();
tbClusterService.onResourceDeleted(tbResource, null);
tbClusterService.onResourceDeleted(tbResource, TbQueueCallback.EMPTY);
}
case CALCULATED_FIELD -> {
CalculatedField calculatedField = (CalculatedField) event.getEntity();
tbClusterService.onCalculatedFieldDeleted(calculatedField, null);
tbClusterService.onCalculatedFieldDeleted(calculatedField, TbQueueCallback.EMPTY);
}
default -> {
}

View File

@ -727,14 +727,19 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, TbQueueCallback callback) {
var msg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), callback);
var msg = toProto(new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), oldCalculatedField == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED));
onCalculatedFieldLifecycleMsg(msg, callback);
}
@Override
public void onCalculatedFieldDeleted(CalculatedField calculatedField, TbQueueCallback callback) {
var msg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED);
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(toProto(msg)).build(), callback);
var msg = toProto(new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED));
onCalculatedFieldLifecycleMsg(msg, callback);
}
private void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto msg, TbQueueCallback callback) {
broadcastToCalculatedFields(ToCalculatedFieldNotificationMsg.newBuilder().setComponentLifecycleMsg(msg).build(), callback);
broadcastToCore(ToCoreNotificationMsg.newBuilder().setComponentLifecycle(msg).build());
}
@Override

View File

@ -159,7 +159,8 @@ public final class TbActorMailbox implements TbActorCtx {
stopReason = TbActorStopReason.INIT_FAILED;
destroy(updateException.getCause());
} catch (Throwable t) {
log.debug("[{}] Failed to process message: {}", selfId, msg, t);
//TODO: revert;
log.error("[{}] Failed to process message: {}", selfId, msg, t);
ProcessFailureStrategy strategy = actor.onProcessFailure(msg, t);
if (strategy.isStop()) {
system.stop(selfId);