From 1256f09e7e2e6135ff6265002a39dcaf90ec7c91 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Wed, 10 Jan 2024 10:19:07 +0200 Subject: [PATCH] Introduce edge support for alarm comment --- .../edge/DefaultEdgeNotificationService.java | 3 + .../edge/EdgeEventSourcingListener.java | 16 +- .../service/edge/rpc/EdgeGrpcSession.java | 9 ++ .../alarm/AlarmMsgConstructor.java | 4 + .../alarm/AlarmMsgConstructorV1.java | 2 +- .../alarm/AlarmMsgConstructorV2.java | 2 +- .../alarm/BaseAlarmMsgConstructor.java | 29 ++++ .../edge/rpc/processor/BaseEdgeProcessor.java | 49 +++---- .../processor/alarm/AlarmEdgeProcessor.java | 138 ++++++++++++------ .../rpc/processor/alarm/AlarmProcessor.java | 5 + .../processor/alarm/BaseAlarmProcessor.java | 32 ++++ .../DefaultTbNotificationEntityService.java | 38 +---- .../controller/AbstractNotifyEntityTest.java | 12 +- .../server/edge/imitator/EdgeImitator.java | 6 + .../rpc/processor/BaseEdgeProcessorTest.java | 4 + .../server/dao/alarm/AlarmCommentService.java | 1 + .../server/common/data/EdgeUtils.java | 13 ++ .../common/data/edge/EdgeEventActionType.java | 59 +++++--- .../common/data/edge/EdgeEventType.java | 1 + common/edge-api/src/main/proto/edge.proto | 7 + .../dao/alarm/BaseAlarmCommentService.java | 7 +- .../server/dao/alarm/BaseAlarmService.java | 2 +- .../engine/edge/AbstractTbMsgPushNode.java | 52 +++---- .../engine/edge/TbMsgPushToCloudNode.java | 5 + .../rule/engine/edge/TbMsgPushToEdgeNode.java | 52 ++++--- 25 files changed, 346 insertions(+), 202 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index b3aba37632..03fb982d6d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -216,6 +216,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { case ALARM: alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg); break; + case ALARM_COMMENT: + alarmProcessor.processAlarmCommentNotification(tenantId, edgeNotificationMsg); + break; case RELATION: relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg); break; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 351f42d1fc..5883d04fe6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -23,10 +23,12 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.relation.EntityRelation; @@ -43,8 +45,6 @@ import org.thingsboard.server.dao.user.UserServiceImpl; import javax.annotation.PostConstruct; -import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType; - /** * This event listener does not support async event processing because relay on ThreadLocal * Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener @@ -91,8 +91,12 @@ public class EdgeEventSourcingListener { public void handleEvent(DeleteEntityEvent event) { try { log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); + EdgeEventType type = null; + if (event.getEntity() instanceof AlarmComment) { + type = EdgeEventType.ALARM_COMMENT; + } tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED, + JacksonUtil.toString(event.getEntity()), type, EdgeEventActionType.DELETED, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e); @@ -104,7 +108,7 @@ public class EdgeEventSourcingListener { try { log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), - event.getBody(), null, edgeTypeByActionType(event.getActionType()), + event.getBody(), null, EdgeUtils.getEdgeEventActionTypeByActionType(event.getActionType()), edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event, e); @@ -125,7 +129,7 @@ public class EdgeEventSourcingListener { } log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, - JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()), + JacksonUtil.toString(relation), EdgeEventType.RELATION, EdgeUtils.getEdgeEventActionTypeByActionType(event.getActionType()), edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event, e); @@ -150,7 +154,7 @@ public class EdgeEventSourcingListener { cleanUpUserAdditionalInfo(user); return !user.equals(oldUser); } - } else if (entity instanceof AlarmApiCallResult || entity instanceof Alarm) { + } else if (entity instanceof AlarmApiCallResult || entity instanceof Alarm || entity instanceof AlarmComment) { return false; } // Default: If the entity doesn't match any of the conditions, consider it as valid. diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index f1f473f976..14572bc871 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -644,6 +645,8 @@ public final class EdgeGrpcSession implements Closeable { return ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, this.edgeVersion); case ALARM: return ctx.getAlarmProcessor().convertAlarmEventToDownlink(edgeEvent, this.edgeVersion); + case ALARM_COMMENT: + return ctx.getAlarmProcessor().convertAlarmCommentEventToDownlink(edgeEvent, this.edgeVersion); case USER: return ctx.getUserProcessor().convertUserEventToDownlink(edgeEvent, this.edgeVersion); case RELATION: @@ -714,6 +717,12 @@ public final class EdgeGrpcSession implements Closeable { .processAlarmMsgFromEdge(edge.getTenantId(), edge.getId(), alarmUpdateMsg)); } } + if (uplinkMsg.getAlarmCommentUpdateMsgCount() > 0) { + for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : uplinkMsg.getAlarmCommentUpdateMsgList()) { + result.add(((AlarmProcessor) ctx.getAlarmEdgeProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion)) + .processAlarmCommentMsgFromEdge(edge.getTenantId(), edge.getId(), alarmCommentUpdateMsg)); + } + } if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) { for (EntityViewUpdateMsg entityViewUpdateMsg : uplinkMsg.getEntityViewUpdateMsgList()) { result.add(((EntityViewProcessor) ctx.getEntityViewProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion)) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java index 81606434b5..b5a0f20d6b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java @@ -16,6 +16,8 @@ package org.thingsboard.server.service.edge.rpc.constructor.alarm; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor; @@ -23,4 +25,6 @@ import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor; public interface AlarmMsgConstructor extends MsgConstructor { AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName); + + AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java index 5abba029de..81a489671f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java @@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @Component @TbCoreComponent -public class AlarmMsgConstructorV1 implements AlarmMsgConstructor { +public class AlarmMsgConstructorV1 extends BaseAlarmMsgConstructor { @Override public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java index d9768b8f9e..143a41b73c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java @@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @Component @TbCoreComponent -public class AlarmMsgConstructorV2 implements AlarmMsgConstructor { +public class AlarmMsgConstructorV2 extends BaseAlarmMsgConstructor { @Override public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java new file mode 100644 index 0000000000..c9af86e8cc --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.constructor.alarm; + +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; + +public abstract class BaseAlarmMsgConstructor implements AlarmMsgConstructor { + + @Override + public AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment) { + return AlarmCommentUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(alarmComment)).build(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 6f7474b15d..256f191215 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -51,6 +51,7 @@ import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; @@ -59,6 +60,7 @@ import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; @@ -145,6 +147,9 @@ public abstract class BaseEdgeProcessor { @Autowired protected AlarmService alarmService; + @Autowired + protected AlarmCommentService alarmCommentService; + @Autowired protected DeviceService deviceService; @@ -350,10 +355,13 @@ public abstract class BaseEdgeProcessor { case ALARM_ASSIGNED: case ALARM_UNASSIGNED: case CREDENTIALS_REQUEST: + case ADDED_COMMENT: + case UPDATED_COMMENT: return true; } switch (type) { case ALARM: + case ALARM_COMMENT: case RULE_CHAIN: case RULE_CHAIN_METADATA: case USER: @@ -441,14 +449,17 @@ public abstract class BaseEdgeProcessor { case CREDENTIALS_UPDATED: case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: + case UPDATED_COMMENT: return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE; case ADDED: case ASSIGNED_TO_EDGE: case RELATION_ADD_OR_UPDATE: + case ADDED_COMMENT: return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE; case DELETED: case UNASSIGNED_FROM_EDGE: case RELATION_DELETED: + case DELETED_COMMENT: return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE; case ALARM_ACK: return UpdateMsgType.ALARM_ACK_RPC_MESSAGE; @@ -517,22 +528,14 @@ public abstract class BaseEdgeProcessor { private ListenableFuture processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType, EdgeId sourceEdgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); } - } while (pageData != null && pageData.hasNext()); + } return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } @@ -715,19 +718,13 @@ public abstract class BaseEdgeProcessor { } private boolean isEntityNotAssignedToEdge(TenantId tenantId, EntityId entityId, EdgeId edgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - if (pageData.getData().contains(edgeId)) { - return false; - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); + for (EdgeId edgeId1 : edgeIds) { + if (edgeId1.equals(edgeId)) { + return false; } - } while (pageData != null && pageData.hasNext()); + } return true; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index ea48032b69..e6889a1e6f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -22,22 +22,30 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageDataIterable; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructor; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; @Slf4j @@ -67,58 +75,100 @@ public abstract class AlarmEdgeProcessor extends BaseAlarmProcessor implements A return null; } + @Override + public ListenableFuture processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) { + log.trace("[{}] processAlarmCommentMsgFromEdge [{}]", tenantId, alarmCommentUpdateMsg); + try { + edgeSynchronizationManager.getEdgeId().set(edgeId); + return processAlarmCommentMsg(tenantId, alarmCommentUpdateMsg); + } finally { + edgeSynchronizationManager.getEdgeId().remove(); + } + } + + @Override + public DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) { + AlarmCommentId alarmCommentId = new AlarmCommentId(edgeEvent.getEntityId()); + UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); + AlarmComment alarmComment; + switch (edgeEvent.getAction()) { + case ADDED_COMMENT: + case UPDATED_COMMENT: + alarmComment = alarmCommentService.findAlarmCommentById(edgeEvent.getTenantId(), alarmCommentId); + break; + case DELETED_COMMENT: + alarmComment = JacksonUtil.convertValue(edgeEvent.getBody(), AlarmComment.class); + break; + default: + return null; + } + return Optional.ofNullable(alarmComment).map(comment -> buildAlarmCommentDownlinkMsg(msgType, comment, edgeVersion)).orElse(null); + } + + private DownlinkMsg buildAlarmCommentDownlinkMsg(UpdateMsgType msgType, AlarmComment alarmComment, EdgeVersion edgeVersion) { + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addAlarmCommentUpdateMsg(((AlarmMsgConstructor) alarmMsgConstructorFactory + .getMsgConstructorByEdgeVersion(edgeVersion)).constructAlarmCommentUpdatedMsg(msgType, alarmComment)) + .build(); + } + public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); - switch (actionType) { - case DELETED: - Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); - if (deletedAlarm == null) { - return Futures.immediateFuture(null); - } - List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), - alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId); - return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); - default: - ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); - return Futures.transformAsync(alarmFuture, alarm -> { - if (alarm == null) { - return Futures.immediateFuture(null); - } - EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); - if (type == null) { - return Futures.immediateFuture(null); - } - List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), - alarmId, actionType, null, originatorEdgeId); - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); - }, dbCallbackExecutorService); + if (EdgeEventActionType.DELETED.equals(actionType)) { + Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); + if (deletedAlarm == null) { + return Futures.immediateFuture(null); + } + List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId, EdgeEventType.ALARM); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); } + ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); + return Futures.transformAsync(alarmFuture, alarm -> { + if (alarm == null) { + return Futures.immediateFuture(null); + } + EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); + if (type == null) { + return Futures.immediateFuture(null); + } + List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), + alarmId, actionType, null, originatorEdgeId, EdgeEventType.ALARM); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + }, dbCallbackExecutorService); } - private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, - relatedEdgeId, - EdgeEventType.ALARM, - actionType, - alarmId, - body)); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + public ListenableFuture processAlarmCommentNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); + AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); + if (EdgeEventActionType.DELETED.equals(actionType)) { + AlarmComment deletedAlarmComment = JacksonUtil.fromString(edgeNotificationMsg.getBody(), AlarmComment.class); + if (deletedAlarmComment == null) { + return Futures.immediateFuture(null); } - } while (pageData != null && pageData.hasNext()); + Alarm alarmById = alarmService.findAlarmById(tenantId, new AlarmId(deletedAlarmComment.getAlarmId().getId())); + List> delFutures = pushEventToAllRelatedEdges(tenantId, alarmById.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(deletedAlarmComment), originatorEdgeId, EdgeEventType.ALARM_COMMENT); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); + } + return Futures.immediateFuture(null); + } + + private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, + EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId, + EdgeEventType edgeEventType) { + List> futures = new ArrayList<>(); + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, originatorId, DEFAULT_PAGE_SIZE); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, edgeEventType, actionType, alarmId, body)); + } + } return futures; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java index 68a72bf01e..938269462d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; @@ -29,4 +30,8 @@ public interface AlarmProcessor extends EdgeProcessor { ListenableFuture processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg); DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion); + + ListenableFuture processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg); + + DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index 546d6e63db..6d04db5b75 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -23,6 +23,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmCreateOrUpdateActiveRequest; import org.thingsboard.server.common.data.alarm.AlarmUpdateRequest; import org.thingsboard.server.common.data.asset.Asset; @@ -33,6 +34,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; @@ -93,6 +95,36 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { return Futures.immediateFuture(null); } + public ListenableFuture processAlarmCommentMsg(TenantId tenantId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) { + log.trace("[{}] processAlarmCommentMsg [{}]", tenantId, alarmCommentUpdateMsg); + AlarmComment alarmComment = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + if (alarmComment == null) { + throw new RuntimeException("[{" + tenantId + "}] alarmCommentUpdateMsg {" + alarmCommentUpdateMsg + "} cannot be converted to alarm comment"); + } + try { + switch (alarmCommentUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + alarmCommentService.createOrUpdateAlarmComment(tenantId, alarmComment); + break; + case ENTITY_DELETED_RPC_MESSAGE: + AlarmComment alarmCommentToDelete = alarmCommentService.findAlarmCommentById(tenantId, alarmComment.getId()); + if (alarmCommentToDelete != null) { + alarmCommentService.saveAlarmComment(tenantId, alarmCommentToDelete); + } + break; + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(alarmCommentUpdateMsg.getMsgType()); + } + } catch (Exception e) { + log.error("[{}] Failed to process alarm comment update msg [{}]", tenantId, alarmCommentUpdateMsg, e); + return Futures.immediateFailedFuture(e); + } + return Futures.immediateFuture(null); + } + + protected abstract EntityId getAlarmOriginatorFromMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg); protected abstract Alarm constructAlarmFromUpdateMsg(TenantId tenantId, AlarmId alarmId, EntityId originatorId, AlarmUpdateMsg alarmUpdateMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java index 321cc69604..3a87fff506 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java @@ -19,7 +19,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.HasName; @@ -27,7 +26,6 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; @@ -40,6 +38,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.service.action.EntityActionService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -174,39 +173,4 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS metaData.putValue("assignedFromTenantName", tenant.getName()); return metaData; } - - public static EdgeEventActionType edgeTypeByActionType(ActionType actionType) { - switch (actionType) { - case ADDED: - return EdgeEventActionType.ADDED; - case UPDATED: - return EdgeEventActionType.UPDATED; - case ALARM_ACK: - return EdgeEventActionType.ALARM_ACK; - case ALARM_CLEAR: - return EdgeEventActionType.ALARM_CLEAR; - case ALARM_ASSIGNED: - return EdgeEventActionType.ALARM_ASSIGNED; - case ALARM_UNASSIGNED: - return EdgeEventActionType.ALARM_UNASSIGNED; - case DELETED: - return EdgeEventActionType.DELETED; - case RELATION_ADD_OR_UPDATE: - return EdgeEventActionType.RELATION_ADD_OR_UPDATE; - case RELATION_DELETED: - return EdgeEventActionType.RELATION_DELETED; - case ASSIGNED_TO_CUSTOMER: - return EdgeEventActionType.ASSIGNED_TO_CUSTOMER; - case UNASSIGNED_FROM_CUSTOMER: - return EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER; - case ASSIGNED_TO_EDGE: - return EdgeEventActionType.ASSIGNED_TO_EDGE; - case UNASSIGNED_FROM_EDGE: - return EdgeEventActionType.UNASSIGNED_FROM_EDGE; - case CREDENTIALS_UPDATED: - return EdgeEventActionType.CREDENTIALS_UPDATED; - default: - return null; - } - } } diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java index 429d0b765f..44302f08ba 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java @@ -20,6 +20,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasTenantId; @@ -47,7 +48,6 @@ import java.util.stream.Collectors; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; -import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType; @Slf4j public abstract class AbstractNotifyEntityTest extends AbstractWebTest { @@ -91,7 +91,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { int cntTime = 1; Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); ArgumentMatcher matcherOriginatorId = argument -> argument.equals(relation.getTo()); ArgumentMatcher matcherEntityClassEquals = Objects::isNull; ArgumentMatcher matcherCustomerId = customerId == null ? @@ -111,7 +111,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { ActionType actionType, int cntTime) { Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); ArgumentMatcher matcherOriginatorId = argument -> argument.getClass().equals(relation.getFrom().getClass()); ArgumentMatcher matcherEntityClassEquals = Objects::isNull; ArgumentMatcher matcherCustomerId = customerId == null ? @@ -317,7 +317,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) { EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? - EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType); + EdgeEventActionType.CREDENTIALS_UPDATED : EdgeUtils.getEdgeEventActionTypeByActionType(actionType); Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any()); } @@ -353,7 +353,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testNotificationMsgToEdgeServiceTime(EntityId entityId, TenantId tenantId, ActionType actionType, int cntTime) { EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? - EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType); + EdgeEventActionType.CREDENTIALS_UPDATED : EdgeUtils.getEdgeEventActionTypeByActionType(actionType); ArgumentMatcher matcherEntityId = cntTime == 1 ? argument -> argument.equals(entityId) : argument -> argument.getClass().equals(entityId.getClass()); Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), @@ -364,7 +364,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testSendNotificationMsgToEdgeServiceTimeEntityEqAny(TenantId tenantId, ActionType actionType, int cntTime) { Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.any(), Mockito.any(EntityId.class), Mockito.any(), Mockito.isNull(), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); } protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) { diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index f7c464fab9..f5e018ed3a 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -27,6 +27,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.edge.rpc.EdgeGrpcClient; import org.thingsboard.edge.rpc.EdgeRpcClient; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -231,6 +232,11 @@ public class EdgeImitator { result.add(saveDownlinkMsg(alarmUpdateMsg)); } } + if (downlinkMsg.getAlarmCommentUpdateMsgCount() > 0) { + for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : downlinkMsg.getAlarmCommentUpdateMsgList()) { + result.add(saveDownlinkMsg(alarmCommentUpdateMsg)); + } + } if (downlinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : downlinkMsg.getEntityDataList()) { if (randomFailuresOnTimeseriesDownlink) { diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index 384c00c8ed..9c45977a7c 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; @@ -157,6 +158,9 @@ public abstract class BaseEdgeProcessorTest { @MockBean protected AlarmService alarmService; + @MockBean + protected AlarmCommentService alarmCommentService; + @MockBean protected DeviceService deviceService; diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java index 7bee9f6983..c822693a71 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; public interface AlarmCommentService { + AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment); AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java index dc969bb362..69a3fc7d8c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Throwables; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -32,6 +33,7 @@ import java.util.concurrent.ThreadLocalRandom; public final class EdgeUtils { private static final EnumMap entityTypeEdgeEventTypeEnumMap; + private static final EnumMap actionTypeEdgeEventActionTypeEnumMap; static { entityTypeEdgeEventTypeEnumMap = new EnumMap<>(EntityType.class); @@ -40,6 +42,13 @@ public final class EdgeUtils { entityTypeEdgeEventTypeEnumMap.put(edgeEventType.getEntityType(), edgeEventType); } } + + actionTypeEdgeEventActionTypeEnumMap = new EnumMap<>(ActionType.class); + for (EdgeEventActionType edgeEventActionType : EdgeEventActionType.values()) { + if (edgeEventActionType.getActionType() != null) { + actionTypeEdgeEventActionTypeEnumMap.put(edgeEventActionType.getActionType(), edgeEventActionType); + } + } } private static final int STACK_TRACE_LIMIT = 10; @@ -54,6 +63,10 @@ public final class EdgeUtils { return entityTypeEdgeEventTypeEnumMap.get(entityType); } + public static EdgeEventActionType getEdgeEventActionTypeByActionType(ActionType actionType) { + return actionTypeEdgeEventActionTypeEnumMap.get(actionType); + } + public static EdgeEvent constructEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java index e355901182..e39d93946c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java @@ -1,5 +1,5 @@ /** - * Copyright © 2016-2024 The Thingsboard Authors + * Copyright © 2016-2023 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,26 +15,39 @@ */ package org.thingsboard.server.common.data.edge; +import lombok.Getter; +import org.thingsboard.server.common.data.audit.ActionType; + +@Getter public enum EdgeEventActionType { - ADDED, - DELETED, - UPDATED, - POST_ATTRIBUTES, - ATTRIBUTES_UPDATED, - ATTRIBUTES_DELETED, - TIMESERIES_UPDATED, - CREDENTIALS_UPDATED, - ASSIGNED_TO_CUSTOMER, - UNASSIGNED_FROM_CUSTOMER, - RELATION_ADD_OR_UPDATE, - RELATION_DELETED, - RPC_CALL, - ALARM_ACK, - ALARM_CLEAR, - ALARM_ASSIGNED, - ALARM_UNASSIGNED, - ASSIGNED_TO_EDGE, - UNASSIGNED_FROM_EDGE, - CREDENTIALS_REQUEST, - ENTITY_MERGE_REQUEST // deprecated -} \ No newline at end of file + ADDED(ActionType.ADDED), + UPDATED(ActionType.UPDATED), + DELETED(ActionType.DELETED), + POST_ATTRIBUTES(null), + ATTRIBUTES_UPDATED(ActionType.ATTRIBUTES_UPDATED), + ATTRIBUTES_DELETED(ActionType.ATTRIBUTES_DELETED), + TIMESERIES_UPDATED(ActionType.TIMESERIES_UPDATED), + CREDENTIALS_UPDATED(ActionType.CREDENTIALS_UPDATED), + ASSIGNED_TO_CUSTOMER(ActionType.ASSIGNED_TO_CUSTOMER), + UNASSIGNED_FROM_CUSTOMER(ActionType.UNASSIGNED_FROM_CUSTOMER), + RELATION_ADD_OR_UPDATE(ActionType.RELATION_ADD_OR_UPDATE), + RELATION_DELETED(ActionType.RELATION_DELETED), + RPC_CALL(ActionType.RPC_CALL), + ALARM_ACK(ActionType.ALARM_ACK), + ALARM_CLEAR(ActionType.ALARM_CLEAR), + ALARM_ASSIGNED(ActionType.ALARM_ASSIGNED), + ALARM_UNASSIGNED(ActionType.ALARM_UNASSIGNED), + ADDED_COMMENT(ActionType.ADDED_COMMENT), + UPDATED_COMMENT(ActionType.UPDATED_COMMENT), + DELETED_COMMENT(ActionType.DELETED_COMMENT), + ASSIGNED_TO_EDGE(ActionType.ASSIGNED_TO_EDGE), + UNASSIGNED_FROM_EDGE(ActionType.UNASSIGNED_FROM_EDGE), + CREDENTIALS_REQUEST(null), + ENTITY_MERGE_REQUEST(null); // deprecated + + private final ActionType actionType; + + EdgeEventActionType(ActionType actionType) { + this.actionType = actionType; + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index a8627d3c0d..6fc255501f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -27,6 +27,7 @@ public enum EdgeEventType { ASSET_PROFILE(true, EntityType.ASSET_PROFILE), ENTITY_VIEW(false, EntityType.ENTITY_VIEW), ALARM(false, EntityType.ALARM), + ALARM_COMMENT(false, null), RULE_CHAIN(false, EntityType.RULE_CHAIN), RULE_CHAIN_METADATA(false, null), EDGE(false, EntityType.EDGE), diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 2f33c21fd8..34a8c6f093 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -325,6 +325,11 @@ message AlarmUpdateMsg { string entity = 18; } +message AlarmCommentUpdateMsg { + UpdateMsgType msgType = 1; + string entity = 2; +} + message CustomerUpdateMsg { UpdateMsgType msgType = 1; int64 idMSB = 2; @@ -618,6 +623,7 @@ message UplinkMsg { repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19; repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20; repeated ResourceUpdateMsg resourceUpdateMsg = 21; + repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 22; } message UplinkResponseMsg { @@ -661,5 +667,6 @@ message DownlinkMsg { repeated TenantUpdateMsg tenantUpdateMsg = 26; repeated TenantProfileUpdateMsg tenantProfileUpdateMsg = 27; repeated ResourceUpdateMsg resourceUpdateMsg = 28; + repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 29; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index 3bc8ceb1a0..b60acc6550 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; +import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; import org.thingsboard.server.dao.service.DataValidator; import java.util.UUID; @@ -61,7 +62,10 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al public AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment) { log.debug("Deleting Alarm Comment: {}", alarmComment); alarmCommentDataValidator.validate(alarmComment, c -> tenantId); - return alarmCommentDao.save(tenantId, alarmComment); + AlarmComment result = alarmCommentDao.save(tenantId, alarmComment); + eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entity(result) + .entityId(result.getAlarmId()).build()); + return result; } @Override @@ -112,5 +116,4 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al } return null; } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index ae9763c59c..2a42544bec 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -135,7 +135,7 @@ public class BaseAlarmService extends AbstractCachedEntityService metadata = msg.getMetaData().getData(); EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msg); @@ -100,7 +100,8 @@ public abstract class AbstractTbMsgPushNode keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {}); + List keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { + }); entityBody.put("keys", keys); entityBody.put(SCOPE, getScope(metadata)); break; @@ -137,6 +138,8 @@ public abstract class AbstractTbMsgPushNode getConfigClazz(); @@ -149,6 +152,11 @@ public abstract class AbstractTbMsgPushNode metadata) { String scope = metadata.get(SCOPE); if (StringUtils.isEmpty(scope)) { @@ -171,6 +179,10 @@ public abstract class AbstractTbMsgPushNodeSupports next originator types:" + - "
DEVICE" + - "
ASSET" + - "
ENTITY_VIEW" + - "
DASHBOARD" + - "
TENANT" + - "
CUSTOMER" + - "
EDGE

" + - "As well node supports next message types:" + + "Supports next message types:" + "
POST_TELEMETRY_REQUEST" + "
POST_ATTRIBUTES_REQUEST" + "
ATTRIBUTES_UPDATED" + "
ATTRIBUTES_DELETED" + "
ALARM

" + - "Message will be routed via Failure route if node was not able to save edge event to database or unsupported originator type/message type arrived. " + + "Message will be routed via Failure route if node was not able to save edge event to database or unsupported message type arrived. " + "In case successful storage edge event to database message will be routed via Success route.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodePushToEdgeConfig", @@ -99,6 +93,11 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode pageData; - List> futures = new ArrayList<>(); - do { - pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId edgeId : pageData.getData()) { - EdgeEvent edgeEvent = buildEvent(msg, ctx); - futures.add(notifyEdge(ctx, edgeEvent, edgeId)); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + EntityId originatorId = msg.getOriginator(); + if (DataConstants.COMMENT_CREATED.equals(msg.getType()) || DataConstants.COMMENT_UPDATED.equals(msg.getType())) { + Alarm alarm = JacksonUtil.fromString(msg.getData(), Alarm.class); + if (alarm != null) { + originatorId = alarm.getOriginator(); } - } while (pageData != null && pageData.hasNext()); + } + List> futures = new ArrayList<>(); + EntityId finalOriginatorId = originatorId; + PageDataIterableByTenantIdEntityId edgeIds = new PageDataIterableByTenantIdEntityId<>( + ctx.getEdgeService()::findRelatedEdgeIdsByEntityId, ctx.getTenantId(), finalOriginatorId, DEFAULT_PAGE_SIZE); + for (EdgeId edgeId : edgeIds) { + EdgeEvent edgeEvent = buildEvent(msg, ctx); + futures.add(notifyEdge(ctx, edgeEvent, edgeId)); + } if (futures.isEmpty()) { // ack in case no edges are related to provided entity @@ -176,5 +175,4 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode