diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/CalculatedFieldCtx.java index 0a7a5c8264..021a9bbc83 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/CalculatedFieldCtx.java @@ -26,6 +26,9 @@ public class CalculatedFieldCtx { private EntityId entityId; private CalculatedFieldState state; + public CalculatedFieldCtx() { + } + public CalculatedFieldCtx(CalculatedFieldId calculatedFieldId, EntityId entityId, CalculatedFieldState state) { this.calculatedFieldId = calculatedFieldId; this.entityId = entityId; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/CalculatedFieldState.java index dc07b820ea..f84f4a57c2 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/CalculatedFieldState.java @@ -24,6 +24,7 @@ import java.util.Map; @Builder public class CalculatedFieldState { + // TODO: use value object(TsKv) instead of string Map arguments; String result; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java index 06c87ff4d1..51ff540a64 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/cf/DefaultTbCalculatedFieldService.java @@ -67,7 +67,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -113,7 +112,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field")); calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool( Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field-callback")); - scheduledExecutor.scheduleWithFixedDelay(this::fetchCalculatedFields, new Random().nextInt(defaultCalculatedFieldCheckIntervalInSec), defaultCalculatedFieldCheckIntervalInSec, TimeUnit.SECONDS); + scheduledExecutor.scheduleWithFixedDelay(this::fetchCalculatedFields, 0, defaultCalculatedFieldCheckIntervalInSec, TimeUnit.SECONDS); } @PreDestroy @@ -136,11 +135,15 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp try { TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB())); + log.info("Received CalculatedFieldMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId); if (proto.getDeleted()) { + log.warn("Executing onCalculatedFieldDelete, calculatedFieldId=[{}]", calculatedFieldId); onCalculatedFieldDelete(calculatedFieldId, callback); callback.onSuccess(); } if (proto.getUpdated()) { + log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId); + //TODO: improve the check. Maybe it was renamed or just the name of the result changed. onCalculatedFieldDelete(calculatedFieldId, callback); } CalculatedField cf = calculatedFieldService.findById(tenantId, calculatedFieldId); @@ -150,13 +153,18 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp calculatedFields.put(calculatedFieldId, cf); calculatedFieldLinks.put(calculatedFieldId, links); switch (entityId.getEntityType()) { - case ASSET, DEVICE -> initializeStateForEntity(tenantId, cf, entityId, callback); + case ASSET, DEVICE -> { + log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId); + initializeStateForEntity(tenantId, cf, entityId, callback); + } case ASSET_PROFILE -> { + log.info("Initializing state for all assets in profile: tenantId=[{}], assetProfileId=[{}]", tenantId, entityId); PageDataIterable assetIds = new PageDataIterable<>(pageLink -> assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize); assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, assetId, callback)); } case DEVICE_PROFILE -> { + log.info("Initializing state for all devices in profile: tenantId=[{}], deviceProfileId=[{}]", tenantId, entityId); PageDataIterable deviceIds = new PageDataIterable<>(pageLink -> deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize); deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, deviceId, callback)); @@ -164,12 +172,14 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp default -> throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields."); } + log.info("Successfully processed calculated field message for calculatedFieldId: [{}]", calculatedFieldId); } else { - //Calculated field or entity was probably deleted while message was in queue; + //Calculated field was probably deleted while message was in queue; + log.warn("Calculated field not found, possibly deleted: {}", calculatedFieldId); callback.onSuccess(); } } catch (Exception e) { - log.trace("Failed to process calculated field add msg: [{}]", proto, e); + log.trace("Failed to process calculated field msg: [{}]", proto, e); callback.onFailure(e); } } @@ -210,7 +220,6 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp } } - private void onCalculatedFieldDelete(CalculatedFieldId calculatedFieldId, TbCallback callback) { try { calculatedFieldLinks.remove(calculatedFieldId); @@ -221,7 +230,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp .collect(Collectors.toList()); rocksDBService.deleteAll(statesToRemove); } catch (Exception e) { - log.trace("Failed to delete calculated field.", e); + log.trace("Failed to delete calculated field: [{}]", calculatedFieldId, e); callback.onFailure(e); } } @@ -231,7 +240,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf)); PageDataIterable cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize); cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link)); - rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(ctxId, JacksonUtil.convertValue(ctx, CalculatedFieldCtx.class))); + rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(ctxId, JacksonUtil.fromString(ctx, CalculatedFieldCtx.class))); states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.startsWith(id.toString()))); } @@ -276,7 +285,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp @Override public void onFailure(Throwable t) { - log.warn("Failed to fetch data for type: {}", argument.getType(), t); + log.warn("Failed to initialize state for entity: [{}]", entityId, t); callback.onFailure(t); } }, calculatedFieldCallbackExecutor)); @@ -327,7 +336,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp } calculatedFieldCtx.setState(state); states.put(ctxId, calculatedFieldCtx); - rocksDBService.put(ctxId, Objects.requireNonNull(JacksonUtil.toString(calculatedFieldCtx))); + rocksDBService.put(ctxId, Objects.requireNonNull(JacksonUtil.writeValueAsString(calculatedFieldCtx))); } private String performCalculation(Map argumentValues, CalculatedFieldConfiguration calculatedFieldConfiguration) { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldConfiguration.java index cfae2e5a0e..7e6a77f006 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldConfiguration.java @@ -41,7 +41,6 @@ public interface CalculatedFieldConfiguration { Map getArguments(); - @JsonIgnore BaseCalculatedFieldConfiguration.Output getOutput(); @JsonIgnore diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/CalculatedFieldEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/CalculatedFieldEntity.java index 725ddd7bef..efb262477b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/CalculatedFieldEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/CalculatedFieldEntity.java @@ -56,7 +56,7 @@ public class CalculatedFieldEntity extends BaseSqlEntity implem private UUID tenantId; @Column(name = CALCULATED_FIELD_ENTITY_TYPE) - private EntityType entityType; + private String entityType; @Column(name = CALCULATED_FIELD_ENTITY_ID) private UUID entityId; @@ -88,12 +88,12 @@ public class CalculatedFieldEntity extends BaseSqlEntity implem this.setUuid(calculatedField.getUuidId()); this.createdTime = calculatedField.getCreatedTime(); this.tenantId = calculatedField.getTenantId().getId(); - this.entityType = calculatedField.getEntityId().getEntityType(); + this.entityType = calculatedField.getEntityId().getEntityType().name(); this.entityId = calculatedField.getEntityId().getId(); this.type = calculatedField.getType(); this.name = calculatedField.getName(); this.configurationVersion = calculatedField.getConfigurationVersion(); - this.configuration = calculatedField.getConfiguration().calculatedFieldConfigToJson(entityType, entityId); + this.configuration = calculatedField.getConfiguration().calculatedFieldConfigToJson(EntityType.valueOf(entityType), entityId); this.version = calculatedField.getVersion(); if (calculatedField.getExternalId() != null) { this.externalId = calculatedField.getExternalId().getId(); @@ -109,7 +109,7 @@ public class CalculatedFieldEntity extends BaseSqlEntity implem calculatedField.setType(type); calculatedField.setName(name); calculatedField.setConfigurationVersion(configurationVersion); - calculatedField.setConfiguration(readCalculatedFieldConfiguration(configuration, entityType, entityId)); + calculatedField.setConfiguration(readCalculatedFieldConfiguration(configuration, EntityType.valueOf(entityType), entityId)); calculatedField.setVersion(version); if (externalId != null) { calculatedField.setExternalId(new CalculatedFieldId(externalId)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/CalculatedFieldLinkEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/CalculatedFieldLinkEntity.java index 0ac5a9bc94..b4d6677be4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/CalculatedFieldLinkEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/CalculatedFieldLinkEntity.java @@ -53,7 +53,7 @@ public class CalculatedFieldLinkEntity extends BaseSqlEntity