diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 50a777840c..de3864df9f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -644,6 +644,10 @@ public class ActorSystemContext { @Getter private String deviceStateNodeRateLimitConfig; + @Value("${actors.calculated_fields.calculation_timeout:5}") + @Getter + private long cfCalculationResultTimeout; + @Getter @Setter private TbActorSystem actorSystem; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index f7fc204c0f..e42ebb593e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -274,32 +274,30 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM private void processStateIfReady(CalculatedFieldCtx ctx, List cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) throws CalculatedFieldException { CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId); - boolean stateSizeOk; - if (ctx.isInitialized() && state.isReady()) { - try { - CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); + boolean stateSizeChecked = false; + try { + if (ctx.isInitialized() && state.isReady()) { + CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(systemContext.getCfCalculationResultTimeout(), TimeUnit.SECONDS); state.checkStateSize(ctxId, ctx.getMaxStateSize()); - stateSizeOk = state.isSizeOk(); - if (stateSizeOk) { + stateSizeChecked = true; + if (state.isSizeOk()) { cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null); } } - } catch (Exception e) { - throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArguments()).cause(e).build(); } - } else { - state.checkStateSize(ctxId, ctx.getMaxStateSize()); - stateSizeOk = state.isSizeOk(); - if (stateSizeOk) { - callback.onSuccess(); // State was updated but no calculation performed; + } catch (Exception e) { + throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArguments()).cause(e).build(); + } finally { + if (!stateSizeChecked) { + state.checkStateSize(ctxId, ctx.getMaxStateSize()); + } + if (state.isSizeOk()) { + cfStateService.persistState(ctxId, state, callback); + } else { + removeStateAndRaiseSizeException(ctxId, CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(), callback); } - } - if (stateSizeOk) { - cfStateService.persistState(ctxId, state, callback); - } else { - removeStateAndRaiseSizeException(ctxId, CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(), callback); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index a064a99935..a237f3d022 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -100,10 +100,6 @@ public class SingleValueArgumentEntry implements ArgumentEntry { if (newVersion == null || this.version == null || newVersion > this.version) { this.ts = singleValueEntry.getTs(); this.version = newVersion; - BasicKvEntry newValue = singleValueEntry.getKvEntryValue(); - if (this.kvEntryValue != null && this.kvEntryValue.getValue().equals(newValue.getValue())) { - return false; - } this.kvEntryValue = singleValueEntry.getKvEntryValue(); return true; } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 78d7f01404..9e9ca42e83 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -436,7 +436,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc ctx.sendWsMsg(update); } else { ctx.doFetchAlarmCount(); - ctx.createAlarmSubscriptions(); + if (entitiesIds != null) { + ctx.createAlarmSubscriptions(); + } TbAlarmCountSubCtx finalCtx = ctx; ScheduledFuture task = scheduler.scheduleWithFixedDelay( () -> refreshDynamicQuery(finalCtx), diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index cbe7466a16..7e4e0708d8 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -512,6 +512,8 @@ actors: enabled: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}" # The value of DEBUG mode rate limit. By default, no more than 50 thousand events per hour configuration: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}" + # Time in seconds to receive calculation result. + calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}" debug: settings: diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index 69d50c61d1..9801907d3b 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -83,7 +83,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @Slf4j @DaoSqlTest @TestPropertySource(properties = { - "server.ws.alarms_per_alarm_status_subscription_cache_size=5" + "server.ws.alarms_per_alarm_status_subscription_cache_size=5", + "server.ws.dynamic_page_link.refresh_interval=15" }) public class WebsocketApiTest extends AbstractControllerTest { @Autowired diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntryTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntryTest.java index e83e30663d..2c48ed9167 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntryTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntryTest.java @@ -71,6 +71,6 @@ public class SingleValueArgumentEntryTest { @Test void testUpdateEntryWhenValueWasNotChanged() { - assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, new LongDataEntry("key", 11L), 237L))).isFalse(); + assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, new LongDataEntry("key", 11L), 364L))).isTrue(); } } \ No newline at end of file