From 5563da7c0d9e48fc707626e23db0082c6bf68faa Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 10 Feb 2025 16:28:59 +0200 Subject: [PATCH 1/7] implemented filters for AlarmCountQuery --- ...efaultTbEntityDataSubscriptionService.java | 8 +- .../subscription/TbAlarmCountSubCtx.java | 37 +++++- .../server/controller/WebsocketApiTest.java | 114 +++++++++++++++++- .../server/dao/alarm/AlarmService.java | 2 + .../common/data/query/AlarmCountQuery.java | 7 ++ .../common/data/query/EntityCountQuery.java | 2 +- .../server/dao/alarm/AlarmDao.java | 2 +- .../server/dao/alarm/BaseAlarmService.java | 8 +- .../server/dao/sql/alarm/JpaAlarmDao.java | 4 +- .../dao/sql/query/AlarmQueryRepository.java | 2 +- .../query/DefaultAlarmQueryRepository.java | 11 +- .../server/dao/service/AlarmServiceTest.java | 51 ++++++++ 12 files changed, 230 insertions(+), 18 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 946a425a71..9812872bf1 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -33,7 +33,6 @@ import org.springframework.stereotype.Service; import org.springframework.web.socket.CloseStatus; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; @@ -41,7 +40,6 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.ComparisonTsValue; -import org.thingsboard.server.common.data.query.OriginatorAlarmFilter; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityKey; @@ -55,7 +53,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.executors.DbCallbackExecutorService; -import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd; @@ -65,7 +62,6 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusCmd; -import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate; @@ -74,7 +70,6 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.GetTsCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd; -import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate; import java.util.ArrayList; import java.util.Arrays; @@ -83,7 +78,6 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -555,7 +549,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private TbAlarmCountSubCtx createSubCtx(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) { Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new ConcurrentHashMap<>()); TbAlarmCountSubCtx ctx = new TbAlarmCountSubCtx(serviceId, wsService, entityService, localSubscriptionService, - attributesService, stats, alarmService, sessionRef, cmd.getCmdId()); + attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription); if (cmd.getQuery() != null) { ctx.setAndResolveQuery(cmd.getQuery()); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java index af7cfcfcfe..8425b2a57a 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java @@ -19,20 +19,33 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.AlarmCountQuery; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityDataPageLink; +import org.thingsboard.server.common.data.query.EntityDataQuery; +import org.thingsboard.server.common.data.query.EntityDataSortOrder; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; +import java.util.List; + @Slf4j @ToString(callSuper = true) public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx { private final AlarmService alarmService; + private final int maxEntitiesPerAlarmSubscription; + @Getter @Setter private volatile int result; @@ -40,20 +53,21 @@ public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx data = entityService.findEntityDataByQuery(getTenantId(), getCustomerId(), buildEntityDataQuery()); + List entityIds = data.getData().stream().map(EntityData::getEntityId).toList(); + if (entityIds.isEmpty()) { + return 0; + } else { + return (int) alarmService.countAlarmsByQuery(getTenantId(), getCustomerId(), query, entityIds); + } + + } + + private EntityDataQuery buildEntityDataQuery() { + EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, + new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY))); + return new EntityDataQuery(query.getEntityFilter(), edpl, null, null, null); + } } diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index b64c87ccbd..e72e1dc806 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -83,7 +83,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @Slf4j @DaoSqlTest @TestPropertySource(properties = { - "server.ws.alarms_per_alarm_status_subscription_cache_size=5" + "server.ws.alarms_per_alarm_status_subscription_cache_size=5", + "server.ws.dynamic_page_link.refresh_interval=2" }) public class WebsocketApiTest extends AbstractControllerTest { @Autowired @@ -324,6 +325,117 @@ public class WebsocketApiTest extends AbstractControllerTest { Assert.assertEquals(1, update.getCount()); } + @Test + public void testAlarmCountWsCmdWithSingleEntityFilter() throws Exception { + loginTenantAdmin(); + + SingleEntityFilter singleEntityFilter = new SingleEntityFilter(); + singleEntityFilter.setSingleEntity(tenantId); + AlarmCountQuery alarmCountQuery = new AlarmCountQuery(singleEntityFilter); + AlarmCountCmd cmd1 = new AlarmCountCmd(1, alarmCountQuery); + + getWsClient().send(cmd1); + + AlarmCountUpdate update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); + Assert.assertEquals(1, update.getCmdId()); + Assert.assertEquals(0, update.getCount()); + + Alarm alarm = new Alarm(); + alarm.setOriginator(tenantId); + alarm.setType("TEST ALARM"); + alarm.setSeverity(AlarmSeverity.WARNING); + + alarm = doPost("/api/alarm", alarm, Alarm.class); + + AlarmCountCmd cmd2 = new AlarmCountCmd(2, alarmCountQuery); + + getWsClient().send(cmd2); + + update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); + Assert.assertEquals(2, update.getCmdId()); + Assert.assertEquals(1, update.getCount()); + + singleEntityFilter.setSingleEntity(tenantAdminUserId); + AlarmCountCmd cmd3 = new AlarmCountCmd(3, alarmCountQuery); + + getWsClient().send(cmd3); + + update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); + Assert.assertEquals(3, update.getCmdId()); + Assert.assertEquals(0, update.getCount()); + + alarm.setAssigneeId(tenantAdminUserId); + alarm = doPost("/api/alarm", alarm, Alarm.class); + + singleEntityFilter.setSingleEntity(tenantId); + alarmCountQuery.setAssigneeId(tenantAdminUserId); + AlarmCountCmd cmd4 = new AlarmCountCmd(4, alarmCountQuery); + + getWsClient().send(cmd4); + + update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); + Assert.assertEquals(4, update.getCmdId()); + Assert.assertEquals(1, update.getCount()); + + alarmCountQuery.setSeverityList(Collections.singletonList(AlarmSeverity.CRITICAL)); + AlarmCountCmd cmd5 = new AlarmCountCmd(5, alarmCountQuery); + + getWsClient().send(cmd5); + + update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); + Assert.assertEquals(5, update.getCmdId()); + Assert.assertEquals(0, update.getCount()); + + alarm.setSeverity(AlarmSeverity.CRITICAL); + doPost("/api/alarm", alarm, Alarm.class); + + AlarmCountCmd cmd6 = new AlarmCountCmd(6, alarmCountQuery); + + getWsClient().send(cmd6); + + update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); + Assert.assertEquals(6, update.getCmdId()); + Assert.assertEquals(1, update.getCount()); + } + + @Test + public void testAlarmCountWsCmdWithDeviceType() throws Exception { + loginTenantAdmin(); + + DeviceTypeFilter deviceTypeFilter = new DeviceTypeFilter(); + deviceTypeFilter.setDeviceTypes(List.of("default")); + AlarmCountQuery alarmCountQuery = new AlarmCountQuery(deviceTypeFilter); + AlarmCountCmd cmd1 = new AlarmCountCmd(1, alarmCountQuery); + + getWsClient().send(cmd1); + + AlarmCountUpdate update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); + Assert.assertEquals(1, update.getCmdId()); + Assert.assertEquals(0, update.getCount()); + + getWsClient().registerWaitForUpdate(); + + Alarm alarm = new Alarm(); + alarm.setOriginator(device.getId()); + alarm.setType("TEST ALARM"); + alarm.setSeverity(AlarmSeverity.WARNING); + + alarm = doPost("/api/alarm", alarm, Alarm.class); + + update = getWsClient().parseAlarmCountReply(getWsClient().waitForUpdate(3000)); + Assert.assertEquals(1, update.getCmdId()); + Assert.assertEquals(1, update.getCount()); + + deviceTypeFilter.setDeviceTypes(List.of("non-existing")); + AlarmCountCmd cmd3 = new AlarmCountCmd(3, alarmCountQuery); + + getWsClient().send(cmd3); + + update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); + Assert.assertEquals(3, update.getCmdId()); + Assert.assertEquals(0, update.getCount()); + } + @Test public void testAlarmStatusWsCmd() throws Exception { loginTenantAdmin(); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmService.java index ce040595cc..11252b59a5 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmService.java @@ -118,6 +118,8 @@ public interface AlarmService extends EntityDaoService { long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query); + long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection orderedEntityIds); + PageData findAlarmTypesByTenantId(TenantId tenantId, PageLink pageLink); List findActiveOriginatorAlarms(TenantId tenantId, OriginatorAlarmFilter originatorAlarmFilter, int limit); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmCountQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmCountQuery.java index 19cb445380..bb969a28ce 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmCountQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmCountQuery.java @@ -19,6 +19,7 @@ import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.ToString; import org.thingsboard.server.common.data.alarm.AlarmSearchStatus; import org.thingsboard.server.common.data.alarm.AlarmSeverity; @@ -30,6 +31,7 @@ import java.util.List; @NoArgsConstructor @AllArgsConstructor @Getter +@Setter @ToString public class AlarmCountQuery extends EntityCountQuery { private long startTs; @@ -40,4 +42,9 @@ public class AlarmCountQuery extends EntityCountQuery { private List severityList; private boolean searchPropagatedAlarms; private UserId assigneeId; + + public AlarmCountQuery(EntityFilter entityFilter) { + super(entityFilter); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java index 3dd24147e7..be9fa4c664 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java @@ -27,7 +27,7 @@ import java.util.List; public class EntityCountQuery { @Getter - private EntityFilter entityFilter; + protected EntityFilter entityFilter; @Getter protected List keyFilters; diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDao.java index 4b4ee0ae4f..2432bb4bbe 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDao.java @@ -106,7 +106,7 @@ public interface AlarmDao extends Dao { AlarmApiCallResult unassignAlarm(TenantId tenantId, AlarmId alarmId, long unassignTime); - long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query); + long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection orderedEntityIds); PageData findTenantAlarmTypes(UUID tenantId, PageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index bf08621d14..14cd65185d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -352,7 +352,13 @@ public class BaseAlarmService extends AbstractCachedEntityService INCORRECT_TENANT_ID + id); - return alarmDao.countAlarmsByQuery(tenantId, customerId, query); + return alarmDao.countAlarmsByQuery(tenantId, customerId, query, Collections.emptyList()); + } + + @Override + public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection orderedEntityIds) { + validateId(tenantId, id -> INCORRECT_TENANT_ID + id); + return alarmDao.countAlarmsByQuery(tenantId, customerId, query, orderedEntityIds); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java index e433dfbd86..a3e02d5461 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java @@ -415,8 +415,8 @@ public class JpaAlarmDao extends JpaAbstractDao implements A } @Override - public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query) { - return alarmQueryRepository.countAlarmsByQuery(tenantId, customerId, query); + public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection orderedEntityIds) { + return alarmQueryRepository.countAlarmsByQuery(tenantId, customerId, query, orderedEntityIds); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/AlarmQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/AlarmQueryRepository.java index 49aa6c7276..2a26d62376 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/AlarmQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/AlarmQueryRepository.java @@ -30,6 +30,6 @@ public interface AlarmQueryRepository { PageData findAlarmDataByQueryForEntities(TenantId tenantId, AlarmDataQuery query, Collection orderedEntityIds); - long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query); + long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection orderedEntityIds); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java index 05da22f9a5..b4cc40fc6d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import java.util.stream.Collectors; @Repository @@ -314,7 +315,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { } @Override - public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query) { + public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection orderedEntityIds) { QueryContext ctx = new QueryContext(new QuerySecurityContext(tenantId, null, EntityType.ALARM)); if (query.isSearchPropagatedAlarms()) { @@ -326,6 +327,10 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { ctx.append(" and a.customer_id = :customerId and ea.customer_id = :customerId"); ctx.addUuidParameter("customerId", customerId.getId()); } + if (!orderedEntityIds.isEmpty()) { + ctx.addUuidListParameter("entity_filter_entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList())); + ctx.append(" and ea.entity_id in (:entity_filter_entity_ids)"); + } } else { ctx.append("select count(id) from alarm_info a "); ctx.append("where a.tenant_id = :tenantId"); @@ -334,6 +339,10 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { ctx.append(" and a.customer_id = :customerId"); ctx.addUuidParameter("customerId", customerId.getId()); } + if (!orderedEntityIds.isEmpty()) { + ctx.addUuidListParameter("entity_filter_entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList())); + ctx.append(" and a.originator_id in (:entity_filter_entity_ids)"); + } } long startTs; diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java index aa1c96d84b..d12765717a 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; @@ -48,6 +49,7 @@ import org.thingsboard.server.common.data.query.DeviceTypeFilter; import org.thingsboard.server.common.data.query.EntityDataSortOrder; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.EntityListFilter; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.security.Authority; @@ -936,4 +938,53 @@ public class AlarmServiceTest extends AbstractServiceTest { Assert.assertEquals(0, alarms.getData().size()); } + @Test + public void testCountAlarmsForEntities() throws ExecutionException, InterruptedException { + AssetId parentId = new AssetId(Uuids.timeBased()); + AssetId childId = new AssetId(Uuids.timeBased()); + + EntityRelation relation = new EntityRelation(parentId, childId, EntityRelation.CONTAINS_TYPE); + + Assert.assertTrue(relationService.saveRelationAsync(tenantId, relation).get()); + + long ts = System.currentTimeMillis(); + AlarmApiCallResult result = alarmService.createAlarm(AlarmCreateOrUpdateActiveRequest.builder() + .tenantId(tenantId) + .originator(childId) + .type(TEST_ALARM) + .severity(AlarmSeverity.CRITICAL) + .startTs(ts).build()); + AlarmInfo created = result.getAlarm(); + created.setPropagate(true); + result = alarmService.updateAlarm(AlarmUpdateRequest.fromAlarm(created)); + created = result.getAlarm(); + + EntityListFilter entityListFilter = new EntityListFilter(); + entityListFilter.setEntityList(List.of(childId.getId().toString(), parentId.getId().toString())); + entityListFilter.setEntityType(EntityType.ASSET); + AlarmCountQuery countQuery = new AlarmCountQuery(entityListFilter); + countQuery.setStartTs(0L); + countQuery.setEndTs(System.currentTimeMillis()); + + long alarmsCount = alarmService.countAlarmsByQuery(tenantId, null, countQuery, List.of(childId)); + Assert.assertEquals(1, alarmsCount); + + countQuery.setSearchPropagatedAlarms(true); + + alarmsCount = alarmService.countAlarmsByQuery(tenantId, null, countQuery, List.of(parentId)); + Assert.assertEquals(1, alarmsCount); + + created = alarmService.acknowledgeAlarm(tenantId, created.getId(), System.currentTimeMillis()).getAlarm(); + + countQuery.setStatusList(List.of(AlarmSearchStatus.UNACK)); + alarmsCount = alarmService.countAlarmsByQuery(tenantId, null, countQuery, List.of(childId)); + Assert.assertEquals(0, alarmsCount); + + alarmService.clearAlarm(tenantId, created.getId(), System.currentTimeMillis(), null); + + countQuery.setStatusList(List.of(AlarmSearchStatus.CLEARED)); + alarmsCount = alarmService.countAlarmsByQuery(tenantId, null, countQuery, List.of(childId)); + Assert.assertEquals(1, alarmsCount); + } + } From 33eac2f77868c4b79c1beb2a99a02fa0047f756c Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Mon, 10 Feb 2025 17:34:12 +0200 Subject: [PATCH 2/7] fixed tests --- .../service/subscription/TbAlarmCountSubCtx.java | 15 ++++++++------- .../server/common/data/query/AlarmCountQuery.java | 6 ++---- .../common/data/query/EntityCountQuery.java | 2 +- .../server/dao/alarm/BaseAlarmService.java | 3 +-- .../sql/query/DefaultAlarmQueryRepository.java | 4 ++-- 5 files changed, 14 insertions(+), 16 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java index 8425b2a57a..4771bad716 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java @@ -80,14 +80,15 @@ public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx data = entityService.findEntityDataByQuery(getTenantId(), getCustomerId(), buildEntityDataQuery()); - List entityIds = data.getData().stream().map(EntityData::getEntityId).toList(); - if (entityIds.isEmpty()) { - return 0; - } else { - return (int) alarmService.countAlarmsByQuery(getTenantId(), getCustomerId(), query, entityIds); + List entityIds = null; + if (query.getEntityFilter() != null) { + PageData data = entityService.findEntityDataByQuery(getTenantId(), getCustomerId(), buildEntityDataQuery()); + if (data.getData().isEmpty()) { + return 0; + } + entityIds = data.getData().stream().map(EntityData::getEntityId).toList(); } - + return (int) alarmService.countAlarmsByQuery(getTenantId(), getCustomerId(), query, entityIds); } private EntityDataQuery buildEntityDataQuery() { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmCountQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmCountQuery.java index bb969a28ce..1bf19a970f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmCountQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/AlarmCountQuery.java @@ -17,9 +17,8 @@ package org.thingsboard.server.common.data.query; import lombok.AllArgsConstructor; import lombok.Builder; -import lombok.Getter; +import lombok.Data; import lombok.NoArgsConstructor; -import lombok.Setter; import lombok.ToString; import org.thingsboard.server.common.data.alarm.AlarmSearchStatus; import org.thingsboard.server.common.data.alarm.AlarmSeverity; @@ -30,8 +29,7 @@ import java.util.List; @Builder @NoArgsConstructor @AllArgsConstructor -@Getter -@Setter +@Data @ToString public class AlarmCountQuery extends EntityCountQuery { private long startTs; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java index be9fa4c664..3dd24147e7 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/EntityCountQuery.java @@ -27,7 +27,7 @@ import java.util.List; public class EntityCountQuery { @Getter - protected EntityFilter entityFilter; + private EntityFilter entityFilter; @Getter protected List keyFilters; diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index 14cd65185d..b8a57fdcff 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -351,8 +351,7 @@ public class BaseAlarmService extends AbstractCachedEntityService INCORRECT_TENANT_ID + id); - return alarmDao.countAlarmsByQuery(tenantId, customerId, query, Collections.emptyList()); + return countAlarmsByQuery(tenantId, customerId, query, null); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java index b4cc40fc6d..216fd0d2d4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java @@ -327,7 +327,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { ctx.append(" and a.customer_id = :customerId and ea.customer_id = :customerId"); ctx.addUuidParameter("customerId", customerId.getId()); } - if (!orderedEntityIds.isEmpty()) { + if (orderedEntityIds != null) { ctx.addUuidListParameter("entity_filter_entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList())); ctx.append(" and ea.entity_id in (:entity_filter_entity_ids)"); } @@ -339,7 +339,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { ctx.append(" and a.customer_id = :customerId"); ctx.addUuidParameter("customerId", customerId.getId()); } - if (!orderedEntityIds.isEmpty()) { + if (orderedEntityIds != null) { ctx.addUuidListParameter("entity_filter_entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList())); ctx.append(" and a.originator_id in (:entity_filter_entity_ids)"); } From 7b6bb8a6d61546a82305c51862ee087116b10827 Mon Sep 17 00:00:00 2001 From: Vladyslav_Prykhodko Date: Wed, 12 Feb 2025 12:34:56 +0200 Subject: [PATCH 3/7] UI: Add support for entity alias filtering in alarm count data sources --- .../server/service/subscription/TbAlarmCountSubCtx.java | 2 +- .../basic/alarm/alarm-count-basic-config.component.html | 9 +++++++++ .../basic/alarm/alarm-count-basic-config.component.ts | 2 ++ .../components/widget/config/datasource.component.html | 8 ++++---- .../components/widget/config/datasource.component.ts | 9 +++++++++ .../components/widget/config/datasources.component.ts | 4 ++++ 6 files changed, 29 insertions(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java index 4771bad716..2a5dea70ac 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java @@ -94,6 +94,6 @@ public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx + +
alarm.filter
diff --git a/ui-ngx/src/app/modules/home/components/widget/config/basic/alarm/alarm-count-basic-config.component.ts b/ui-ngx/src/app/modules/home/components/widget/config/basic/alarm/alarm-count-basic-config.component.ts index 4ad664fae3..d365ed3f2d 100644 --- a/ui-ngx/src/app/modules/home/components/widget/config/basic/alarm/alarm-count-basic-config.component.ts +++ b/ui-ngx/src/app/modules/home/components/widget/config/basic/alarm/alarm-count-basic-config.component.ts @@ -69,6 +69,7 @@ export class AlarmCountBasicConfigComponent extends BasicWidgetConfigComponent { const settings: CountWidgetSettings = {...countDefaultSettings(true), ...(configData.config.settings || {})}; this.alarmCountWidgetConfigForm = this.fb.group({ alarmFilterConfig: [getAlarmFilterConfig(configData.config.datasources), []], + datasources: [configData.config.datasources, []], settings: [settings, []], @@ -81,6 +82,7 @@ export class AlarmCountBasicConfigComponent extends BasicWidgetConfigComponent { } protected prepareOutputConfig(config: any): WidgetConfigComponentData { + this.widgetConfig.config.datasources = config.datasources; setAlarmFilterConfig(config.alarmFilterConfig, this.widgetConfig.config.datasources); this.widgetConfig.config.settings = {...(this.widgetConfig.config.settings || {}), ...config.settings}; diff --git a/ui-ngx/src/app/modules/home/components/widget/config/datasource.component.html b/ui-ngx/src/app/modules/home/components/widget/config/datasource.component.html index 8af7753525..6c5c5cf083 100644 --- a/ui-ngx/src/app/modules/home/components/widget/config/datasource.component.html +++ b/ui-ngx/src/app/modules/home/components/widget/config/datasource.component.html @@ -36,7 +36,7 @@ datasourceFormGroup.get('type').value === datasourceType.entity || datasourceFormGroup.get('type').value === datasourceType.entityCount || datasourceFormGroup.get('type').value === datasourceType.alarmCount ? datasourceFormGroup.get('type').value : ''"> - @@ -98,7 +98,7 @@ Date: Wed, 12 Feb 2025 15:38:25 +0200 Subject: [PATCH 4/7] fetch entities only on subs creation --- ...efaultTbEntityDataSubscriptionService.java | 10 +- .../subscription/TbAlarmCountSubCtx.java | 106 ++++++++++++++---- .../server/controller/WebsocketApiTest.java | 4 +- .../query/DefaultAlarmQueryRepository.java | 36 +++--- 4 files changed, 117 insertions(+), 39 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 9812872bf1..b09f8350f1 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -424,8 +424,12 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc long start = System.currentTimeMillis(); ctx.fetchData(); long end = System.currentTimeMillis(); - stats.getAlarmQueryInvocationCnt().incrementAndGet(); - stats.getAlarmQueryTimeSpent().addAndGet(end - start); + stats.getRegularQueryInvocationCnt().incrementAndGet(); + stats.getRegularQueryTimeSpent().addAndGet(end - start); + ctx.cancelTasks(); + ctx.clearAlarmSubscriptions(); + ctx.fetchAlarmCount(); + ctx.createAlarmSubscriptions(); TbAlarmCountSubCtx finalCtx = ctx; ScheduledFuture task = scheduler.scheduleWithFixedDelay( () -> refreshDynamicQuery(finalCtx), @@ -549,7 +553,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc private TbAlarmCountSubCtx createSubCtx(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) { Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new ConcurrentHashMap<>()); TbAlarmCountSubCtx ctx = new TbAlarmCountSubCtx(serviceId, wsService, entityService, localSubscriptionService, - attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription); + attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription, maxAlarmQueriesPerRefreshInterval); if (cmd.getQuery() != null) { ctx.setAndResolveQuery(cmd.getQuery()); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java index 2a5dea70ac..712fc5c378 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java @@ -36,7 +36,9 @@ import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; -import java.util.List; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @Slf4j @ToString(callSuper = true) @@ -44,56 +46,120 @@ public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx subToEntityIdMap; + + private LinkedHashSet entitiesIds; + private final int maxEntitiesPerAlarmSubscription; + private final int maxAlarmQueriesPerRefreshInterval; + @Getter @Setter private volatile int result; + @Getter + @Setter + private boolean tooManyEntities; + + private int alarmCountInvocationAttempts; + public TbAlarmCountSubCtx(String serviceId, WebSocketService wsService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, SubscriptionServiceStatistics stats, AlarmService alarmService, - WebSocketSessionRef sessionRef, int cmdId, int maxEntitiesPerAlarmSubscription) { + WebSocketSessionRef sessionRef, int cmdId, int maxEntitiesPerAlarmSubscription, int maxAlarmQueriesPerRefreshInterval) { super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId); this.alarmService = alarmService; + this.subToEntityIdMap = new ConcurrentHashMap<>(); this.maxEntitiesPerAlarmSubscription = maxEntitiesPerAlarmSubscription; + this.maxAlarmQueriesPerRefreshInterval = maxAlarmQueriesPerRefreshInterval; + this.entitiesIds = null; + } + + @Override + public void clearSubscriptions() { + clearAlarmSubscriptions(); } @Override public void fetchData() { - result = countAlarms(); - sendWsMsg(new AlarmCountUpdate(cmdId, result)); + resetInvocationCounter(); + if (query.getEntityFilter() != null) { + entitiesIds = new LinkedHashSet<>(); + log.trace("[{}] Fetching data: {}", cmdId, alarmCountInvocationAttempts); + PageData data = entityService.findEntityDataByQuery(getTenantId(), getCustomerId(), buildEntityDataQuery()); + entitiesIds.clear(); + tooManyEntities = data.hasNext(); + for (EntityData entityData : data.getData()) { + entitiesIds.add(entityData.getEntityId()); + } + } } @Override protected void update() { - int newCount = countAlarms(); - if (newCount != result) { - result = newCount; - sendWsMsg(new AlarmCountUpdate(cmdId, result)); + resetInvocationCounter(); + fetchAlarmCount(); + } + + public void fetchAlarmCount() { + alarmCountInvocationAttempts++; + log.trace("[{}] Fetching alarms: {}", cmdId, alarmCountInvocationAttempts); + if (alarmCountInvocationAttempts <= maxAlarmQueriesPerRefreshInterval) { + doFetchAlarmCount(); + } else { + log.trace("[{}] Ignore alarm count fetch due to rate limit: [{}] of maximum [{}]", cmdId, alarmCountInvocationAttempts, maxAlarmQueriesPerRefreshInterval); } } + private void doFetchAlarmCount() { + result = (int) alarmService.countAlarmsByQuery(getTenantId(), getCustomerId(), query, entitiesIds); + sendWsMsg(new AlarmCountUpdate(cmdId, result)); + } + @Override public boolean isDynamic() { return true; } - private int countAlarms() { - List entityIds = null; - if (query.getEntityFilter() != null) { - PageData data = entityService.findEntityDataByQuery(getTenantId(), getCustomerId(), buildEntityDataQuery()); - if (data.getData().isEmpty()) { - return 0; - } - entityIds = data.getData().stream().map(EntityData::getEntityId).toList(); - } - return (int) alarmService.countAlarmsByQuery(getTenantId(), getCustomerId(), query, entityIds); - } - private EntityDataQuery buildEntityDataQuery() { EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY))); return new EntityDataQuery(query.getEntityFilter(), edpl, null, null, query.getKeyFilters()); } + + private void resetInvocationCounter() { + alarmCountInvocationAttempts = 0; + } + + public void createAlarmSubscriptions() { + for (EntityId entityId : entitiesIds) { + createAlarmSubscriptionForEntity(entityId); + } + } + + private void createAlarmSubscriptionForEntity(EntityId entityId) { + int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); + subToEntityIdMap.put(subIdx, entityId); + log.trace("[{}][{}][{}] Creating alarms subscription for [{}] ", serviceId, cmdId, subIdx, entityId); + TbAlarmsSubscription subscription = TbAlarmsSubscription.builder() + .serviceId(serviceId) + .sessionId(sessionRef.getSessionId()) + .subscriptionId(subIdx) + .tenantId(sessionRef.getSecurityCtx().getTenantId()) + .entityId(entityId) + .updateProcessor((sub, update) -> fetchAlarmCount()) + .build(); + localSubscriptionService.addSubscription(subscription, sessionRef); + } + + public void clearAlarmSubscriptions() { + if (subToEntityIdMap != null) { + for (Integer subId : subToEntityIdMap.keySet()) { + localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId); + } + subToEntityIdMap.clear(); + } + } + } diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index e72e1dc806..d1b8bc0430 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -84,7 +84,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @DaoSqlTest @TestPropertySource(properties = { "server.ws.alarms_per_alarm_status_subscription_cache_size=5", - "server.ws.dynamic_page_link.refresh_interval=2" + "server.ws.dynamic_page_link.refresh_interval=3" }) public class WebsocketApiTest extends AbstractControllerTest { @Autowired @@ -422,7 +422,7 @@ public class WebsocketApiTest extends AbstractControllerTest { alarm = doPost("/api/alarm", alarm, Alarm.class); - update = getWsClient().parseAlarmCountReply(getWsClient().waitForUpdate(3000)); + update = getWsClient().parseAlarmCountReply(getWsClient().waitForUpdate(4000)); Assert.assertEquals(1, update.getCmdId()); Assert.assertEquals(1, update.getCount()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java index 216fd0d2d4..db8c8ea552 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultAlarmQueryRepository.java @@ -321,27 +321,35 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository { if (query.isSearchPropagatedAlarms()) { ctx.append("select count(distinct(a.id)) from alarm_info a "); ctx.append(JOIN_ENTITY_ALARMS); - ctx.append("where a.tenant_id = :tenantId and ea.tenant_id = :tenantId"); - ctx.addUuidParameter("tenantId", tenantId.getId()); - if (customerId != null && !customerId.isNullUid()) { - ctx.append(" and a.customer_id = :customerId and ea.customer_id = :customerId"); - ctx.addUuidParameter("customerId", customerId.getId()); - } if (orderedEntityIds != null) { + if (orderedEntityIds.isEmpty()) { + return 0; + } ctx.addUuidListParameter("entity_filter_entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList())); - ctx.append(" and ea.entity_id in (:entity_filter_entity_ids)"); + ctx.append("where ea.entity_id in (:entity_filter_entity_ids)"); + } else { + ctx.append("where a.tenant_id = :tenantId and ea.tenant_id = :tenantId"); + ctx.addUuidParameter("tenantId", tenantId.getId()); + if (customerId != null && !customerId.isNullUid()) { + ctx.append(" and a.customer_id = :customerId and ea.customer_id = :customerId"); + ctx.addUuidParameter("customerId", customerId.getId()); + } } } else { ctx.append("select count(id) from alarm_info a "); - ctx.append("where a.tenant_id = :tenantId"); - ctx.addUuidParameter("tenantId", tenantId.getId()); - if (customerId != null && !customerId.isNullUid()) { - ctx.append(" and a.customer_id = :customerId"); - ctx.addUuidParameter("customerId", customerId.getId()); - } if (orderedEntityIds != null) { + if (orderedEntityIds.isEmpty()) { + return 0; + } ctx.addUuidListParameter("entity_filter_entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList())); - ctx.append(" and a.originator_id in (:entity_filter_entity_ids)"); + ctx.append("where a.originator_id in (:entity_filter_entity_ids)"); + } else { + ctx.append("where a.tenant_id = :tenantId"); + ctx.addUuidParameter("tenantId", tenantId.getId()); + if (customerId != null && !customerId.isNullUid()) { + ctx.append(" and a.customer_id = :customerId"); + ctx.addUuidParameter("customerId", customerId.getId()); + } } } From ca1185de541197d0276c5478f3abc59743aa32ae Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 12 Feb 2025 17:02:49 +0200 Subject: [PATCH 5/7] do not send update if count was not changed --- ...efaultTbEntityDataSubscriptionService.java | 22 +++++++++++++------ .../subscription/TbAlarmCountSubCtx.java | 19 ++++++++++------ 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index b09f8350f1..968711fa55 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -33,6 +33,7 @@ import org.springframework.stereotype.Service; import org.springframework.web.socket.CloseStatus; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult; @@ -59,6 +60,7 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggKey; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggTimeSeriesCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd; +import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataUpdate; import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusCmd; @@ -426,15 +428,21 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc long end = System.currentTimeMillis(); stats.getRegularQueryInvocationCnt().incrementAndGet(); stats.getRegularQueryTimeSpent().addAndGet(end - start); + Set entitiesIds = ctx.getEntitiesIds(); ctx.cancelTasks(); ctx.clearAlarmSubscriptions(); - ctx.fetchAlarmCount(); - ctx.createAlarmSubscriptions(); - TbAlarmCountSubCtx finalCtx = ctx; - ScheduledFuture task = scheduler.scheduleWithFixedDelay( - () -> refreshDynamicQuery(finalCtx), - dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); - finalCtx.setRefreshTask(task); + if (entitiesIds != null && entitiesIds.isEmpty()) { + AlarmCountUpdate update = new AlarmCountUpdate(cmd.getCmdId(), 0); + ctx.sendWsMsg(update); + } else { + ctx.doFetchAlarmCount(); + ctx.createAlarmSubscriptions(); + TbAlarmCountSubCtx finalCtx = ctx; + ScheduledFuture task = scheduler.scheduleWithFixedDelay( + () -> refreshDynamicQuery(finalCtx), + dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); + finalCtx.setRefreshTask(task); + } } else { log.debug("[{}][{}] Received duplicate command: {}", session.getSessionId(), cmd.getCmdId(), cmd); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java index 712fc5c378..d0144b8787 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmCountSubCtx.java @@ -48,6 +48,7 @@ public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx subToEntityIdMap; + @Getter private LinkedHashSet entitiesIds; private final int maxEntitiesPerAlarmSubscription; @@ -102,26 +103,30 @@ public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx Date: Thu, 13 Feb 2025 11:28:42 +0200 Subject: [PATCH 6/7] fixed flaky tests --- .../server/controller/WebsocketApiTest.java | 45 +++---------------- 1 file changed, 7 insertions(+), 38 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index d1b8bc0430..f8a9db43d0 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -83,8 +83,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @Slf4j @DaoSqlTest @TestPropertySource(properties = { - "server.ws.alarms_per_alarm_status_subscription_cache_size=5", - "server.ws.dynamic_page_link.refresh_interval=3" + "server.ws.alarms_per_alarm_status_subscription_cache_size=5" }) public class WebsocketApiTest extends AbstractControllerTest { @Autowired @@ -340,6 +339,7 @@ public class WebsocketApiTest extends AbstractControllerTest { Assert.assertEquals(1, update.getCmdId()); Assert.assertEquals(0, update.getCount()); + //create alarm, check count = 1 Alarm alarm = new Alarm(); alarm.setOriginator(tenantId); alarm.setType("TEST ALARM"); @@ -355,6 +355,7 @@ public class WebsocketApiTest extends AbstractControllerTest { Assert.assertEquals(2, update.getCmdId()); Assert.assertEquals(1, update.getCount()); + // set wrong entity id in filter, check count = 0 singleEntityFilter.setSingleEntity(tenantAdminUserId); AlarmCountCmd cmd3 = new AlarmCountCmd(3, alarmCountQuery); @@ -363,39 +364,6 @@ public class WebsocketApiTest extends AbstractControllerTest { update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); Assert.assertEquals(3, update.getCmdId()); Assert.assertEquals(0, update.getCount()); - - alarm.setAssigneeId(tenantAdminUserId); - alarm = doPost("/api/alarm", alarm, Alarm.class); - - singleEntityFilter.setSingleEntity(tenantId); - alarmCountQuery.setAssigneeId(tenantAdminUserId); - AlarmCountCmd cmd4 = new AlarmCountCmd(4, alarmCountQuery); - - getWsClient().send(cmd4); - - update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); - Assert.assertEquals(4, update.getCmdId()); - Assert.assertEquals(1, update.getCount()); - - alarmCountQuery.setSeverityList(Collections.singletonList(AlarmSeverity.CRITICAL)); - AlarmCountCmd cmd5 = new AlarmCountCmd(5, alarmCountQuery); - - getWsClient().send(cmd5); - - update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); - Assert.assertEquals(5, update.getCmdId()); - Assert.assertEquals(0, update.getCount()); - - alarm.setSeverity(AlarmSeverity.CRITICAL); - doPost("/api/alarm", alarm, Alarm.class); - - AlarmCountCmd cmd6 = new AlarmCountCmd(6, alarmCountQuery); - - getWsClient().send(cmd6); - - update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); - Assert.assertEquals(6, update.getCmdId()); - Assert.assertEquals(1, update.getCount()); } @Test @@ -422,7 +390,7 @@ public class WebsocketApiTest extends AbstractControllerTest { alarm = doPost("/api/alarm", alarm, Alarm.class); - update = getWsClient().parseAlarmCountReply(getWsClient().waitForUpdate(4000)); + update = getWsClient().parseAlarmCountReply(getWsClient().waitForUpdate()); Assert.assertEquals(1, update.getCmdId()); Assert.assertEquals(1, update.getCount()); @@ -484,17 +452,18 @@ public class WebsocketApiTest extends AbstractControllerTest { doPost("/api/alarm", alarm2, Alarm.class); - AlarmStatusUpdate alarmStatusUpdate3 = JacksonUtil.fromString(getWsClient().waitForReply(), AlarmStatusUpdate.class); + AlarmStatusUpdate alarmStatusUpdate3 = JacksonUtil.fromString(getWsClient().waitForUpdate(), AlarmStatusUpdate.class); Assert.assertEquals(1, alarmStatusUpdate3.getCmdId()); Assert.assertTrue(alarmStatusUpdate3.isActive()); //change severity + getWsClient().registerWaitForUpdate(); alarm2.setSeverity(AlarmSeverity.MAJOR); Alarm updatedAlarm = doPost("/api/alarm", alarm2, Alarm.class); Assert.assertNotNull(updatedAlarm); Assert.assertEquals(AlarmSeverity.MAJOR, updatedAlarm.getSeverity()); - AlarmStatusUpdate alarmStatusUpdate4 = JacksonUtil.fromString(getWsClient().waitForReply(), AlarmStatusUpdate.class); + AlarmStatusUpdate alarmStatusUpdate4 = JacksonUtil.fromString(getWsClient().waitForUpdate(), AlarmStatusUpdate.class); Assert.assertEquals(1, alarmStatusUpdate4.getCmdId()); Assert.assertFalse(alarmStatusUpdate4.isActive()); From 1f3c7acc51a8b9d979150e40a76f10b85b6365f7 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Fri, 14 Feb 2025 19:01:58 +0200 Subject: [PATCH 7/7] fixed flaky test --- .../server/controller/WebsocketApiTest.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index f8a9db43d0..51c9b08155 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -340,29 +340,26 @@ public class WebsocketApiTest extends AbstractControllerTest { Assert.assertEquals(0, update.getCount()); //create alarm, check count = 1 + getWsClient().registerWaitForUpdate(); + Alarm alarm = new Alarm(); alarm.setOriginator(tenantId); alarm.setType("TEST ALARM"); alarm.setSeverity(AlarmSeverity.WARNING); - alarm = doPost("/api/alarm", alarm, Alarm.class); - AlarmCountCmd cmd2 = new AlarmCountCmd(2, alarmCountQuery); - - getWsClient().send(cmd2); - - update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); - Assert.assertEquals(2, update.getCmdId()); + update = getWsClient().parseAlarmCountReply(getWsClient().waitForUpdate()); + Assert.assertEquals(1, update.getCmdId()); Assert.assertEquals(1, update.getCount()); // set wrong entity id in filter, check count = 0 singleEntityFilter.setSingleEntity(tenantAdminUserId); - AlarmCountCmd cmd3 = new AlarmCountCmd(3, alarmCountQuery); + AlarmCountCmd cmd3 = new AlarmCountCmd(2, alarmCountQuery); getWsClient().send(cmd3); update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply()); - Assert.assertEquals(3, update.getCmdId()); + Assert.assertEquals(2, update.getCmdId()); Assert.assertEquals(0, update.getCount()); }