From 6ec4b8cf7253f85b11c3ec5376285f0281884b53 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 26 Sep 2023 13:17:52 +0300 Subject: [PATCH] TTL for queue stats and RE exceptions --- .../controller/TenantProfileController.java | 2 + .../DefaultRuleEngineStatisticsService.java | 28 +++--- .../controller/BaseQueueControllerTest.java | 95 +++++++++++++++++++ .../dao/usagerecord/ApiLimitService.java | 5 + .../DefaultTenantProfileConfiguration.java | 2 + .../usagerecord/DefaultApiLimitService.java | 34 +++++-- ...enant-profile-configuration.component.html | 28 ++++++ ...-tenant-profile-configuration.component.ts | 2 + ui-ngx/src/app/shared/models/tenant.model.ts | 4 + .../assets/locale/locale.constant-en_US.json | 6 ++ 10 files changed, 186 insertions(+), 20 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java b/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java index a7fe973ec0..6fbee00f7f 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TenantProfileController.java @@ -151,6 +151,8 @@ public class TenantProfileController extends BaseController { " \"defaultStorageTtlDays\": 0,\n" + " \"alarmsTtlDays\": 0,\n" + " \"rpcTtlDays\": 0,\n" + + " \"queueStatsTtlDays\": 0,\n" + + " \"ruleEngineExceptionsTtlDays\": 0,\n" + " \"warnThreshold\": 0\n" + " }\n" + " },\n" + diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java index 1b5bfd56d1..30ffb4c8ca 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java +++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.stats; import com.google.common.util.concurrent.FutureCallback; import lombok.Data; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.asset.Asset; @@ -26,7 +27,9 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats; @@ -37,6 +40,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -44,9 +48,11 @@ import java.util.stream.Collectors; @TbRuleEngineComponent @Service @Slf4j +@RequiredArgsConstructor public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsService { public static final String TB_SERVICE_QUEUE = "TbServiceQueue"; + public static final String RULE_ENGINE_EXCEPTION = "ruleEngineException"; public static final FutureCallback CALLBACK = new FutureCallback() { @Override public void onSuccess(@Nullable Integer result) { @@ -61,16 +67,10 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS private final TbServiceInfoProvider serviceInfoProvider; private final TelemetrySubscriptionService tsService; - private final Lock lock = new ReentrantLock(); private final AssetService assetService; - private final ConcurrentMap tenantQueueAssets; - - public DefaultRuleEngineStatisticsService(TelemetrySubscriptionService tsService, TbServiceInfoProvider serviceInfoProvider, AssetService assetService) { - this.tsService = tsService; - this.serviceInfoProvider = serviceInfoProvider; - this.assetService = assetService; - this.tenantQueueAssets = new ConcurrentHashMap<>(); - } + private final ApiLimitService apiLimitService; + private final Lock lock = new ReentrantLock(); + private final ConcurrentMap tenantQueueAssets = new ConcurrentHashMap<>(); @Override public void reportQueueStats(long ts, TbRuleEngineConsumerStats ruleEngineStats) { @@ -84,7 +84,9 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS .map(kv -> new BasicTsKvEntry(ts, new LongDataEntry(kv.getKey(), (long) kv.getValue().get()))) .collect(Collectors.toList()); if (!tsList.isEmpty()) { - tsService.saveAndNotifyInternal(tenantId, serviceAssetId, tsList, CALLBACK); + long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getQueueStatsTtlDays); + ttl = TimeUnit.DAYS.toSeconds(ttl); + tsService.saveAndNotifyInternal(tenantId, serviceAssetId, tsList, ttl, CALLBACK); } } } catch (Exception e) { @@ -95,8 +97,10 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS }); ruleEngineStats.getTenantExceptions().forEach((tenantId, e) -> { try { - TsKvEntry tsKv = new BasicTsKvEntry(e.getTs(), new JsonDataEntry("ruleEngineException", e.toJsonString())); - tsService.saveAndNotifyInternal(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), CALLBACK); + TsKvEntry tsKv = new BasicTsKvEntry(e.getTs(), new JsonDataEntry(RULE_ENGINE_EXCEPTION, e.toJsonString())); + long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getRuleEngineExceptionsTtlDays); + ttl = TimeUnit.DAYS.toSeconds(ttl); + tsService.saveAndNotifyInternal(tenantId, getServiceAssetId(tenantId, queueName), Collections.singletonList(tsKv), ttl, CALLBACK); } catch (Exception e2) { if (!"Asset is referencing to non-existent tenant!".equalsIgnoreCase(e2.getMessage())) { log.debug("[{}] Failed to store the statistics", tenantId, e2); diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java index 465ec37bb9..9da8c8f496 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseQueueControllerTest.java @@ -18,6 +18,12 @@ package org.thingsboard.server.controller; import com.fasterxml.jackson.core.type.TypeReference; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; @@ -26,13 +32,45 @@ import org.thingsboard.server.common.data.queue.ProcessingStrategyType; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.SubmitStrategy; import org.thingsboard.server.common.data.queue.SubmitStrategyType; +import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.dao.timeseries.TimeseriesDao; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats; +import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; +import org.thingsboard.server.service.stats.DefaultRuleEngineStatisticsService; +import org.thingsboard.server.service.stats.RuleEngineStatisticsService; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.dao.asset.BaseAssetService.TB_SERVICE_QUEUE; @DaoSqlTest public class BaseQueueControllerTest extends AbstractControllerTest { + @Autowired + private RuleEngineStatisticsService ruleEngineStatisticsService; + @Autowired + private StatsFactory statsFactory; + @SpyBean + private TimeseriesDao timeseriesDao; + @Autowired + private AssetService assetService; + @Test public void testQueueWithServiceTypeRE() throws Exception { loginSysAdmin(); @@ -93,4 +131,61 @@ public class BaseQueueControllerTest extends AbstractControllerTest { .andExpect(status().isOk()); } + @Test + public void testQueueStatsTtl() throws ThingsboardException { + Queue queue = new Queue(); + queue.setName("Test-1"); + queue.setTenantId(TenantId.SYS_TENANT_ID); + + TbRuleEngineProcessingResult testProcessingResult = Mockito.mock(TbRuleEngineProcessingResult.class); + TbProtoQueueMsg msg = new TbProtoQueueMsg<>(UUID.randomUUID(), + TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .build()); + when(testProcessingResult.getSuccessMap()).thenReturn(Stream.generate(() -> msg) + .limit(5).collect(Collectors.toConcurrentMap(m -> UUID.randomUUID(), m -> m))); + when(testProcessingResult.getFailedMap()).thenReturn(Stream.generate(() -> msg) + .limit(5).collect(Collectors.toConcurrentMap(m -> UUID.randomUUID(), m -> m))); + when(testProcessingResult.getPendingMap()).thenReturn(new ConcurrentHashMap<>()); + RuleEngineException ruleEngineException = new RuleEngineException("Test Exception"); + when(testProcessingResult.getExceptionsMap()).thenReturn(new ConcurrentHashMap<>(Map.of( + tenantId, ruleEngineException + ))); + + TbRuleEngineConsumerStats testStats = new TbRuleEngineConsumerStats(queue, statsFactory); + testStats.log(testProcessingResult, true); + + int queueStatsTtlDays = 14; + int ruleEngineExceptionsTtlDays = 7; + updateDefaultTenantProfileConfig(profileConfiguration -> { + profileConfiguration.setQueueStatsTtlDays(queueStatsTtlDays); + profileConfiguration.setRuleEngineExceptionsTtlDays(ruleEngineExceptionsTtlDays); + }); + ruleEngineStatisticsService.reportQueueStats(System.currentTimeMillis(), testStats); + + Asset serviceAsset = assetService.findAssetsByTenantIdAndType(tenantId, TB_SERVICE_QUEUE, new PageLink(100)).getData() + .stream().filter(asset -> asset.getName().startsWith(queue.getName())) + .findFirst().get(); + + ArgumentCaptor ttlCaptor = ArgumentCaptor.forClass(Long.class); + verify(timeseriesDao).save(eq(tenantId), eq(serviceAsset.getId()), argThat(tsKvEntry -> { + return tsKvEntry.getKey().equals(TbRuleEngineConsumerStats.SUCCESSFUL_MSGS) && + tsKvEntry.getLongValue().get().equals(5L); + }), ttlCaptor.capture()); + verify(timeseriesDao).save(eq(tenantId), eq(serviceAsset.getId()), argThat(tsKvEntry -> { + return tsKvEntry.getKey().equals(TbRuleEngineConsumerStats.FAILED_MSGS) && + tsKvEntry.getLongValue().get().equals(5L); + }), ttlCaptor.capture()); + assertThat(ttlCaptor.getAllValues()).allSatisfy(usedTtl -> { + assertThat(usedTtl).isEqualTo(TimeUnit.DAYS.toSeconds(queueStatsTtlDays)); + }); + + verify(timeseriesDao).save(eq(tenantId), eq(serviceAsset.getId()), argThat(tsKvEntry -> { + return tsKvEntry.getKey().equals(DefaultRuleEngineStatisticsService.RULE_ENGINE_EXCEPTION) && + tsKvEntry.getJsonValue().get().equals(ruleEngineException.toJsonString()); + }), ttlCaptor.capture()); + assertThat(ttlCaptor.getValue()).isEqualTo(TimeUnit.DAYS.toSeconds(ruleEngineExceptionsTtlDays)); + } + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiLimitService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiLimitService.java index cd9524e1b1..69e366327d 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiLimitService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiLimitService.java @@ -17,9 +17,14 @@ package org.thingsboard.server.dao.usagerecord; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; + +import java.util.function.Function; public interface ApiLimitService { boolean checkEntitiesLimit(TenantId tenantId, EntityType entityType); + long getLimit(TenantId tenantId, Function extractor); + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index bd2b80f10a..77a7fa8417 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -82,6 +82,8 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private int defaultStorageTtlDays; private int alarmsTtlDays; private int rpcTtlDays; + private int queueStatsTtlDays; + private int ruleEngineExceptionsTtlDays; private double warnThreshold; diff --git a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/DefaultApiLimitService.java b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/DefaultApiLimitService.java index d5d19aa453..56bb096b5c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/DefaultApiLimitService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/DefaultApiLimitService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.usagerecord; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -27,6 +28,8 @@ import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileCon import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; +import java.util.function.Function; + @Service @RequiredArgsConstructor public class DefaultApiLimitService implements ApiLimitService { @@ -36,16 +39,31 @@ public class DefaultApiLimitService implements ApiLimitService { @Override public boolean checkEntitiesLimit(TenantId tenantId, EntityType entityType) { - DefaultTenantProfileConfiguration profileConfiguration = tenantProfileCache.get(tenantId).getDefaultProfileConfiguration(); - long limit = profileConfiguration.getEntitiesLimit(entityType); - if (limit > 0) { - EntityTypeFilter filter = new EntityTypeFilter(); - filter.setEntityType(entityType); - long currentCount = entityService.countEntitiesByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), new EntityCountQuery(filter)); - return currentCount < limit; - } else { + long limit = getLimit(tenantId, profileConfiguration -> profileConfiguration.getEntitiesLimit(entityType)); + if (limit <= 0) { return true; } + + EntityTypeFilter filter = new EntityTypeFilter(); + filter.setEntityType(entityType); + long currentCount = entityService.countEntitiesByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), new EntityCountQuery(filter)); + return currentCount < limit; + } + + @Override + public long getLimit(TenantId tenantId, Function extractor) { + if (tenantId == null || tenantId.isSysTenantId()) { + return 0L; + } + TenantProfile tenantProfile = tenantProfileCache.get(tenantId); + if (tenantProfile == null) { + throw new IllegalArgumentException("Tenant profile not found for tenant " + tenantId); + } + Number value = extractor.apply(tenantProfile.getDefaultProfileConfiguration()); + if (value == null) { + return 0L; + } + return Math.max(0, value.longValue()); } } diff --git a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html index c2872d4145..2a4c59a7a2 100644 --- a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html +++ b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.html @@ -262,6 +262,34 @@ +
+ + tenant-profile.queue-stats-ttl-days + + + {{ 'tenant-profile.queue-stats-ttl-days-required' | translate}} + + + {{ 'tenant-profile.queue-stats-ttl-days-range' | translate}} + + + + + tenant-profile.rule-engine-exceptions-ttl-days + + + {{ 'tenant-profile.rule-engine-exceptions-ttl-days-required' | translate}} + + + {{ 'tenant-profile.rule-engine-exceptions-ttl-days-days-range' | translate}} + + + +
diff --git a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts index a4915f924b..19eb13540b 100644 --- a/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts +++ b/ui-ngx/src/app/modules/home/components/profile/tenant/default-tenant-profile-configuration.component.ts @@ -90,6 +90,8 @@ export class DefaultTenantProfileConfigurationComponent implements ControlValueA defaultStorageTtlDays: [null, [Validators.required, Validators.min(0)]], alarmsTtlDays: [null, [Validators.required, Validators.min(0)]], rpcTtlDays: [null, [Validators.required, Validators.min(0)]], + queueStatsTtlDays: [null, [Validators.required, Validators.min(0)]], + ruleEngineExceptionsTtlDays: [null, [Validators.required, Validators.min(0)]], tenantServerRestLimitsConfiguration: [null, []], customerServerRestLimitsConfiguration: [null, []], maxWsSessionsPerTenant: [null, [Validators.min(0)]], diff --git a/ui-ngx/src/app/shared/models/tenant.model.ts b/ui-ngx/src/app/shared/models/tenant.model.ts index 9074d599f5..d781dad439 100644 --- a/ui-ngx/src/app/shared/models/tenant.model.ts +++ b/ui-ngx/src/app/shared/models/tenant.model.ts @@ -76,6 +76,8 @@ export interface DefaultTenantProfileConfiguration { defaultStorageTtlDays: number; alarmsTtlDays: number; rpcTtlDays: number; + queueStatsTtlDays: number; + ruleEngineExceptionsTtlDays: number; } export type TenantProfileConfigurations = DefaultTenantProfileConfiguration; @@ -124,6 +126,8 @@ export function createTenantProfileConfiguration(type: TenantProfileType): Tenan defaultStorageTtlDays: 0, alarmsTtlDays: 0, rpcTtlDays: 0, + queueStatsTtlDays: 0, + ruleEngineExceptionsTtlDays: 0 }; configuration = {...defaultConfiguration, type: TenantProfileType.DEFAULT}; break; diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index 0867f21114..876fb19afb 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -4003,6 +4003,12 @@ "rpc-ttl-days": "RPC TTL days", "rpc-ttl-days-required": "RPC TTL days required", "rpc-ttl-days-days-range": "RPC TTL days can't be negative", + "queue-stats-ttl-days": "Queue stats TTL days", + "queue-stats-ttl-days-required": "Queue stats TTL days required", + "queue-stats-ttl-days-range": "Queue stats TTL days can't be negative", + "rule-engine-exceptions-ttl-days": "Rule Engine exceptions TTL days", + "rule-engine-exceptions-ttl-days-required": "Rule Engine exceptions TTL days required", + "rule-engine-exceptions-ttl-days-range": "Rule Engine exceptions TTL days can't be negative", "max-rule-node-executions-per-message": "Rule node per message executions maximum number", "max-rule-node-executions-per-message-required": "MRule node per message executions maximum number is required.", "max-rule-node-executions-per-message-range": "Rule node per message executions maximum number can't be negative",