diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java index 21093ce3f5..1f77904a44 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.data.id; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.alarm.AlarmId; import java.util.UUID; @@ -50,6 +51,8 @@ public class EntityIdFactory { return new DeviceId(uuid); case ASSET: return new AssetId(uuid); + case ALARM: + return new AlarmId(uuid); } throw new IllegalArgumentException("EntityType " + type + " is not supported!"); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java index 72ee9a2d8f..587d2b6d2e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/AbstractModelDao.java @@ -127,7 +127,7 @@ public abstract class AbstractModelDao> extends Abstract log.debug("Save entity {}", entity); if (entity.getId() == null) { entity.setId(UUIDs.timeBased()); - } else { + } else if (isDeleteOnSave()) { removeById(entity.getId()); } Statement saveStatement = getSaveQuery(entity); @@ -136,6 +136,10 @@ public abstract class AbstractModelDao> extends Abstract return new EntityResultSet<>(resultSet, entity); } + protected boolean isDeleteOnSave() { + return true; + } + public T save(T entity) { return saveWithResult(entity).getEntity(); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDaoImpl.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDaoImpl.java index 05678cc0b4..994fe8036a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDaoImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/AlarmDaoImpl.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmQuery; import org.thingsboard.server.common.data.id.EntityId; @@ -59,6 +60,10 @@ public class AlarmDaoImpl extends AbstractModelDao implements Alarm return ALARM_COLUMN_FAMILY_NAME; } + protected boolean isDeleteOnSave() { + return false; + } + @Override public AlarmEntity save(Alarm alarm) { log.debug("Save asset [{}] ", alarm); @@ -92,7 +97,7 @@ public class AlarmDaoImpl extends AbstractModelDao implements Alarm log.trace("Try to find alarms by entity [{}], status [{}] and pageLink [{}]", query.getAffectedEntityId(), query.getStatus(), query.getPageLink()); EntityId affectedEntity = query.getAffectedEntityId(); String relationType = query.getStatus() == null ? BaseAlarmService.ALARM_RELATION : BaseAlarmService.ALARM_RELATION_PREFIX + query.getStatus().name(); - ListenableFuture> relations = relationDao.findRelations(affectedEntity, relationType, query.getPageLink()); + ListenableFuture> relations = relationDao.findRelations(affectedEntity, relationType, EntityType.ALARM, query.getPageLink()); return Futures.transform(relations, (AsyncFunction, List>) input -> { List> alarmFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index d274ea930c..2675fa2cec 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -41,8 +41,12 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.tenant.TenantDao; import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; import static org.thingsboard.server.dao.DaoUtil.*; @@ -64,6 +68,21 @@ public class BaseAlarmService extends BaseEntityService implements AlarmService @Autowired private RelationService relationService; + protected ExecutorService readResultsProcessingExecutor; + + @PostConstruct + public void startExecutor() { + readResultsProcessingExecutor = Executors.newCachedThreadPool(); + } + + @PreDestroy + public void stopExecutor() { + if (readResultsProcessingExecutor != null) { + readResultsProcessingExecutor.shutdownNow(); + } + } + + @Override public Alarm createOrUpdateAlarm(Alarm alarm) { alarmDataValidator.validate(alarm); @@ -85,6 +104,8 @@ public class BaseAlarmService extends BaseEntityService implements AlarmService createRelation(new EntityRelation(parentId, saved.getId(), ALARM_RELATION)); createRelation(new EntityRelation(parentId, saved.getId(), ALARM_RELATION_PREFIX + saved.getStatus().name())); } + createRelation(new EntityRelation(alarm.getOriginator(), saved.getId(), ALARM_RELATION)); + createRelation(new EntityRelation(alarm.getOriginator(), saved.getId(), ALARM_RELATION_PREFIX + saved.getStatus().name())); return saved; } else { log.debug("Alarm before merge: {}", alarm); @@ -218,6 +239,8 @@ public class BaseAlarmService extends BaseEntityService implements AlarmService deleteRelation(new EntityRelation(parentId, alarm.getId(), ALARM_RELATION_PREFIX + oldStatus.name())); createRelation(new EntityRelation(parentId, alarm.getId(), ALARM_RELATION_PREFIX + newStatus.name())); } + deleteRelation(new EntityRelation(alarm.getOriginator(), alarm.getId(), ALARM_RELATION_PREFIX + oldStatus.name())); + createRelation(new EntityRelation(alarm.getOriginator(), alarm.getId(), ALARM_RELATION_PREFIX + newStatus.name())); } catch (ExecutionException | InterruptedException e) { log.warn("[{}] Failed to update relations. Old status: [{}], New status: [{}]", alarm.getId(), oldStatus, newStatus); throw new RuntimeException(e); @@ -227,7 +250,7 @@ public class BaseAlarmService extends BaseEntityService implements AlarmService private ListenableFuture getAndUpdate(AlarmId alarmId, Function function) { validateId(alarmId, "Alarm id should be specified!"); ListenableFuture entity = alarmDao.findAlarmByIdAsync(alarmId.getId()); - return Futures.transform(entity, function); + return Futures.transform(entity, function, readResultsProcessingExecutor); } private DataValidator alarmDataValidator = diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 0c68c39e33..2648b49b82 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -170,6 +170,7 @@ public class ModelConstants { public static final String RELATION_TO_TYPE_PROPERTY = "to_type"; public static final String RELATION_TYPE_PROPERTY = "relation_type"; + public static final String RELATION_BY_TYPE_AND_CHILD_TYPE_VIEW_NAME = "relation_by_type_and_child_type"; public static final String RELATION_REVERSE_VIEW_NAME = "reverse_relation"; diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java index b179a1ede4..1b05866c75 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationDao.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.page.TimePageLink; @@ -154,12 +155,13 @@ public class BaseRelationDao extends AbstractAsyncDao implements RelationDao { } @Override - public ListenableFuture> findRelations(EntityId from, String relationType, TimePageLink pageLink) { - Select.Where query = AbstractSearchTimeDao.buildQuery(RELATION_COLUMN_FAMILY_NAME, + public ListenableFuture> findRelations(EntityId from, String relationType, EntityType childType, TimePageLink pageLink) { + Select.Where query = AbstractSearchTimeDao.buildQuery(ModelConstants.RELATION_BY_TYPE_AND_CHILD_TYPE_VIEW_NAME, Arrays.asList(eq(ModelConstants.RELATION_FROM_ID_PROPERTY, from.getId()), eq(ModelConstants.RELATION_FROM_TYPE_PROPERTY, from.getEntityType().name()), - eq(ModelConstants.RELATION_TYPE_PROPERTY, relationType)), - QueryBuilder.asc(ModelConstants.RELATION_TYPE_PROPERTY), + eq(ModelConstants.RELATION_TYPE_PROPERTY, relationType), + eq(ModelConstants.RELATION_TO_TYPE_PROPERTY, childType.name())), + Arrays.asList(QueryBuilder.asc(ModelConstants.RELATION_TYPE_PROPERTY), QueryBuilder.asc(ModelConstants.RELATION_TO_TYPE_PROPERTY)), pageLink, ModelConstants.RELATION_TO_ID_PROPERTY); return getFuture(executeAsyncRead(query), rs -> getEntityRelations(rs)); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java index 1a4838a5c4..bec03208ac 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.relation; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; @@ -45,6 +46,6 @@ public interface RelationDao { ListenableFuture deleteOutboundRelations(EntityId entity); - ListenableFuture> findRelations(EntityId from, String relationType, TimePageLink pageLink); + ListenableFuture> findRelations(EntityId from, String relationType, EntityType toType, TimePageLink pageLink); } diff --git a/dao/src/main/resources/schema.cql b/dao/src/main/resources/schema.cql index 071697eac9..44afed9240 100644 --- a/dao/src/main/resources/schema.cql +++ b/dao/src/main/resources/schema.cql @@ -271,6 +271,13 @@ CREATE TABLE IF NOT EXISTS thingsboard.relation ( PRIMARY KEY ((from_id, from_type), relation_type, to_id, to_type) ) WITH CLUSTERING ORDER BY ( relation_type ASC, to_id ASC, to_type ASC); +CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.relation_by_type_and_child_type AS + SELECT * + from thingsboard.relation + WHERE from_id IS NOT NULL AND from_type IS NOT NULL AND relation_type IS NOT NULL AND to_id IS NOT NULL AND to_type IS NOT NULL + PRIMARY KEY ((from_id, from_type), relation_type, to_type, to_id) + WITH CLUSTERING ORDER BY ( relation_type ASC, from_type ASC, from_id ASC); + CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.reverse_relation AS SELECT * from thingsboard.relation diff --git a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java index 19def01476..b1502595cb 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java +++ b/dao/src/test/java/org/thingsboard/server/dao/DaoTestSuite.java @@ -25,7 +25,6 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClassnameFilters({ -// "org.thingsboard.server.dao.service.AlarmServiceTest" "org.thingsboard.server.dao.service.*Test", "org.thingsboard.server.dao.kv.*Test", "org.thingsboard.server.dao.plugin.*Test", diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java index ca4cdef2be..4264cbd732 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2017 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -97,14 +97,76 @@ public class AlarmServiceTest extends AbstractServiceTest { Alarm fetched = alarmService.findAlarmById(created.getId()).get(); Assert.assertEquals(created, fetched); + } -// TimePageData alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId) -// .affectedEntityId(parentId) -// .status(AlarmStatus.ACTIVE_UNACK).pageLink( -// new TimePageLink(1, 0L, Long.MAX_VALUE, true) -// ).build()).get(); -// Assert.assertNotNull(alarms.getData()); -// Assert.assertEquals(1, alarms.getData().size()); -// Assert.assertEquals(created, alarms.getData().get(0)); + @Test + public void testFindAlarm() throws ExecutionException, InterruptedException { + AssetId parentId = new AssetId(UUIDs.timeBased()); + AssetId childId = new AssetId(UUIDs.timeBased()); + + EntityRelation relation = new EntityRelation(parentId, childId, EntityRelation.CONTAINS_TYPE); + + Assert.assertTrue(relationService.saveRelation(relation).get()); + + long ts = System.currentTimeMillis(); + Alarm alarm = Alarm.builder().tenantId(tenantId).originator(childId) + .type(TEST_ALARM) + .severity(AlarmSeverity.CRITICAL).status(AlarmStatus.ACTIVE_UNACK) + .startTs(ts).build(); + + Alarm created = alarmService.createOrUpdateAlarm(alarm); + + // Check child relation + TimePageData alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId) + .affectedEntityId(childId) + .status(AlarmStatus.ACTIVE_UNACK).pageLink( + new TimePageLink(1, 0L, System.currentTimeMillis(), true) + ).build()).get(); + Assert.assertNotNull(alarms.getData()); + Assert.assertEquals(1, alarms.getData().size()); + Assert.assertEquals(created, alarms.getData().get(0)); + + // Check parent relation + alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId) + .affectedEntityId(parentId) + .status(AlarmStatus.ACTIVE_UNACK).pageLink( + new TimePageLink(1, 0L, System.currentTimeMillis(), true) + ).build()).get(); + Assert.assertNotNull(alarms.getData()); + Assert.assertEquals(1, alarms.getData().size()); + Assert.assertEquals(created, alarms.getData().get(0)); + + alarmService.ackAlarm(created.getId(), System.currentTimeMillis()).get(); + created = alarmService.findAlarmById(created.getId()).get(); + + alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId) + .affectedEntityId(childId) + .status(AlarmStatus.ACTIVE_ACK).pageLink( + new TimePageLink(1, 0L, System.currentTimeMillis(), true) + ).build()).get(); + Assert.assertNotNull(alarms.getData()); + Assert.assertEquals(1, alarms.getData().size()); + Assert.assertEquals(created, alarms.getData().get(0)); + + // Check not existing relation + alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId) + .affectedEntityId(childId) + .status(AlarmStatus.ACTIVE_UNACK).pageLink( + new TimePageLink(1, 0L, System.currentTimeMillis(), true) + ).build()).get(); + Assert.assertNotNull(alarms.getData()); + Assert.assertEquals(0, alarms.getData().size()); + + alarmService.clearAlarm(created.getId(), System.currentTimeMillis()).get(); + created = alarmService.findAlarmById(created.getId()).get(); + + alarms = alarmService.findAlarms(AlarmQuery.builder().tenantId(tenantId) + .affectedEntityId(childId) + .status(AlarmStatus.CLEARED_ACK).pageLink( + new TimePageLink(1, 0L, System.currentTimeMillis(), true) + ).build()).get(); + Assert.assertNotNull(alarms.getData()); + Assert.assertEquals(1, alarms.getData().size()); + Assert.assertEquals(created, alarms.getData().get(0)); } }