implemented filters for AlarmCountQuery

This commit is contained in:
dashevchenko 2025-02-10 16:28:59 +02:00
parent feeee23fa0
commit 5563da7c0d
12 changed files with 230 additions and 18 deletions

View File

@ -33,7 +33,6 @@ import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
@ -41,7 +40,6 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.ComparisonTsValue;
import org.thingsboard.server.common.data.query.OriginatorAlarmFilter;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey;
@ -55,7 +53,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd;
@ -65,7 +62,6 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmDataUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
@ -74,7 +70,6 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.GetTsCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.LatestValueCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.TimeSeriesCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.UnsubscribeCmd;
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
import java.util.ArrayList;
import java.util.Arrays;
@ -83,7 +78,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@ -555,7 +549,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
private TbAlarmCountSubCtx createSubCtx(WebSocketSessionRef sessionRef, AlarmCountCmd cmd) {
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new ConcurrentHashMap<>());
TbAlarmCountSubCtx ctx = new TbAlarmCountSubCtx(serviceId, wsService, entityService, localSubscriptionService,
attributesService, stats, alarmService, sessionRef, cmd.getCmdId());
attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription);
if (cmd.getQuery() != null) {
ctx.setAndResolveQuery(cmd.getQuery());
}

View File

@ -19,20 +19,33 @@ import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityDataSortOrder;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
import java.util.List;
@Slf4j
@ToString(callSuper = true)
public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx<AlarmCountQuery> {
private final AlarmService alarmService;
private final int maxEntitiesPerAlarmSubscription;
@Getter
@Setter
private volatile int result;
@ -40,20 +53,21 @@ public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx<AlarmCountQu
public TbAlarmCountSubCtx(String serviceId, WebSocketService wsService,
EntityService entityService, TbLocalSubscriptionService localSubscriptionService,
AttributesService attributesService, SubscriptionServiceStatistics stats, AlarmService alarmService,
WebSocketSessionRef sessionRef, int cmdId) {
WebSocketSessionRef sessionRef, int cmdId, int maxEntitiesPerAlarmSubscription) {
super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId);
this.alarmService = alarmService;
this.maxEntitiesPerAlarmSubscription = maxEntitiesPerAlarmSubscription;
}
@Override
public void fetchData() {
result = (int) alarmService.countAlarmsByQuery(getTenantId(), getCustomerId(), query);
result = countAlarms();
sendWsMsg(new AlarmCountUpdate(cmdId, result));
}
@Override
protected void update() {
int newCount = (int) alarmService.countAlarmsByQuery(getTenantId(), getCustomerId(), query);
int newCount = countAlarms();
if (newCount != result) {
result = newCount;
sendWsMsg(new AlarmCountUpdate(cmdId, result));
@ -64,4 +78,21 @@ public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx<AlarmCountQu
public boolean isDynamic() {
return true;
}
private int countAlarms() {
PageData<EntityData> data = entityService.findEntityDataByQuery(getTenantId(), getCustomerId(), buildEntityDataQuery());
List<EntityId> entityIds = data.getData().stream().map(EntityData::getEntityId).toList();
if (entityIds.isEmpty()) {
return 0;
} else {
return (int) alarmService.countAlarmsByQuery(getTenantId(), getCustomerId(), query, entityIds);
}
}
private EntityDataQuery buildEntityDataQuery() {
EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null,
new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY)));
return new EntityDataQuery(query.getEntityFilter(), edpl, null, null, null);
}
}

View File

@ -83,7 +83,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
@Slf4j
@DaoSqlTest
@TestPropertySource(properties = {
"server.ws.alarms_per_alarm_status_subscription_cache_size=5"
"server.ws.alarms_per_alarm_status_subscription_cache_size=5",
"server.ws.dynamic_page_link.refresh_interval=2"
})
public class WebsocketApiTest extends AbstractControllerTest {
@Autowired
@ -324,6 +325,117 @@ public class WebsocketApiTest extends AbstractControllerTest {
Assert.assertEquals(1, update.getCount());
}
@Test
public void testAlarmCountWsCmdWithSingleEntityFilter() throws Exception {
loginTenantAdmin();
SingleEntityFilter singleEntityFilter = new SingleEntityFilter();
singleEntityFilter.setSingleEntity(tenantId);
AlarmCountQuery alarmCountQuery = new AlarmCountQuery(singleEntityFilter);
AlarmCountCmd cmd1 = new AlarmCountCmd(1, alarmCountQuery);
getWsClient().send(cmd1);
AlarmCountUpdate update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply());
Assert.assertEquals(1, update.getCmdId());
Assert.assertEquals(0, update.getCount());
Alarm alarm = new Alarm();
alarm.setOriginator(tenantId);
alarm.setType("TEST ALARM");
alarm.setSeverity(AlarmSeverity.WARNING);
alarm = doPost("/api/alarm", alarm, Alarm.class);
AlarmCountCmd cmd2 = new AlarmCountCmd(2, alarmCountQuery);
getWsClient().send(cmd2);
update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply());
Assert.assertEquals(2, update.getCmdId());
Assert.assertEquals(1, update.getCount());
singleEntityFilter.setSingleEntity(tenantAdminUserId);
AlarmCountCmd cmd3 = new AlarmCountCmd(3, alarmCountQuery);
getWsClient().send(cmd3);
update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply());
Assert.assertEquals(3, update.getCmdId());
Assert.assertEquals(0, update.getCount());
alarm.setAssigneeId(tenantAdminUserId);
alarm = doPost("/api/alarm", alarm, Alarm.class);
singleEntityFilter.setSingleEntity(tenantId);
alarmCountQuery.setAssigneeId(tenantAdminUserId);
AlarmCountCmd cmd4 = new AlarmCountCmd(4, alarmCountQuery);
getWsClient().send(cmd4);
update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply());
Assert.assertEquals(4, update.getCmdId());
Assert.assertEquals(1, update.getCount());
alarmCountQuery.setSeverityList(Collections.singletonList(AlarmSeverity.CRITICAL));
AlarmCountCmd cmd5 = new AlarmCountCmd(5, alarmCountQuery);
getWsClient().send(cmd5);
update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply());
Assert.assertEquals(5, update.getCmdId());
Assert.assertEquals(0, update.getCount());
alarm.setSeverity(AlarmSeverity.CRITICAL);
doPost("/api/alarm", alarm, Alarm.class);
AlarmCountCmd cmd6 = new AlarmCountCmd(6, alarmCountQuery);
getWsClient().send(cmd6);
update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply());
Assert.assertEquals(6, update.getCmdId());
Assert.assertEquals(1, update.getCount());
}
@Test
public void testAlarmCountWsCmdWithDeviceType() throws Exception {
loginTenantAdmin();
DeviceTypeFilter deviceTypeFilter = new DeviceTypeFilter();
deviceTypeFilter.setDeviceTypes(List.of("default"));
AlarmCountQuery alarmCountQuery = new AlarmCountQuery(deviceTypeFilter);
AlarmCountCmd cmd1 = new AlarmCountCmd(1, alarmCountQuery);
getWsClient().send(cmd1);
AlarmCountUpdate update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply());
Assert.assertEquals(1, update.getCmdId());
Assert.assertEquals(0, update.getCount());
getWsClient().registerWaitForUpdate();
Alarm alarm = new Alarm();
alarm.setOriginator(device.getId());
alarm.setType("TEST ALARM");
alarm.setSeverity(AlarmSeverity.WARNING);
alarm = doPost("/api/alarm", alarm, Alarm.class);
update = getWsClient().parseAlarmCountReply(getWsClient().waitForUpdate(3000));
Assert.assertEquals(1, update.getCmdId());
Assert.assertEquals(1, update.getCount());
deviceTypeFilter.setDeviceTypes(List.of("non-existing"));
AlarmCountCmd cmd3 = new AlarmCountCmd(3, alarmCountQuery);
getWsClient().send(cmd3);
update = getWsClient().parseAlarmCountReply(getWsClient().waitForReply());
Assert.assertEquals(3, update.getCmdId());
Assert.assertEquals(0, update.getCount());
}
@Test
public void testAlarmStatusWsCmd() throws Exception {
loginTenantAdmin();

View File

@ -118,6 +118,8 @@ public interface AlarmService extends EntityDaoService {
long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query);
long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection<EntityId> orderedEntityIds);
PageData<EntitySubtype> findAlarmTypesByTenantId(TenantId tenantId, PageLink pageLink);
List<UUID> findActiveOriginatorAlarms(TenantId tenantId, OriginatorAlarmFilter originatorAlarmFilter, int limit);

View File

@ -19,6 +19,7 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
@ -30,6 +31,7 @@ import java.util.List;
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
public class AlarmCountQuery extends EntityCountQuery {
private long startTs;
@ -40,4 +42,9 @@ public class AlarmCountQuery extends EntityCountQuery {
private List<AlarmSeverity> severityList;
private boolean searchPropagatedAlarms;
private UserId assigneeId;
public AlarmCountQuery(EntityFilter entityFilter) {
super(entityFilter);
}
}

View File

@ -27,7 +27,7 @@ import java.util.List;
public class EntityCountQuery {
@Getter
private EntityFilter entityFilter;
protected EntityFilter entityFilter;
@Getter
protected List<KeyFilter> keyFilters;

View File

@ -106,7 +106,7 @@ public interface AlarmDao extends Dao<Alarm> {
AlarmApiCallResult unassignAlarm(TenantId tenantId, AlarmId alarmId, long unassignTime);
long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query);
long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection<EntityId> orderedEntityIds);
PageData<EntitySubtype> findTenantAlarmTypes(UUID tenantId, PageLink pageLink);

View File

@ -352,7 +352,13 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
@Override
public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query) {
validateId(tenantId, id -> INCORRECT_TENANT_ID + id);
return alarmDao.countAlarmsByQuery(tenantId, customerId, query);
return alarmDao.countAlarmsByQuery(tenantId, customerId, query, Collections.emptyList());
}
@Override
public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection<EntityId> orderedEntityIds) {
validateId(tenantId, id -> INCORRECT_TENANT_ID + id);
return alarmDao.countAlarmsByQuery(tenantId, customerId, query, orderedEntityIds);
}
@Override

View File

@ -415,8 +415,8 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
}
@Override
public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query) {
return alarmQueryRepository.countAlarmsByQuery(tenantId, customerId, query);
public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection<EntityId> orderedEntityIds) {
return alarmQueryRepository.countAlarmsByQuery(tenantId, customerId, query, orderedEntityIds);
}
@Override

View File

@ -30,6 +30,6 @@ public interface AlarmQueryRepository {
PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId,
AlarmDataQuery query, Collection<EntityId> orderedEntityIds);
long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query);
long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection<EntityId> orderedEntityIds);
}

View File

@ -44,6 +44,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
@Repository
@ -314,7 +315,7 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository {
}
@Override
public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query) {
public long countAlarmsByQuery(TenantId tenantId, CustomerId customerId, AlarmCountQuery query, Collection<EntityId> orderedEntityIds) {
QueryContext ctx = new QueryContext(new QuerySecurityContext(tenantId, null, EntityType.ALARM));
if (query.isSearchPropagatedAlarms()) {
@ -326,6 +327,10 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository {
ctx.append(" and a.customer_id = :customerId and ea.customer_id = :customerId");
ctx.addUuidParameter("customerId", customerId.getId());
}
if (!orderedEntityIds.isEmpty()) {
ctx.addUuidListParameter("entity_filter_entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
ctx.append(" and ea.entity_id in (:entity_filter_entity_ids)");
}
} else {
ctx.append("select count(id) from alarm_info a ");
ctx.append("where a.tenant_id = :tenantId");
@ -334,6 +339,10 @@ public class DefaultAlarmQueryRepository implements AlarmQueryRepository {
ctx.append(" and a.customer_id = :customerId");
ctx.addUuidParameter("customerId", customerId.getId());
}
if (!orderedEntityIds.isEmpty()) {
ctx.addUuidListParameter("entity_filter_entity_ids", orderedEntityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
ctx.append(" and a.originator_id in (:entity_filter_entity_ids)");
}
}
long startTs;

View File

@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmApiCallResult;
@ -48,6 +49,7 @@ import org.thingsboard.server.common.data.query.DeviceTypeFilter;
import org.thingsboard.server.common.data.query.EntityDataSortOrder;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityListFilter;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.security.Authority;
@ -936,4 +938,53 @@ public class AlarmServiceTest extends AbstractServiceTest {
Assert.assertEquals(0, alarms.getData().size());
}
@Test
public void testCountAlarmsForEntities() throws ExecutionException, InterruptedException {
AssetId parentId = new AssetId(Uuids.timeBased());
AssetId childId = new AssetId(Uuids.timeBased());
EntityRelation relation = new EntityRelation(parentId, childId, EntityRelation.CONTAINS_TYPE);
Assert.assertTrue(relationService.saveRelationAsync(tenantId, relation).get());
long ts = System.currentTimeMillis();
AlarmApiCallResult result = alarmService.createAlarm(AlarmCreateOrUpdateActiveRequest.builder()
.tenantId(tenantId)
.originator(childId)
.type(TEST_ALARM)
.severity(AlarmSeverity.CRITICAL)
.startTs(ts).build());
AlarmInfo created = result.getAlarm();
created.setPropagate(true);
result = alarmService.updateAlarm(AlarmUpdateRequest.fromAlarm(created));
created = result.getAlarm();
EntityListFilter entityListFilter = new EntityListFilter();
entityListFilter.setEntityList(List.of(childId.getId().toString(), parentId.getId().toString()));
entityListFilter.setEntityType(EntityType.ASSET);
AlarmCountQuery countQuery = new AlarmCountQuery(entityListFilter);
countQuery.setStartTs(0L);
countQuery.setEndTs(System.currentTimeMillis());
long alarmsCount = alarmService.countAlarmsByQuery(tenantId, null, countQuery, List.of(childId));
Assert.assertEquals(1, alarmsCount);
countQuery.setSearchPropagatedAlarms(true);
alarmsCount = alarmService.countAlarmsByQuery(tenantId, null, countQuery, List.of(parentId));
Assert.assertEquals(1, alarmsCount);
created = alarmService.acknowledgeAlarm(tenantId, created.getId(), System.currentTimeMillis()).getAlarm();
countQuery.setStatusList(List.of(AlarmSearchStatus.UNACK));
alarmsCount = alarmService.countAlarmsByQuery(tenantId, null, countQuery, List.of(childId));
Assert.assertEquals(0, alarmsCount);
alarmService.clearAlarm(tenantId, created.getId(), System.currentTimeMillis(), null);
countQuery.setStatusList(List.of(AlarmSearchStatus.CLEARED));
alarmsCount = alarmService.countAlarmsByQuery(tenantId, null, countQuery, List.of(childId));
Assert.assertEquals(1, alarmsCount);
}
}