Merge branch 'master' into edqs

This commit is contained in:
Viacheslav Klimov 2025-03-04 12:59:42 +02:00 committed by GitHub
commit 9bbcef515d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 28 additions and 25 deletions

View File

@ -644,6 +644,10 @@ public class ActorSystemContext {
@Getter @Getter
private String deviceStateNodeRateLimitConfig; private String deviceStateNodeRateLimitConfig;
@Value("${actors.calculated_fields.calculation_timeout:5}")
@Getter
private long cfCalculationResultTimeout;
@Getter @Getter
@Setter @Setter
private TbActorSystem actorSystem; private TbActorSystem actorSystem;

View File

@ -274,34 +274,32 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) throws CalculatedFieldException { private void processStateIfReady(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) throws CalculatedFieldException {
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId); CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId);
boolean stateSizeOk; boolean stateSizeChecked = false;
if (ctx.isInitialized() && state.isReady()) {
try { try {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS); if (ctx.isInitialized() && state.isReady()) {
CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(systemContext.getCfCalculationResultTimeout(), TimeUnit.SECONDS);
state.checkStateSize(ctxId, ctx.getMaxStateSize()); state.checkStateSize(ctxId, ctx.getMaxStateSize());
stateSizeOk = state.isSizeOk(); stateSizeChecked = true;
if (stateSizeOk) { if (state.isSizeOk()) {
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null); systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
} }
} }
}
} catch (Exception e) { } catch (Exception e) {
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArguments()).cause(e).build(); throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArguments()).cause(e).build();
} } finally {
} else { if (!stateSizeChecked) {
state.checkStateSize(ctxId, ctx.getMaxStateSize()); state.checkStateSize(ctxId, ctx.getMaxStateSize());
stateSizeOk = state.isSizeOk();
if (stateSizeOk) {
callback.onSuccess(); // State was updated but no calculation performed;
} }
} if (state.isSizeOk()) {
if (stateSizeOk) {
cfStateService.persistState(ctxId, state, callback); cfStateService.persistState(ctxId, state, callback);
} else { } else {
removeStateAndRaiseSizeException(ctxId, CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(), callback); removeStateAndRaiseSizeException(ctxId, CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(), callback);
} }
} }
}
private void removeStateAndRaiseSizeException(CalculatedFieldEntityCtxId ctxId, CalculatedFieldException ex, TbCallback callback) { private void removeStateAndRaiseSizeException(CalculatedFieldEntityCtxId ctxId, CalculatedFieldException ex, TbCallback callback) {
// We remove the state, but remember that it is over-sized in a local map. // We remove the state, but remember that it is over-sized in a local map.

View File

@ -100,10 +100,6 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
if (newVersion == null || this.version == null || newVersion > this.version) { if (newVersion == null || this.version == null || newVersion > this.version) {
this.ts = singleValueEntry.getTs(); this.ts = singleValueEntry.getTs();
this.version = newVersion; this.version = newVersion;
BasicKvEntry newValue = singleValueEntry.getKvEntryValue();
if (this.kvEntryValue != null && this.kvEntryValue.getValue().equals(newValue.getValue())) {
return false;
}
this.kvEntryValue = singleValueEntry.getKvEntryValue(); this.kvEntryValue = singleValueEntry.getKvEntryValue();
return true; return true;
} }

View File

@ -436,7 +436,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
ctx.sendWsMsg(update); ctx.sendWsMsg(update);
} else { } else {
ctx.doFetchAlarmCount(); ctx.doFetchAlarmCount();
if (entitiesIds != null) {
ctx.createAlarmSubscriptions(); ctx.createAlarmSubscriptions();
}
TbAlarmCountSubCtx finalCtx = ctx; TbAlarmCountSubCtx finalCtx = ctx;
ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay( ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(
() -> refreshDynamicQuery(finalCtx), () -> refreshDynamicQuery(finalCtx),

View File

@ -512,6 +512,8 @@ actors:
enabled: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}" enabled: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_ENABLED:true}"
# The value of DEBUG mode rate limit. By default, no more than 50 thousand events per hour # The value of DEBUG mode rate limit. By default, no more than 50 thousand events per hour
configuration: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}" configuration: "${ACTORS_CALCULATED_FIELD_DEBUG_MODE_RATE_LIMITS_PER_TENANT_CONFIGURATION:50000:3600}"
# Time in seconds to receive calculation result.
calculation_timeout: "${ACTORS_CALCULATION_TIMEOUT_SEC:5}"
debug: debug:
settings: settings:

View File

@ -83,7 +83,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
@Slf4j @Slf4j
@DaoSqlTest @DaoSqlTest
@TestPropertySource(properties = { @TestPropertySource(properties = {
"server.ws.alarms_per_alarm_status_subscription_cache_size=5" "server.ws.alarms_per_alarm_status_subscription_cache_size=5",
"server.ws.dynamic_page_link.refresh_interval=15"
}) })
public class WebsocketApiTest extends AbstractControllerTest { public class WebsocketApiTest extends AbstractControllerTest {
@Autowired @Autowired

View File

@ -71,6 +71,6 @@ public class SingleValueArgumentEntryTest {
@Test @Test
void testUpdateEntryWhenValueWasNotChanged() { void testUpdateEntryWhenValueWasNotChanged() {
assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, new LongDataEntry("key", 11L), 237L))).isFalse(); assertThat(entry.updateEntry(new SingleValueArgumentEntry(ts + 18, new LongDataEntry("key", 11L), 364L))).isTrue();
} }
} }