From 1256f09e7e2e6135ff6265002a39dcaf90ec7c91 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Wed, 10 Jan 2024 10:19:07 +0200 Subject: [PATCH 1/7] Introduce edge support for alarm comment --- .../edge/DefaultEdgeNotificationService.java | 3 + .../edge/EdgeEventSourcingListener.java | 16 +- .../service/edge/rpc/EdgeGrpcSession.java | 9 ++ .../alarm/AlarmMsgConstructor.java | 4 + .../alarm/AlarmMsgConstructorV1.java | 2 +- .../alarm/AlarmMsgConstructorV2.java | 2 +- .../alarm/BaseAlarmMsgConstructor.java | 29 ++++ .../edge/rpc/processor/BaseEdgeProcessor.java | 49 +++---- .../processor/alarm/AlarmEdgeProcessor.java | 138 ++++++++++++------ .../rpc/processor/alarm/AlarmProcessor.java | 5 + .../processor/alarm/BaseAlarmProcessor.java | 32 ++++ .../DefaultTbNotificationEntityService.java | 38 +---- .../controller/AbstractNotifyEntityTest.java | 12 +- .../server/edge/imitator/EdgeImitator.java | 6 + .../rpc/processor/BaseEdgeProcessorTest.java | 4 + .../server/dao/alarm/AlarmCommentService.java | 1 + .../server/common/data/EdgeUtils.java | 13 ++ .../common/data/edge/EdgeEventActionType.java | 59 +++++--- .../common/data/edge/EdgeEventType.java | 1 + common/edge-api/src/main/proto/edge.proto | 7 + .../dao/alarm/BaseAlarmCommentService.java | 7 +- .../server/dao/alarm/BaseAlarmService.java | 2 +- .../engine/edge/AbstractTbMsgPushNode.java | 52 +++---- .../engine/edge/TbMsgPushToCloudNode.java | 5 + .../rule/engine/edge/TbMsgPushToEdgeNode.java | 52 ++++--- 25 files changed, 346 insertions(+), 202 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index b3aba37632..03fb982d6d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -216,6 +216,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { case ALARM: alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg); break; + case ALARM_COMMENT: + alarmProcessor.processAlarmCommentNotification(tenantId, edgeNotificationMsg); + break; case RELATION: relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg); break; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 351f42d1fc..5883d04fe6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -23,10 +23,12 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.relation.EntityRelation; @@ -43,8 +45,6 @@ import org.thingsboard.server.dao.user.UserServiceImpl; import javax.annotation.PostConstruct; -import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType; - /** * This event listener does not support async event processing because relay on ThreadLocal * Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener @@ -91,8 +91,12 @@ public class EdgeEventSourcingListener { public void handleEvent(DeleteEntityEvent event) { try { log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); + EdgeEventType type = null; + if (event.getEntity() instanceof AlarmComment) { + type = EdgeEventType.ALARM_COMMENT; + } tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED, + JacksonUtil.toString(event.getEntity()), type, EdgeEventActionType.DELETED, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e); @@ -104,7 +108,7 @@ public class EdgeEventSourcingListener { try { log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), - event.getBody(), null, edgeTypeByActionType(event.getActionType()), + event.getBody(), null, EdgeUtils.getEdgeEventActionTypeByActionType(event.getActionType()), edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event, e); @@ -125,7 +129,7 @@ public class EdgeEventSourcingListener { } log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, - JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()), + JacksonUtil.toString(relation), EdgeEventType.RELATION, EdgeUtils.getEdgeEventActionTypeByActionType(event.getActionType()), edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event, e); @@ -150,7 +154,7 @@ public class EdgeEventSourcingListener { cleanUpUserAdditionalInfo(user); return !user.equals(oldUser); } - } else if (entity instanceof AlarmApiCallResult || entity instanceof Alarm) { + } else if (entity instanceof AlarmApiCallResult || entity instanceof Alarm || entity instanceof AlarmComment) { return false; } // Default: If the entity doesn't match any of the conditions, consider it as valid. diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index f1f473f976..14572bc871 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -644,6 +645,8 @@ public final class EdgeGrpcSession implements Closeable { return ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, this.edgeVersion); case ALARM: return ctx.getAlarmProcessor().convertAlarmEventToDownlink(edgeEvent, this.edgeVersion); + case ALARM_COMMENT: + return ctx.getAlarmProcessor().convertAlarmCommentEventToDownlink(edgeEvent, this.edgeVersion); case USER: return ctx.getUserProcessor().convertUserEventToDownlink(edgeEvent, this.edgeVersion); case RELATION: @@ -714,6 +717,12 @@ public final class EdgeGrpcSession implements Closeable { .processAlarmMsgFromEdge(edge.getTenantId(), edge.getId(), alarmUpdateMsg)); } } + if (uplinkMsg.getAlarmCommentUpdateMsgCount() > 0) { + for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : uplinkMsg.getAlarmCommentUpdateMsgList()) { + result.add(((AlarmProcessor) ctx.getAlarmEdgeProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion)) + .processAlarmCommentMsgFromEdge(edge.getTenantId(), edge.getId(), alarmCommentUpdateMsg)); + } + } if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) { for (EntityViewUpdateMsg entityViewUpdateMsg : uplinkMsg.getEntityViewUpdateMsgList()) { result.add(((EntityViewProcessor) ctx.getEntityViewProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion)) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java index 81606434b5..b5a0f20d6b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java @@ -16,6 +16,8 @@ package org.thingsboard.server.service.edge.rpc.constructor.alarm; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor; @@ -23,4 +25,6 @@ import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor; public interface AlarmMsgConstructor extends MsgConstructor { AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName); + + AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java index 5abba029de..81a489671f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java @@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @Component @TbCoreComponent -public class AlarmMsgConstructorV1 implements AlarmMsgConstructor { +public class AlarmMsgConstructorV1 extends BaseAlarmMsgConstructor { @Override public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java index d9768b8f9e..143a41b73c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java @@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @Component @TbCoreComponent -public class AlarmMsgConstructorV2 implements AlarmMsgConstructor { +public class AlarmMsgConstructorV2 extends BaseAlarmMsgConstructor { @Override public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java new file mode 100644 index 0000000000..c9af86e8cc --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.constructor.alarm; + +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; + +public abstract class BaseAlarmMsgConstructor implements AlarmMsgConstructor { + + @Override + public AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment) { + return AlarmCommentUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(alarmComment)).build(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 6f7474b15d..256f191215 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -51,6 +51,7 @@ import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; @@ -59,6 +60,7 @@ import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; @@ -145,6 +147,9 @@ public abstract class BaseEdgeProcessor { @Autowired protected AlarmService alarmService; + @Autowired + protected AlarmCommentService alarmCommentService; + @Autowired protected DeviceService deviceService; @@ -350,10 +355,13 @@ public abstract class BaseEdgeProcessor { case ALARM_ASSIGNED: case ALARM_UNASSIGNED: case CREDENTIALS_REQUEST: + case ADDED_COMMENT: + case UPDATED_COMMENT: return true; } switch (type) { case ALARM: + case ALARM_COMMENT: case RULE_CHAIN: case RULE_CHAIN_METADATA: case USER: @@ -441,14 +449,17 @@ public abstract class BaseEdgeProcessor { case CREDENTIALS_UPDATED: case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: + case UPDATED_COMMENT: return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE; case ADDED: case ASSIGNED_TO_EDGE: case RELATION_ADD_OR_UPDATE: + case ADDED_COMMENT: return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE; case DELETED: case UNASSIGNED_FROM_EDGE: case RELATION_DELETED: + case DELETED_COMMENT: return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE; case ALARM_ACK: return UpdateMsgType.ALARM_ACK_RPC_MESSAGE; @@ -517,22 +528,14 @@ public abstract class BaseEdgeProcessor { private ListenableFuture processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType, EdgeId sourceEdgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); } - } while (pageData != null && pageData.hasNext()); + } return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } @@ -715,19 +718,13 @@ public abstract class BaseEdgeProcessor { } private boolean isEntityNotAssignedToEdge(TenantId tenantId, EntityId entityId, EdgeId edgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - if (pageData.getData().contains(edgeId)) { - return false; - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); + for (EdgeId edgeId1 : edgeIds) { + if (edgeId1.equals(edgeId)) { + return false; } - } while (pageData != null && pageData.hasNext()); + } return true; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index ea48032b69..e6889a1e6f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -22,22 +22,30 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructor; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; @Slf4j @@ -67,58 +75,100 @@ public abstract class AlarmEdgeProcessor extends BaseAlarmProcessor implements A return null; } + @Override + public ListenableFuture processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) { + log.trace("[{}] processAlarmCommentMsgFromEdge [{}]", tenantId, alarmCommentUpdateMsg); + try { + edgeSynchronizationManager.getEdgeId().set(edgeId); + return processAlarmCommentMsg(tenantId, alarmCommentUpdateMsg); + } finally { + edgeSynchronizationManager.getEdgeId().remove(); + } + } + + @Override + public DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) { + AlarmCommentId alarmCommentId = new AlarmCommentId(edgeEvent.getEntityId()); + UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); + AlarmComment alarmComment; + switch (edgeEvent.getAction()) { + case ADDED_COMMENT: + case UPDATED_COMMENT: + alarmComment = alarmCommentService.findAlarmCommentById(edgeEvent.getTenantId(), alarmCommentId); + break; + case DELETED_COMMENT: + alarmComment = JacksonUtil.convertValue(edgeEvent.getBody(), AlarmComment.class); + break; + default: + return null; + } + return Optional.ofNullable(alarmComment).map(comment -> buildAlarmCommentDownlinkMsg(msgType, comment, edgeVersion)).orElse(null); + } + + private DownlinkMsg buildAlarmCommentDownlinkMsg(UpdateMsgType msgType, AlarmComment alarmComment, EdgeVersion edgeVersion) { + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addAlarmCommentUpdateMsg(((AlarmMsgConstructor) alarmMsgConstructorFactory + .getMsgConstructorByEdgeVersion(edgeVersion)).constructAlarmCommentUpdatedMsg(msgType, alarmComment)) + .build(); + } + public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); - switch (actionType) { - case DELETED: - Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); - if (deletedAlarm == null) { - return Futures.immediateFuture(null); - } - List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), - alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId); - return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); - default: - ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); - return Futures.transformAsync(alarmFuture, alarm -> { - if (alarm == null) { - return Futures.immediateFuture(null); - } - EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); - if (type == null) { - return Futures.immediateFuture(null); - } - List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), - alarmId, actionType, null, originatorEdgeId); - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); - }, dbCallbackExecutorService); + if (EdgeEventActionType.DELETED.equals(actionType)) { + Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); + if (deletedAlarm == null) { + return Futures.immediateFuture(null); + } + List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId, EdgeEventType.ALARM); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); } + ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); + return Futures.transformAsync(alarmFuture, alarm -> { + if (alarm == null) { + return Futures.immediateFuture(null); + } + EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); + if (type == null) { + return Futures.immediateFuture(null); + } + List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), + alarmId, actionType, null, originatorEdgeId, EdgeEventType.ALARM); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + }, dbCallbackExecutorService); } - private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, - relatedEdgeId, - EdgeEventType.ALARM, - actionType, - alarmId, - body)); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + public ListenableFuture processAlarmCommentNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); + AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); + if (EdgeEventActionType.DELETED.equals(actionType)) { + AlarmComment deletedAlarmComment = JacksonUtil.fromString(edgeNotificationMsg.getBody(), AlarmComment.class); + if (deletedAlarmComment == null) { + return Futures.immediateFuture(null); } - } while (pageData != null && pageData.hasNext()); + Alarm alarmById = alarmService.findAlarmById(tenantId, new AlarmId(deletedAlarmComment.getAlarmId().getId())); + List> delFutures = pushEventToAllRelatedEdges(tenantId, alarmById.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(deletedAlarmComment), originatorEdgeId, EdgeEventType.ALARM_COMMENT); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); + } + return Futures.immediateFuture(null); + } + + private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, + EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId, + EdgeEventType edgeEventType) { + List> futures = new ArrayList<>(); + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, originatorId, DEFAULT_PAGE_SIZE); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, edgeEventType, actionType, alarmId, body)); + } + } return futures; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java index 68a72bf01e..938269462d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; @@ -29,4 +30,8 @@ public interface AlarmProcessor extends EdgeProcessor { ListenableFuture processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg); DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion); + + ListenableFuture processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg); + + DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index 546d6e63db..6d04db5b75 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -23,6 +23,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmCreateOrUpdateActiveRequest; import org.thingsboard.server.common.data.alarm.AlarmUpdateRequest; import org.thingsboard.server.common.data.asset.Asset; @@ -33,6 +34,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; @@ -93,6 +95,36 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { return Futures.immediateFuture(null); } + public ListenableFuture processAlarmCommentMsg(TenantId tenantId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) { + log.trace("[{}] processAlarmCommentMsg [{}]", tenantId, alarmCommentUpdateMsg); + AlarmComment alarmComment = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + if (alarmComment == null) { + throw new RuntimeException("[{" + tenantId + "}] alarmCommentUpdateMsg {" + alarmCommentUpdateMsg + "} cannot be converted to alarm comment"); + } + try { + switch (alarmCommentUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + alarmCommentService.createOrUpdateAlarmComment(tenantId, alarmComment); + break; + case ENTITY_DELETED_RPC_MESSAGE: + AlarmComment alarmCommentToDelete = alarmCommentService.findAlarmCommentById(tenantId, alarmComment.getId()); + if (alarmCommentToDelete != null) { + alarmCommentService.saveAlarmComment(tenantId, alarmCommentToDelete); + } + break; + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(alarmCommentUpdateMsg.getMsgType()); + } + } catch (Exception e) { + log.error("[{}] Failed to process alarm comment update msg [{}]", tenantId, alarmCommentUpdateMsg, e); + return Futures.immediateFailedFuture(e); + } + return Futures.immediateFuture(null); + } + + protected abstract EntityId getAlarmOriginatorFromMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg); protected abstract Alarm constructAlarmFromUpdateMsg(TenantId tenantId, AlarmId alarmId, EntityId originatorId, AlarmUpdateMsg alarmUpdateMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java index 321cc69604..3a87fff506 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java @@ -19,7 +19,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.HasName; @@ -27,7 +26,6 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; @@ -40,6 +38,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.service.action.EntityActionService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -174,39 +173,4 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS metaData.putValue("assignedFromTenantName", tenant.getName()); return metaData; } - - public static EdgeEventActionType edgeTypeByActionType(ActionType actionType) { - switch (actionType) { - case ADDED: - return EdgeEventActionType.ADDED; - case UPDATED: - return EdgeEventActionType.UPDATED; - case ALARM_ACK: - return EdgeEventActionType.ALARM_ACK; - case ALARM_CLEAR: - return EdgeEventActionType.ALARM_CLEAR; - case ALARM_ASSIGNED: - return EdgeEventActionType.ALARM_ASSIGNED; - case ALARM_UNASSIGNED: - return EdgeEventActionType.ALARM_UNASSIGNED; - case DELETED: - return EdgeEventActionType.DELETED; - case RELATION_ADD_OR_UPDATE: - return EdgeEventActionType.RELATION_ADD_OR_UPDATE; - case RELATION_DELETED: - return EdgeEventActionType.RELATION_DELETED; - case ASSIGNED_TO_CUSTOMER: - return EdgeEventActionType.ASSIGNED_TO_CUSTOMER; - case UNASSIGNED_FROM_CUSTOMER: - return EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER; - case ASSIGNED_TO_EDGE: - return EdgeEventActionType.ASSIGNED_TO_EDGE; - case UNASSIGNED_FROM_EDGE: - return EdgeEventActionType.UNASSIGNED_FROM_EDGE; - case CREDENTIALS_UPDATED: - return EdgeEventActionType.CREDENTIALS_UPDATED; - default: - return null; - } - } } diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java index 429d0b765f..44302f08ba 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java @@ -20,6 +20,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasTenantId; @@ -47,7 +48,6 @@ import java.util.stream.Collectors; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; -import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType; @Slf4j public abstract class AbstractNotifyEntityTest extends AbstractWebTest { @@ -91,7 +91,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { int cntTime = 1; Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); ArgumentMatcher matcherOriginatorId = argument -> argument.equals(relation.getTo()); ArgumentMatcher matcherEntityClassEquals = Objects::isNull; ArgumentMatcher matcherCustomerId = customerId == null ? @@ -111,7 +111,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { ActionType actionType, int cntTime) { Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); ArgumentMatcher matcherOriginatorId = argument -> argument.getClass().equals(relation.getFrom().getClass()); ArgumentMatcher matcherEntityClassEquals = Objects::isNull; ArgumentMatcher matcherCustomerId = customerId == null ? @@ -317,7 +317,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) { EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? - EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType); + EdgeEventActionType.CREDENTIALS_UPDATED : EdgeUtils.getEdgeEventActionTypeByActionType(actionType); Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any()); } @@ -353,7 +353,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testNotificationMsgToEdgeServiceTime(EntityId entityId, TenantId tenantId, ActionType actionType, int cntTime) { EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? - EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType); + EdgeEventActionType.CREDENTIALS_UPDATED : EdgeUtils.getEdgeEventActionTypeByActionType(actionType); ArgumentMatcher matcherEntityId = cntTime == 1 ? argument -> argument.equals(entityId) : argument -> argument.getClass().equals(entityId.getClass()); Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), @@ -364,7 +364,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testSendNotificationMsgToEdgeServiceTimeEntityEqAny(TenantId tenantId, ActionType actionType, int cntTime) { Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.any(), Mockito.any(EntityId.class), Mockito.any(), Mockito.isNull(), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); } protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) { diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index f7c464fab9..f5e018ed3a 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -27,6 +27,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.edge.rpc.EdgeGrpcClient; import org.thingsboard.edge.rpc.EdgeRpcClient; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -231,6 +232,11 @@ public class EdgeImitator { result.add(saveDownlinkMsg(alarmUpdateMsg)); } } + if (downlinkMsg.getAlarmCommentUpdateMsgCount() > 0) { + for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : downlinkMsg.getAlarmCommentUpdateMsgList()) { + result.add(saveDownlinkMsg(alarmCommentUpdateMsg)); + } + } if (downlinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : downlinkMsg.getEntityDataList()) { if (randomFailuresOnTimeseriesDownlink) { diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index 384c00c8ed..9c45977a7c 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; @@ -157,6 +158,9 @@ public abstract class BaseEdgeProcessorTest { @MockBean protected AlarmService alarmService; + @MockBean + protected AlarmCommentService alarmCommentService; + @MockBean protected DeviceService deviceService; diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java index 7bee9f6983..c822693a71 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; public interface AlarmCommentService { + AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment); AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java index dc969bb362..69a3fc7d8c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Throwables; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -32,6 +33,7 @@ import java.util.concurrent.ThreadLocalRandom; public final class EdgeUtils { private static final EnumMap entityTypeEdgeEventTypeEnumMap; + private static final EnumMap actionTypeEdgeEventActionTypeEnumMap; static { entityTypeEdgeEventTypeEnumMap = new EnumMap<>(EntityType.class); @@ -40,6 +42,13 @@ public final class EdgeUtils { entityTypeEdgeEventTypeEnumMap.put(edgeEventType.getEntityType(), edgeEventType); } } + + actionTypeEdgeEventActionTypeEnumMap = new EnumMap<>(ActionType.class); + for (EdgeEventActionType edgeEventActionType : EdgeEventActionType.values()) { + if (edgeEventActionType.getActionType() != null) { + actionTypeEdgeEventActionTypeEnumMap.put(edgeEventActionType.getActionType(), edgeEventActionType); + } + } } private static final int STACK_TRACE_LIMIT = 10; @@ -54,6 +63,10 @@ public final class EdgeUtils { return entityTypeEdgeEventTypeEnumMap.get(entityType); } + public static EdgeEventActionType getEdgeEventActionTypeByActionType(ActionType actionType) { + return actionTypeEdgeEventActionTypeEnumMap.get(actionType); + } + public static EdgeEvent constructEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java index e355901182..e39d93946c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java @@ -1,5 +1,5 @@ /** - * Copyright © 2016-2024 The Thingsboard Authors + * Copyright © 2016-2023 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,26 +15,39 @@ */ package org.thingsboard.server.common.data.edge; +import lombok.Getter; +import org.thingsboard.server.common.data.audit.ActionType; + +@Getter public enum EdgeEventActionType { - ADDED, - DELETED, - UPDATED, - POST_ATTRIBUTES, - ATTRIBUTES_UPDATED, - ATTRIBUTES_DELETED, - TIMESERIES_UPDATED, - CREDENTIALS_UPDATED, - ASSIGNED_TO_CUSTOMER, - UNASSIGNED_FROM_CUSTOMER, - RELATION_ADD_OR_UPDATE, - RELATION_DELETED, - RPC_CALL, - ALARM_ACK, - ALARM_CLEAR, - ALARM_ASSIGNED, - ALARM_UNASSIGNED, - ASSIGNED_TO_EDGE, - UNASSIGNED_FROM_EDGE, - CREDENTIALS_REQUEST, - ENTITY_MERGE_REQUEST // deprecated -} \ No newline at end of file + ADDED(ActionType.ADDED), + UPDATED(ActionType.UPDATED), + DELETED(ActionType.DELETED), + POST_ATTRIBUTES(null), + ATTRIBUTES_UPDATED(ActionType.ATTRIBUTES_UPDATED), + ATTRIBUTES_DELETED(ActionType.ATTRIBUTES_DELETED), + TIMESERIES_UPDATED(ActionType.TIMESERIES_UPDATED), + CREDENTIALS_UPDATED(ActionType.CREDENTIALS_UPDATED), + ASSIGNED_TO_CUSTOMER(ActionType.ASSIGNED_TO_CUSTOMER), + UNASSIGNED_FROM_CUSTOMER(ActionType.UNASSIGNED_FROM_CUSTOMER), + RELATION_ADD_OR_UPDATE(ActionType.RELATION_ADD_OR_UPDATE), + RELATION_DELETED(ActionType.RELATION_DELETED), + RPC_CALL(ActionType.RPC_CALL), + ALARM_ACK(ActionType.ALARM_ACK), + ALARM_CLEAR(ActionType.ALARM_CLEAR), + ALARM_ASSIGNED(ActionType.ALARM_ASSIGNED), + ALARM_UNASSIGNED(ActionType.ALARM_UNASSIGNED), + ADDED_COMMENT(ActionType.ADDED_COMMENT), + UPDATED_COMMENT(ActionType.UPDATED_COMMENT), + DELETED_COMMENT(ActionType.DELETED_COMMENT), + ASSIGNED_TO_EDGE(ActionType.ASSIGNED_TO_EDGE), + UNASSIGNED_FROM_EDGE(ActionType.UNASSIGNED_FROM_EDGE), + CREDENTIALS_REQUEST(null), + ENTITY_MERGE_REQUEST(null); // deprecated + + private final ActionType actionType; + + EdgeEventActionType(ActionType actionType) { + this.actionType = actionType; + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index a8627d3c0d..6fc255501f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -27,6 +27,7 @@ public enum EdgeEventType { ASSET_PROFILE(true, EntityType.ASSET_PROFILE), ENTITY_VIEW(false, EntityType.ENTITY_VIEW), ALARM(false, EntityType.ALARM), + ALARM_COMMENT(false, null), RULE_CHAIN(false, EntityType.RULE_CHAIN), RULE_CHAIN_METADATA(false, null), EDGE(false, EntityType.EDGE), diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 2f33c21fd8..34a8c6f093 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -325,6 +325,11 @@ message AlarmUpdateMsg { string entity = 18; } +message AlarmCommentUpdateMsg { + UpdateMsgType msgType = 1; + string entity = 2; +} + message CustomerUpdateMsg { UpdateMsgType msgType = 1; int64 idMSB = 2; @@ -618,6 +623,7 @@ message UplinkMsg { repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19; repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20; repeated ResourceUpdateMsg resourceUpdateMsg = 21; + repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 22; } message UplinkResponseMsg { @@ -661,5 +667,6 @@ message DownlinkMsg { repeated TenantUpdateMsg tenantUpdateMsg = 26; repeated TenantProfileUpdateMsg tenantProfileUpdateMsg = 27; repeated ResourceUpdateMsg resourceUpdateMsg = 28; + repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 29; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index 3bc8ceb1a0..b60acc6550 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; +import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; import org.thingsboard.server.dao.service.DataValidator; import java.util.UUID; @@ -61,7 +62,10 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al public AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment) { log.debug("Deleting Alarm Comment: {}", alarmComment); alarmCommentDataValidator.validate(alarmComment, c -> tenantId); - return alarmCommentDao.save(tenantId, alarmComment); + AlarmComment result = alarmCommentDao.save(tenantId, alarmComment); + eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entity(result) + .entityId(result.getAlarmId()).build()); + return result; } @Override @@ -112,5 +116,4 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al } return null; } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index ae9763c59c..2a42544bec 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -135,7 +135,7 @@ public class BaseAlarmService extends AbstractCachedEntityService metadata = msg.getMetaData().getData(); EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msg); @@ -100,7 +100,8 @@ public abstract class AbstractTbMsgPushNode keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {}); + List keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { + }); entityBody.put("keys", keys); entityBody.put(SCOPE, getScope(metadata)); break; @@ -137,6 +138,8 @@ public abstract class AbstractTbMsgPushNode getConfigClazz(); @@ -149,6 +152,11 @@ public abstract class AbstractTbMsgPushNode metadata) { String scope = metadata.get(SCOPE); if (StringUtils.isEmpty(scope)) { @@ -171,6 +179,10 @@ public abstract class AbstractTbMsgPushNodeSupports next originator types:" + - "
DEVICE" + - "
ASSET" + - "
ENTITY_VIEW" + - "
DASHBOARD" + - "
TENANT" + - "
CUSTOMER" + - "
EDGE

" + - "As well node supports next message types:" + + "Supports next message types:" + "
POST_TELEMETRY_REQUEST" + "
POST_ATTRIBUTES_REQUEST" + "
ATTRIBUTES_UPDATED" + "
ATTRIBUTES_DELETED" + "
ALARM

" + - "Message will be routed via Failure route if node was not able to save edge event to database or unsupported originator type/message type arrived. " + + "Message will be routed via Failure route if node was not able to save edge event to database or unsupported message type arrived. " + "In case successful storage edge event to database message will be routed via Success route.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodePushToEdgeConfig", @@ -99,6 +93,11 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode pageData; - List> futures = new ArrayList<>(); - do { - pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId edgeId : pageData.getData()) { - EdgeEvent edgeEvent = buildEvent(msg, ctx); - futures.add(notifyEdge(ctx, edgeEvent, edgeId)); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + EntityId originatorId = msg.getOriginator(); + if (DataConstants.COMMENT_CREATED.equals(msg.getType()) || DataConstants.COMMENT_UPDATED.equals(msg.getType())) { + Alarm alarm = JacksonUtil.fromString(msg.getData(), Alarm.class); + if (alarm != null) { + originatorId = alarm.getOriginator(); } - } while (pageData != null && pageData.hasNext()); + } + List> futures = new ArrayList<>(); + EntityId finalOriginatorId = originatorId; + PageDataIterableByTenantIdEntityId edgeIds = new PageDataIterableByTenantIdEntityId<>( + ctx.getEdgeService()::findRelatedEdgeIdsByEntityId, ctx.getTenantId(), finalOriginatorId, DEFAULT_PAGE_SIZE); + for (EdgeId edgeId : edgeIds) { + EdgeEvent edgeEvent = buildEvent(msg, ctx); + futures.add(notifyEdge(ctx, edgeEvent, edgeId)); + } if (futures.isEmpty()) { // ack in case no edges are related to provided entity @@ -176,5 +175,4 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode Date: Wed, 10 Jan 2024 13:04:38 +0200 Subject: [PATCH 2/7] Fix license headers. Add test for alarm comments from edge --- .../processor/alarm/BaseAlarmProcessor.java | 7 ++ .../entitiy/alarm/TbAlarmCommentService.java | 1 + .../server/edge/AlarmEdgeTest.java | 118 +++++++++--------- .../common/data/edge/EdgeEventActionType.java | 2 +- .../data/page/BasePageDataIterable.java | 5 +- 5 files changed, 70 insertions(+), 63 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index 6d04db5b75..e82c2aa2d0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityView; @@ -34,6 +35,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.alarm.AlarmCommentDao; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; @@ -46,6 +48,9 @@ import java.util.UUID; @Slf4j public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { + @Autowired + protected AlarmCommentDao alarmCommentDao; + public ListenableFuture processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg); AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB())); @@ -104,6 +109,8 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { try { switch (alarmCommentUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: + alarmCommentDao.createAlarmComment(tenantId, alarmComment); + break; case ENTITY_UPDATED_RPC_MESSAGE: alarmCommentService.createOrUpdateAlarmComment(tenantId, alarmComment); break; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java index c0274792cb..da7c69050e 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.exception.ThingsboardException; public interface TbAlarmCommentService { + AlarmComment saveAlarmComment(Alarm alarm, AlarmComment alarmComment, User user) throws ThingsboardException; void deleteAlarmComment(Alarm alarm, AlarmComment alarmComment, User user) throws ThingsboardException; diff --git a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java index 5ada0e69d5..6e10507f1e 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java @@ -15,21 +15,27 @@ */ package org.thingsboard.server.edge; +import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.protobuf.AbstractMessage; +import com.fasterxml.jackson.databind.node.TextNode; import org.junit.Assert; import org.junit.Test; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.common.data.alarm.AlarmCommentInfo; +import org.thingsboard.server.common.data.alarm.AlarmCommentType; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.alarm.AlarmStatus; +import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; @@ -38,8 +44,6 @@ import java.util.List; import java.util.Optional; import java.util.UUID; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - @DaoSqlTest public class AlarmEdgeTest extends AbstractEdgeTest { @@ -77,66 +81,48 @@ public class AlarmEdgeTest extends AbstractEdgeTest { } @Test - public void testAlarms() throws Exception { - // create alarm - Device device = findDeviceByName("Edge Device 1"); - Alarm alarm = new Alarm(); - alarm.setOriginator(device.getId()); - alarm.setType("alarm"); - alarm.setSeverity(AlarmSeverity.CRITICAL); - edgeImitator.expectMessageAmount(1); - Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class); - Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - AlarmUpdateMsg alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; - Alarm alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); - Assert.assertNotNull(alarmMsg); - Assert.assertEquals(savedAlarm, alarmMsg); - Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); + public void testSendAlarmCommentToCloud() throws Exception { + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); - // ack alarm - edgeImitator.expectMessageAmount(1); - doPost("/api/alarm/" + savedAlarm.getUuidId() + "/ack"); - Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; - Assert.assertEquals(UpdateMsgType.ALARM_ACK_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); - alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); - Assert.assertNotNull(alarmMsg); - Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); - Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); - Assert.assertEquals(AlarmStatus.ACTIVE_ACK, alarmMsg.getStatus()); + Alarm alarm = buildAlarmForUplinkMsg(device.getId()); - // clear alarm - edgeImitator.expectMessageAmount(1); - doPost("/api/alarm/" + savedAlarm.getUuidId() + "/clear"); - Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; - Assert.assertEquals(UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); - alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); - Assert.assertNotNull(alarmMsg); - Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); - Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); - Assert.assertEquals(AlarmStatus.CLEARED_ACK, alarmMsg.getStatus()); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + AlarmUpdateMsg.Builder alarmUpdateMgBuilder = AlarmUpdateMsg.newBuilder(); + alarmUpdateMgBuilder.setIdMSB(alarm.getUuidId().getMostSignificantBits()); + alarmUpdateMgBuilder.setIdLSB(alarm.getUuidId().getLeastSignificantBits()); + alarmUpdateMgBuilder.setEntity(JacksonUtil.toString(alarm)); + testAutoGeneratedCodeByProtobuf(alarmUpdateMgBuilder); + uplinkMsgBuilder.addAlarmUpdateMsg(alarmUpdateMgBuilder.build()); - // delete alarm - edgeImitator.expectMessageAmount(1); - doDelete("/api/alarm/" + savedAlarm.getUuidId()) - .andExpect(status().isOk()); - Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; - Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); - alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); - Assert.assertNotNull(alarmMsg); - Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); - Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); - Assert.assertEquals(AlarmStatus.CLEARED_ACK, alarmMsg.getStatus()); + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + AlarmComment alarmComment = buildAlarmCommentForUplinkMsg(alarm.getId()); + + uplinkMsgBuilder = UplinkMsg.newBuilder(); + AlarmCommentUpdateMsg.Builder alarmCommentUpdateMgBuilder = AlarmCommentUpdateMsg.newBuilder(); + alarmCommentUpdateMgBuilder.setEntity(JacksonUtil.toString(alarmComment)); + alarmCommentUpdateMgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + testAutoGeneratedCodeByProtobuf(alarmCommentUpdateMgBuilder); + uplinkMsgBuilder.addAlarmCommentUpdateMsg(alarmCommentUpdateMgBuilder.build()); + + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + PageData pageData = doGetTyped("/api/alarm/" + alarmComment.getAlarmId().getId() + "/comment" + "?page=0&pageSize=1", new TypeReference<>() {}); + Assert.assertNotNull("Found pageData is null", pageData); + Assert.assertNotEquals("Expected alarms are not found!", 0, pageData.getTotalElements()); + + Assert.assertNotNull(pageData.getData().get(0)); + AlarmCommentInfo alarmInfo = pageData.getData().get(0); + Assert.assertEquals(alarm.getId(), alarmInfo.getAlarmId()); + Assert.assertEquals(alarmComment.getAlarmId(), alarmInfo.getAlarmId()); } private Alarm buildAlarmForUplinkMsg(DeviceId deviceId) { @@ -148,4 +134,16 @@ public class AlarmEdgeTest extends AbstractEdgeTest { alarm.setSeverity(AlarmSeverity.CRITICAL); return alarm; } + + private AlarmComment buildAlarmCommentForUplinkMsg(AlarmId alarmId) { + UUID uuid = Uuids.timeBased(); + AlarmComment alarmComment = new AlarmComment(); + alarmComment.setAlarmId(alarmId); + alarmComment.setType(AlarmCommentType.OTHER); + alarmComment.setUserId(tenantAdminUserId); + alarmComment.setId(new AlarmCommentId(uuid)); + alarmComment.setComment(new TextNode("AlarmComment")); + alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid)); + return alarmComment; + } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java index e39d93946c..680f744c7b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java @@ -1,5 +1,5 @@ /** - * Copyright © 2016-2023 The Thingsboard Authors + * Copyright © 2016-2024 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java b/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java index 19c3fc8f4f..8d46a46781 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.page; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -64,8 +65,8 @@ public abstract class BasePageDataIterable implements Iterable, Iterator pageData = fetchPageData(link); currentIdx = 0; - currentItems = pageData.getData(); - hasNextPack = pageData.hasNext(); + currentItems = pageData != null ? pageData.getData() : new ArrayList<>(); + hasNextPack = pageData != null && pageData.hasNext(); nextPackLink = link.nextPageLink(); } From e84a1f066201b83579bba6732b43126e7585569a Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Wed, 10 Jan 2024 15:00:15 +0200 Subject: [PATCH 3/7] Add comment types to convertEntityEventToDownlink --- .../thingsboard/server/service/edge/rpc/EdgeGrpcSession.java | 3 +++ .../service/edge/rpc/processor/alarm/BaseAlarmProcessor.java | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 14572bc871..5e6f7a263e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -531,6 +531,9 @@ public final class EdgeGrpcSession implements Closeable { case RPC_CALL: case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: + case ADDED_COMMENT: + case UPDATED_COMMENT: + case DELETED_COMMENT: downlinkMsg = convertEntityEventToDownlink(edgeEvent); log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg); break; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index e82c2aa2d0..11cd2ab56c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -107,6 +107,10 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { throw new RuntimeException("[{" + tenantId + "}] alarmCommentUpdateMsg {" + alarmCommentUpdateMsg + "} cannot be converted to alarm comment"); } try { + Alarm alarm = alarmService.findAlarmById(tenantId, new AlarmId(alarmComment.getAlarmId().getId())); + if (alarm == null) { + return Futures.immediateFuture(null); + } switch (alarmCommentUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: alarmCommentDao.createAlarmComment(tenantId, alarmComment); From 1478383db6c51436b54013c03ee8fd8452558274 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Thu, 11 Jan 2024 11:27:23 +0200 Subject: [PATCH 4/7] Fix tests. Improve TbMsgPushNode toCloud and toEdge description --- .../server/controller/AlarmControllerTest.java | 6 +++--- .../rule/engine/edge/AbstractTbMsgPushNode.java | 6 +++--- .../rule/engine/edge/TbMsgPushToCloudNode.java | 12 +++--------- .../rule/engine/edge/TbMsgPushToEdgeNode.java | 2 ++ 4 files changed, 11 insertions(+), 15 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java index f5c1fbdae9..1415e38d7e 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java @@ -110,7 +110,7 @@ public class AlarmControllerTest extends AbstractControllerTest { Alarm alarm = createAlarm(TEST_ALARM_TYPE); - testNotifyEntityAllOneTime(alarm, alarm.getId(), alarm.getOriginator(), + testNotifyEntityOneTimeMsgToEdgeServiceNever(alarm, alarm.getId(), alarm.getOriginator(), tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, ActionType.ADDED); } @@ -122,7 +122,7 @@ public class AlarmControllerTest extends AbstractControllerTest { Alarm alarm = createAlarm(TEST_ALARM_TYPE); - testNotifyEntityAllOneTime(alarm, alarm.getId(), alarm.getOriginator(), + testNotifyEntityOneTimeMsgToEdgeServiceNever(alarm, alarm.getId(), alarm.getOriginator(), tenantId, customerId, tenantAdminUserId, TENANT_ADMIN_EMAIL, ActionType.ADDED); } @@ -772,7 +772,7 @@ public class AlarmControllerTest extends AbstractControllerTest { alarm = doPost("/api/alarm", alarm, Alarm.class); Assert.assertNotNull("Saved alarm is null!", alarm); - testNotifyEntityNeverMsgToEdgeServiceOneTime(alarm, alarm.getId(), tenantId, ActionType.ADDED); + testNotifyEntityNever(alarm.getId(), alarm); resetTokens(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 2349fbdbb6..6ed8a33c2d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -27,6 +27,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.TenantId; @@ -147,9 +148,8 @@ public abstract class AbstractTbMsgPushNodeSupports next originator types:" + - "
DEVICE" + - "
ASSET" + - "
ENTITY_VIEW" + - "
DASHBOARD" + - "
TENANT" + - "
CUSTOMER" + - "
EDGE

" + - "As well node supports next message types:" + + "Supports next message types:" + "
POST_TELEMETRY_REQUEST" + "
POST_ATTRIBUTES_REQUEST" + "
ATTRIBUTES_UPDATED" + "
ATTRIBUTES_DELETED" + "
ALARM

" + + "
COMMENT_CREATED" + + "
COMMENT_UPDATED" + "Message will be routed via Failure route if node was not able to save cloud event to database or unsupported originator type/message type arrived. " + "In case successful storage cloud event to database message will be routed via Success route.", uiResources = {"static/rulenode/rulenode-core-config.js"}, diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index d9d634c03d..0c6461d3a7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -60,6 +60,8 @@ import java.util.UUID; "
ATTRIBUTES_UPDATED" + "
ATTRIBUTES_DELETED" + "
ALARM

" + + "
COMMENT_CREATED" + + "
COMMENT_UPDATED" + "Message will be routed via Failure route if node was not able to save edge event to database or unsupported message type arrived. " + "In case successful storage edge event to database message will be routed via Success route.", uiResources = {"static/rulenode/rulenode-core-config.js"}, From 43e6a402904548766e9336be5c769a60e0de2284 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 15 Jan 2024 15:51:05 +0200 Subject: [PATCH 5/7] Send alarm comment without rule node pushToEdge --- .../edge/EdgeEventSourcingListener.java | 44 ++++++-- .../processor/alarm/AlarmEdgeProcessor.java | 45 +++----- .../server/edge/AlarmEdgeTest.java | 104 ++++++++++++++++++ .../dao/alarm/BaseAlarmCommentService.java | 14 ++- .../engine/edge/AbstractTbMsgPushNode.java | 19 +--- .../engine/edge/TbMsgPushToCloudNode.java | 7 -- .../rule/engine/edge/TbMsgPushToEdgeNode.java | 20 +--- 7 files changed, 169 insertions(+), 84 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 5883d04fe6..89241f9a21 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -79,7 +79,12 @@ public class EdgeEventSourcingListener { return; } log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); - EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; + boolean isAdded = Boolean.TRUE.equals(event.getAdded()); + EdgeEventActionType action = isAdded ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; + if (event.getEntity() instanceof AlarmComment) { + processAlarmCommentEvent(event, isAdded); + return; + } tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), null, null, action, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { @@ -87,22 +92,33 @@ public class EdgeEventSourcingListener { } } + private void processAlarmCommentEvent(SaveEntityEvent event, boolean added) { + EdgeEventActionType action = added ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT; + tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), + JacksonUtil.toString(event.getEntity()), EdgeEventType.ALARM_COMMENT, action, edgeSynchronizationManager.getEdgeId().get()); + } + @TransactionalEventListener(fallbackExecution = true) public void handleEvent(DeleteEntityEvent event) { try { log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); - EdgeEventType type = null; - if (event.getEntity() instanceof AlarmComment) { - type = EdgeEventType.ALARM_COMMENT; - } + EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity()); + EdgeEventActionType actionType = getEdgeEventActionTypeForEntityEvent(event.getEntity()); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - JacksonUtil.toString(event.getEntity()), type, EdgeEventActionType.DELETED, + JacksonUtil.toString(event.getEntity()), type, actionType, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e); } } + private EdgeEventActionType getEdgeEventActionTypeForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return EdgeEventActionType.DELETED_COMMENT; + } + return EdgeEventActionType.DELETED; + } + @TransactionalEventListener(fallbackExecution = true) public void handleEvent(ActionEntityEvent event) { try { @@ -154,7 +170,7 @@ public class EdgeEventSourcingListener { cleanUpUserAdditionalInfo(user); return !user.equals(oldUser); } - } else if (entity instanceof AlarmApiCallResult || entity instanceof Alarm || entity instanceof AlarmComment) { + } else if (entity instanceof AlarmApiCallResult || entity instanceof Alarm) { return false; } // Default: If the entity doesn't match any of the conditions, consider it as valid. @@ -177,4 +193,18 @@ public class EdgeEventSourcingListener { } } } + + private EdgeEventType getEdgeEventTypeForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return EdgeEventType.ALARM_COMMENT; + } + return null; + } + + private String getBodyMsgForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return JacksonUtil.toString(entity); + } + return null; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index e6889a1e6f..f83cbbba9c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -26,15 +26,11 @@ import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageDataIterable; import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; @@ -45,7 +41,6 @@ import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstru import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.UUID; @Slf4j @@ -88,29 +83,22 @@ public abstract class AlarmEdgeProcessor extends BaseAlarmProcessor implements A @Override public DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) { - AlarmCommentId alarmCommentId = new AlarmCommentId(edgeEvent.getEntityId()); UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); - AlarmComment alarmComment; switch (edgeEvent.getAction()) { case ADDED_COMMENT: case UPDATED_COMMENT: - alarmComment = alarmCommentService.findAlarmCommentById(edgeEvent.getTenantId(), alarmCommentId); - break; case DELETED_COMMENT: - alarmComment = JacksonUtil.convertValue(edgeEvent.getBody(), AlarmComment.class); - break; + AlarmComment alarmComment = JacksonUtil.convertValue(edgeEvent.getBody(), AlarmComment.class); + if (alarmComment != null) { + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addAlarmCommentUpdateMsg(((AlarmMsgConstructor) alarmMsgConstructorFactory + .getMsgConstructorByEdgeVersion(edgeVersion)).constructAlarmCommentUpdatedMsg(msgType, alarmComment)) + .build(); + } default: return null; } - return Optional.ofNullable(alarmComment).map(comment -> buildAlarmCommentDownlinkMsg(msgType, comment, edgeVersion)).orElse(null); - } - - private DownlinkMsg buildAlarmCommentDownlinkMsg(UpdateMsgType msgType, AlarmComment alarmComment, EdgeVersion edgeVersion) { - return DownlinkMsg.newBuilder() - .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addAlarmCommentUpdateMsg(((AlarmMsgConstructor) alarmMsgConstructorFactory - .getMsgConstructorByEdgeVersion(edgeVersion)).constructAlarmCommentUpdatedMsg(msgType, alarmComment)) - .build(); } public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { @@ -145,17 +133,14 @@ public abstract class AlarmEdgeProcessor extends BaseAlarmProcessor implements A EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); - if (EdgeEventActionType.DELETED.equals(actionType)) { - AlarmComment deletedAlarmComment = JacksonUtil.fromString(edgeNotificationMsg.getBody(), AlarmComment.class); - if (deletedAlarmComment == null) { - return Futures.immediateFuture(null); - } - Alarm alarmById = alarmService.findAlarmById(tenantId, new AlarmId(deletedAlarmComment.getAlarmId().getId())); - List> delFutures = pushEventToAllRelatedEdges(tenantId, alarmById.getOriginator(), - alarmId, actionType, JacksonUtil.valueToTree(deletedAlarmComment), originatorEdgeId, EdgeEventType.ALARM_COMMENT); - return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); + AlarmComment alarmComment = JacksonUtil.fromString(edgeNotificationMsg.getBody(), AlarmComment.class); + if (alarmComment == null) { + return Futures.immediateFuture(null); } - return Futures.immediateFuture(null); + Alarm alarmById = alarmService.findAlarmById(tenantId, new AlarmId(alarmComment.getAlarmId().getId())); + List> delFutures = pushEventToAllRelatedEdges(tenantId, alarmById.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(alarmComment), originatorEdgeId, EdgeEventType.ALARM_COMMENT); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); } private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, diff --git a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java index 6e10507f1e..acd8cb1444 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java @@ -18,6 +18,7 @@ package org.thingsboard.server.edge; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.node.TextNode; +import com.google.protobuf.AbstractMessage; import org.junit.Assert; import org.junit.Test; import org.thingsboard.common.util.JacksonUtil; @@ -44,6 +45,8 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + @DaoSqlTest public class AlarmEdgeTest extends AbstractEdgeTest { @@ -80,6 +83,60 @@ public class AlarmEdgeTest extends AbstractEdgeTest { Assert.assertEquals(AlarmSeverity.CRITICAL, alarmInfo.getSeverity()); } + @Test + public void testAlarms() throws Exception { + // create alarm + Device device = findDeviceByName("Edge Device 1"); + Alarm alarm = new Alarm(); + alarm.setOriginator(device.getId()); + alarm.setType("alarm"); + alarm.setSeverity(AlarmSeverity.CRITICAL); + Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class); + + // ack alarm + edgeImitator.expectMessageAmount(1); + doPost("/api/alarm/" + savedAlarm.getUuidId() + "/ack"); + Assert.assertTrue(edgeImitator.waitForMessages()); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); + AlarmUpdateMsg alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ALARM_ACK_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); + Alarm alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); + Assert.assertNotNull(alarmMsg); + Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); + Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); + Assert.assertEquals(AlarmStatus.ACTIVE_ACK, alarmMsg.getStatus()); + + // clear alarm + edgeImitator.expectMessageAmount(1); + doPost("/api/alarm/" + savedAlarm.getUuidId() + "/clear"); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); + alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); + alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); + Assert.assertNotNull(alarmMsg); + Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); + Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); + Assert.assertEquals(AlarmStatus.CLEARED_ACK, alarmMsg.getStatus()); + + // delete alarm + edgeImitator.expectMessageAmount(1); + doDelete("/api/alarm/" + savedAlarm.getUuidId()) + .andExpect(status().isOk()); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); + alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); + alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); + Assert.assertNotNull(alarmMsg); + Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); + Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); + Assert.assertEquals(AlarmStatus.CLEARED_ACK, alarmMsg.getStatus()); + } + @Test public void testSendAlarmCommentToCloud() throws Exception { Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); @@ -125,6 +182,53 @@ public class AlarmEdgeTest extends AbstractEdgeTest { Assert.assertEquals(alarmComment.getAlarmId(), alarmInfo.getAlarmId()); } + @Test + public void testAlarmComments() throws Exception { + Device device = findDeviceByName("Edge Device 1"); + Alarm alarm = new Alarm(); + alarm.setOriginator(device.getId()); + alarm.setType("alarm"); + alarm.setSeverity(AlarmSeverity.MINOR); + Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class); + + // create alarm comment + edgeImitator.expectMessageAmount(1); + AlarmComment alarmComment = new AlarmComment(); + alarmComment.setComment(new TextNode("Test")); + alarmComment.setAlarmId(savedAlarm.getId()); + alarmComment = doPost("/api/alarm/" + savedAlarm.getUuidId() + "/comment", alarmComment, AlarmComment.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + AlarmCommentUpdateMsg alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + AlarmComment alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); + Assert.assertEquals(alarmComment, alarmCommentMsg); + + // update alarm comment + edgeImitator.expectMessageAmount(1); + alarmComment.setComment(JacksonUtil.newObjectNode().set("text", new TextNode("Updated comment"))); + alarmComment = doPost("/api/alarm/" + savedAlarm.getUuidId() + "/comment", alarmComment, AlarmComment.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); + Assert.assertEquals(alarmComment, alarmCommentMsg); + + // delete alarm + edgeImitator.expectMessageAmount(1); + doDelete("/api/alarm/" + savedAlarm.getUuidId() + "/comment/" + alarmComment.getUuidId()) + .andExpect(status().isOk()); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); + } + private Alarm buildAlarmForUplinkMsg(DeviceId deviceId) { Alarm alarm = new Alarm(); alarm.setId(new AlarmId(UUID.randomUUID())); diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index b60acc6550..042f0f693b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; +import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.service.DataValidator; import java.util.UUID; @@ -51,11 +52,18 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al @Override public AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment) { alarmCommentDataValidator.validate(alarmComment, c -> tenantId); - if (alarmComment.getId() == null) { - return createAlarmComment(tenantId, alarmComment); + boolean isCreated = alarmComment.getId() == null; + AlarmComment result; + if (isCreated) { + result = createAlarmComment(tenantId, alarmComment); } else { - return updateAlarmComment(tenantId, alarmComment); + result = updateAlarmComment(tenantId, alarmComment); } + if (result != null) { + eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entity(result) + .entityId(result.getAlarmId()).added(isCreated).build()); + } + return result; } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java index 6ed8a33c2d..31fda250d0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/AbstractTbMsgPushNode.java @@ -28,7 +28,6 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.alarm.Alarm; -import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbMsg; @@ -42,8 +41,6 @@ import static org.thingsboard.server.common.data.msg.TbMsgType.ACTIVITY_EVENT; import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_UPDATED; -import static org.thingsboard.server.common.data.msg.TbMsgType.COMMENT_CREATED; -import static org.thingsboard.server.common.data.msg.TbMsgType.COMMENT_UPDATED; import static org.thingsboard.server.common.data.msg.TbMsgType.CONNECT_EVENT; import static org.thingsboard.server.common.data.msg.TbMsgType.DISCONNECT_EVENT; import static org.thingsboard.server.common.data.msg.TbMsgType.INACTIVITY_EVENT; @@ -83,9 +80,6 @@ public abstract class AbstractTbMsgPushNode metadata = msg.getMetaData().getData(); EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msg); @@ -139,8 +133,6 @@ public abstract class AbstractTbMsgPushNode getConfigClazz(); @@ -152,11 +144,6 @@ public abstract class AbstractTbMsgPushNode metadata) { String scope = metadata.get(SCOPE); if (StringUtils.isEmpty(scope)) { @@ -179,10 +166,6 @@ public abstract class AbstractTbMsgPushNodeATTRIBUTES_UPDATED" + "
ATTRIBUTES_DELETED" + "
ALARM

" + - "
COMMENT_CREATED" + - "
COMMENT_UPDATED" + "Message will be routed via Failure route if node was not able to save cloud event to database or unsupported originator type/message type arrived. " + "In case successful storage cloud event to database message will be routed via Success route.", uiResources = {"static/rulenode/rulenode-core-config.js"}, @@ -72,11 +70,6 @@ public class TbMsgPushToCloudNode extends AbstractTbMsgPushNodeATTRIBUTES_UPDATED" + "
ATTRIBUTES_DELETED" + "
ALARM

" + - "
COMMENT_CREATED" + - "
COMMENT_UPDATED" + "Message will be routed via Failure route if node was not able to save edge event to database or unsupported message type arrived. " + "In case successful storage edge event to database message will be routed via Success route.", uiResources = {"static/rulenode/rulenode-core-config.js"}, @@ -95,11 +90,6 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode> futures = new ArrayList<>(); - EntityId finalOriginatorId = originatorId; PageDataIterableByTenantIdEntityId edgeIds = new PageDataIterableByTenantIdEntityId<>( - ctx.getEdgeService()::findRelatedEdgeIdsByEntityId, ctx.getTenantId(), finalOriginatorId, DEFAULT_PAGE_SIZE); + ctx.getEdgeService()::findRelatedEdgeIdsByEntityId, ctx.getTenantId(), msg.getOriginator(), DEFAULT_PAGE_SIZE); for (EdgeId edgeId : edgeIds) { EdgeEvent edgeEvent = buildEvent(msg, ctx); futures.add(notifyEdge(ctx, edgeEvent, edgeId)); From 67fe8067b73440dbf99c93fa79eaa30190a0bfcd Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 16 Jan 2024 09:24:45 +0200 Subject: [PATCH 6/7] Refactor SaveEntityEvent to make it more generic in EdgeEventSourcingListener --- .../edge/EdgeEventSourcingListener.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 89241f9a21..3ed90ee431 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -80,24 +80,16 @@ public class EdgeEventSourcingListener { } log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); boolean isAdded = Boolean.TRUE.equals(event.getAdded()); - EdgeEventActionType action = isAdded ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; - if (event.getEntity() instanceof AlarmComment) { - processAlarmCommentEvent(event, isAdded); - return; - } + String body = getBodyMsgForEntityEvent(event.getEntity()); + EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity()); + EdgeEventActionType action = getActionForEntityEvent(event.getEntity(), isAdded); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - null, null, action, edgeSynchronizationManager.getEdgeId().get()); + body, type, action, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e); } } - private void processAlarmCommentEvent(SaveEntityEvent event, boolean added) { - EdgeEventActionType action = added ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT; - tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - JacksonUtil.toString(event.getEntity()), EdgeEventType.ALARM_COMMENT, action, edgeSynchronizationManager.getEdgeId().get()); - } - @TransactionalEventListener(fallbackExecution = true) public void handleEvent(DeleteEntityEvent event) { try { @@ -207,4 +199,11 @@ public class EdgeEventSourcingListener { } return null; } + + private EdgeEventActionType getActionForEntityEvent(Object entity, boolean isAdded) { + if (entity instanceof AlarmComment) { + return isAdded ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT; + } + return isAdded ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; + } } From 54ee0677ad7129b1644eaa10dcb8d8756c5dbb95 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 16 Jan 2024 09:39:19 +0200 Subject: [PATCH 7/7] Renaming SaveEntityEvent variable from added to created. Fix test --- .../server/service/edge/EdgeEventSourcingListener.java | 10 +++++----- .../org/thingsboard/server/edge/AlarmEdgeTest.java | 2 ++ .../server/dao/alarm/BaseAlarmCommentService.java | 2 +- .../thingsboard/server/dao/alarm/BaseAlarmService.java | 2 +- .../server/dao/asset/AssetProfileServiceImpl.java | 2 +- .../thingsboard/server/dao/asset/BaseAssetService.java | 2 +- .../server/dao/customer/CustomerServiceImpl.java | 2 +- .../server/dao/dashboard/DashboardServiceImpl.java | 2 +- .../server/dao/device/DeviceProfileServiceImpl.java | 2 +- .../server/dao/device/DeviceServiceImpl.java | 3 +-- .../server/dao/entityview/EntityViewServiceImpl.java | 2 +- .../server/dao/eventsourcing/SaveEntityEvent.java | 2 +- .../server/dao/ota/BaseOtaPackageService.java | 4 ++-- .../thingsboard/server/dao/queue/BaseQueueService.java | 2 +- .../server/dao/resource/BaseResourceService.java | 2 +- .../server/dao/rule/BaseRuleChainService.java | 2 +- .../server/dao/tenant/TenantProfileServiceImpl.java | 2 +- .../server/dao/tenant/TenantServiceImpl.java | 2 +- .../thingsboard/server/dao/user/UserServiceImpl.java | 2 +- .../server/dao/widget/WidgetTypeServiceImpl.java | 4 ++-- .../server/dao/widget/WidgetsBundleServiceImpl.java | 2 +- 21 files changed, 28 insertions(+), 27 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 3ed90ee431..ca7048c7e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -79,10 +79,10 @@ public class EdgeEventSourcingListener { return; } log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); - boolean isAdded = Boolean.TRUE.equals(event.getAdded()); + boolean isCreated = Boolean.TRUE.equals(event.getCreated()); String body = getBodyMsgForEntityEvent(event.getEntity()); EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity()); - EdgeEventActionType action = getActionForEntityEvent(event.getEntity(), isAdded); + EdgeEventActionType action = getActionForEntityEvent(event.getEntity(), isCreated); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), body, type, action, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { @@ -200,10 +200,10 @@ public class EdgeEventSourcingListener { return null; } - private EdgeEventActionType getActionForEntityEvent(Object entity, boolean isAdded) { + private EdgeEventActionType getActionForEntityEvent(Object entity, boolean isCreated) { if (entity instanceof AlarmComment) { - return isAdded ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT; + return isCreated ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT; } - return isAdded ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; + return isCreated ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; } } diff --git a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java index acd8cb1444..239cb87ab4 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java @@ -92,6 +92,7 @@ public class AlarmEdgeTest extends AbstractEdgeTest { alarm.setType("alarm"); alarm.setSeverity(AlarmSeverity.CRITICAL); Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class); + edgeImitator.ignoreType(AlarmCommentUpdateMsg.class); // ack alarm edgeImitator.expectMessageAmount(1); @@ -135,6 +136,7 @@ public class AlarmEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); Assert.assertEquals(AlarmStatus.CLEARED_ACK, alarmMsg.getStatus()); + edgeImitator.allowIgnoredTypes(); } @Test diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index 042f0f693b..5bdd9803b2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -61,7 +61,7 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al } if (result != null) { eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entity(result) - .entityId(result.getAlarmId()).added(isCreated).build()); + .entityId(result.getAlarmId()).created(isCreated).build()); } return result; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index 2a42544bec..72e1b9c7bf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -135,7 +135,7 @@ public class BaseAlarmService extends AbstractCachedEntityService { private final T entity; private final T oldEntity; private final EntityId entityId; - private final Boolean added; + private final Boolean created; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java b/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java index 24539828c9..53a585c358 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java @@ -84,7 +84,7 @@ public class BaseOtaPackageService extends AbstractCachedEntityService