diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 3927fe5391..eede4a3afc 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -216,6 +216,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { case ALARM: alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg); break; + case ALARM_COMMENT: + alarmProcessor.processAlarmCommentNotification(tenantId, edgeNotificationMsg); + break; case RELATION: relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg); break; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index b6a2528f34..712e5147ac 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -80,9 +81,12 @@ public class EdgeEventSourcingListener { return; } log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); - EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; + boolean isCreated = Boolean.TRUE.equals(event.getCreated()); + String body = getBodyMsgForEntityEvent(event.getEntity()); + EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity()); + EdgeEventActionType action = getActionForEntityEvent(event.getEntity(), isCreated); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - null, null, action, edgeSynchronizationManager.getEdgeId().get()); + body, type, action, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e); } @@ -96,14 +100,23 @@ public class EdgeEventSourcingListener { return; } log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); + EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity()); + EdgeEventActionType actionType = getEdgeEventActionTypeForEntityEvent(event.getEntity()); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED, + JacksonUtil.toString(event.getEntity()), type, actionType, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e); } } + private EdgeEventActionType getEdgeEventActionTypeForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return EdgeEventActionType.DELETED_COMMENT; + } + return EdgeEventActionType.DELETED; + } + @TransactionalEventListener(fallbackExecution = true) public void handleEvent(ActionEntityEvent event) { if (EntityType.DEVICE.equals(event.getEntityId().getEntityType()) @@ -177,7 +190,7 @@ public class EdgeEventSourcingListener { } break; case TENANT: - return !event.getAdded(); + return !event.getCreated(); case API_USAGE_STATE: case EDGE: return false; @@ -202,4 +215,25 @@ public class EdgeEventSourcingListener { } } } + + private EdgeEventType getEdgeEventTypeForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return EdgeEventType.ALARM_COMMENT; + } + return null; + } + + private String getBodyMsgForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return JacksonUtil.toString(entity); + } + return null; + } + + private EdgeEventActionType getActionForEntityEvent(Object entity, boolean isCreated) { + if (entity instanceof AlarmComment) { + return isCreated ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT; + } + return isCreated ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index d824926155..4ff654e5a6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -531,6 +532,9 @@ public final class EdgeGrpcSession implements Closeable { case RPC_CALL: case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: + case ADDED_COMMENT: + case UPDATED_COMMENT: + case DELETED_COMMENT: downlinkMsg = convertEntityEventToDownlink(edgeEvent); log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg); break; @@ -645,6 +649,8 @@ public final class EdgeGrpcSession implements Closeable { return ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, this.edgeVersion); case ALARM: return ctx.getAlarmProcessor().convertAlarmEventToDownlink(edgeEvent, this.edgeVersion); + case ALARM_COMMENT: + return ctx.getAlarmProcessor().convertAlarmCommentEventToDownlink(edgeEvent, this.edgeVersion); case USER: return ctx.getUserProcessor().convertUserEventToDownlink(edgeEvent, this.edgeVersion); case RELATION: @@ -715,6 +721,12 @@ public final class EdgeGrpcSession implements Closeable { .processAlarmMsgFromEdge(edge.getTenantId(), edge.getId(), alarmUpdateMsg)); } } + if (uplinkMsg.getAlarmCommentUpdateMsgCount() > 0) { + for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : uplinkMsg.getAlarmCommentUpdateMsgList()) { + result.add(((AlarmProcessor) ctx.getAlarmEdgeProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion)) + .processAlarmCommentMsgFromEdge(edge.getTenantId(), edge.getId(), alarmCommentUpdateMsg)); + } + } if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) { for (EntityViewUpdateMsg entityViewUpdateMsg : uplinkMsg.getEntityViewUpdateMsgList()) { result.add(((EntityViewProcessor) ctx.getEntityViewProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion)) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java index 81606434b5..b5a0f20d6b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java @@ -16,6 +16,8 @@ package org.thingsboard.server.service.edge.rpc.constructor.alarm; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor; @@ -23,4 +25,6 @@ import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor; public interface AlarmMsgConstructor extends MsgConstructor { AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName); + + AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java index 5abba029de..81a489671f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java @@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @Component @TbCoreComponent -public class AlarmMsgConstructorV1 implements AlarmMsgConstructor { +public class AlarmMsgConstructorV1 extends BaseAlarmMsgConstructor { @Override public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java index d9768b8f9e..143a41b73c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java @@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @Component @TbCoreComponent -public class AlarmMsgConstructorV2 implements AlarmMsgConstructor { +public class AlarmMsgConstructorV2 extends BaseAlarmMsgConstructor { @Override public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java new file mode 100644 index 0000000000..c9af86e8cc --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.constructor.alarm; + +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; + +public abstract class BaseAlarmMsgConstructor implements AlarmMsgConstructor { + + @Override + public AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment) { + return AlarmCommentUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(alarmComment)).build(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index cb1d6fb4c8..b74edbae6c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -52,6 +52,7 @@ import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; @@ -60,6 +61,7 @@ import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; @@ -146,6 +148,9 @@ public abstract class BaseEdgeProcessor { @Autowired protected AlarmService alarmService; + @Autowired + protected AlarmCommentService alarmCommentService; + @Autowired protected DeviceService deviceService; @@ -351,10 +356,13 @@ public abstract class BaseEdgeProcessor { case ALARM_ASSIGNED: case ALARM_UNASSIGNED: case CREDENTIALS_REQUEST: + case ADDED_COMMENT: + case UPDATED_COMMENT: return true; } switch (type) { case ALARM: + case ALARM_COMMENT: case RULE_CHAIN: case RULE_CHAIN_METADATA: case USER: @@ -439,14 +447,17 @@ public abstract class BaseEdgeProcessor { case CREDENTIALS_UPDATED: case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: + case UPDATED_COMMENT: return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE; case ADDED: case ASSIGNED_TO_EDGE: case RELATION_ADD_OR_UPDATE: + case ADDED_COMMENT: return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE; case DELETED: case UNASSIGNED_FROM_EDGE: case RELATION_DELETED: + case DELETED_COMMENT: return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE; case ALARM_ACK: return UpdateMsgType.ALARM_ACK_RPC_MESSAGE; @@ -515,22 +526,14 @@ public abstract class BaseEdgeProcessor { private ListenableFuture processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType, EdgeId sourceEdgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); } - } while (pageData != null && pageData.hasNext()); + } return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } @@ -713,19 +716,13 @@ public abstract class BaseEdgeProcessor { } private boolean isEntityNotAssignedToEdge(TenantId tenantId, EntityId entityId, EdgeId edgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - if (pageData.getData().contains(edgeId)) { - return false; - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); + for (EdgeId edgeId1 : edgeIds) { + if (edgeId1.equals(edgeId)) { + return false; } - } while (pageData != null && pageData.hasNext()); + } return true; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index ea48032b69..f83cbbba9c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -29,12 +30,14 @@ import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructor; import java.util.ArrayList; import java.util.List; @@ -67,58 +70,90 @@ public abstract class AlarmEdgeProcessor extends BaseAlarmProcessor implements A return null; } + @Override + public ListenableFuture processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) { + log.trace("[{}] processAlarmCommentMsgFromEdge [{}]", tenantId, alarmCommentUpdateMsg); + try { + edgeSynchronizationManager.getEdgeId().set(edgeId); + return processAlarmCommentMsg(tenantId, alarmCommentUpdateMsg); + } finally { + edgeSynchronizationManager.getEdgeId().remove(); + } + } + + @Override + public DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) { + UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); + switch (edgeEvent.getAction()) { + case ADDED_COMMENT: + case UPDATED_COMMENT: + case DELETED_COMMENT: + AlarmComment alarmComment = JacksonUtil.convertValue(edgeEvent.getBody(), AlarmComment.class); + if (alarmComment != null) { + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addAlarmCommentUpdateMsg(((AlarmMsgConstructor) alarmMsgConstructorFactory + .getMsgConstructorByEdgeVersion(edgeVersion)).constructAlarmCommentUpdatedMsg(msgType, alarmComment)) + .build(); + } + default: + return null; + } + } + public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); - switch (actionType) { - case DELETED: - Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); - if (deletedAlarm == null) { - return Futures.immediateFuture(null); - } - List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), - alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId); - return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); - default: - ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); - return Futures.transformAsync(alarmFuture, alarm -> { - if (alarm == null) { - return Futures.immediateFuture(null); - } - EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); - if (type == null) { - return Futures.immediateFuture(null); - } - List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), - alarmId, actionType, null, originatorEdgeId); - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); - }, dbCallbackExecutorService); + if (EdgeEventActionType.DELETED.equals(actionType)) { + Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); + if (deletedAlarm == null) { + return Futures.immediateFuture(null); + } + List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId, EdgeEventType.ALARM); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); } + ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); + return Futures.transformAsync(alarmFuture, alarm -> { + if (alarm == null) { + return Futures.immediateFuture(null); + } + EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); + if (type == null) { + return Futures.immediateFuture(null); + } + List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), + alarmId, actionType, null, originatorEdgeId, EdgeEventType.ALARM); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + }, dbCallbackExecutorService); } - private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; + public ListenableFuture processAlarmCommentNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); + AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); + AlarmComment alarmComment = JacksonUtil.fromString(edgeNotificationMsg.getBody(), AlarmComment.class); + if (alarmComment == null) { + return Futures.immediateFuture(null); + } + Alarm alarmById = alarmService.findAlarmById(tenantId, new AlarmId(alarmComment.getAlarmId().getId())); + List> delFutures = pushEventToAllRelatedEdges(tenantId, alarmById.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(alarmComment), originatorEdgeId, EdgeEventType.ALARM_COMMENT); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); + } + + private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, + EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId, + EdgeEventType edgeEventType) { List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, - relatedEdgeId, - EdgeEventType.ALARM, - actionType, - alarmId, - body)); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, originatorId, DEFAULT_PAGE_SIZE); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, edgeEventType, actionType, alarmId, body)); } - } while (pageData != null && pageData.hasNext()); + } return futures; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java index 68a72bf01e..938269462d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; @@ -29,4 +30,8 @@ public interface AlarmProcessor extends EdgeProcessor { ListenableFuture processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg); DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion); + + ListenableFuture processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg); + + DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index 546d6e63db..11cd2ab56c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -19,10 +19,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmCreateOrUpdateActiveRequest; import org.thingsboard.server.common.data.alarm.AlarmUpdateRequest; import org.thingsboard.server.common.data.asset.Asset; @@ -33,6 +35,8 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.alarm.AlarmCommentDao; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; @@ -44,6 +48,9 @@ import java.util.UUID; @Slf4j public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { + @Autowired + protected AlarmCommentDao alarmCommentDao; + public ListenableFuture processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg); AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB())); @@ -93,6 +100,42 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { return Futures.immediateFuture(null); } + public ListenableFuture processAlarmCommentMsg(TenantId tenantId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) { + log.trace("[{}] processAlarmCommentMsg [{}]", tenantId, alarmCommentUpdateMsg); + AlarmComment alarmComment = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + if (alarmComment == null) { + throw new RuntimeException("[{" + tenantId + "}] alarmCommentUpdateMsg {" + alarmCommentUpdateMsg + "} cannot be converted to alarm comment"); + } + try { + Alarm alarm = alarmService.findAlarmById(tenantId, new AlarmId(alarmComment.getAlarmId().getId())); + if (alarm == null) { + return Futures.immediateFuture(null); + } + switch (alarmCommentUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + alarmCommentDao.createAlarmComment(tenantId, alarmComment); + break; + case ENTITY_UPDATED_RPC_MESSAGE: + alarmCommentService.createOrUpdateAlarmComment(tenantId, alarmComment); + break; + case ENTITY_DELETED_RPC_MESSAGE: + AlarmComment alarmCommentToDelete = alarmCommentService.findAlarmCommentById(tenantId, alarmComment.getId()); + if (alarmCommentToDelete != null) { + alarmCommentService.saveAlarmComment(tenantId, alarmCommentToDelete); + } + break; + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(alarmCommentUpdateMsg.getMsgType()); + } + } catch (Exception e) { + log.error("[{}] Failed to process alarm comment update msg [{}]", tenantId, alarmCommentUpdateMsg, e); + return Futures.immediateFailedFuture(e); + } + return Futures.immediateFuture(null); + } + + protected abstract EntityId getAlarmOriginatorFromMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg); protected abstract Alarm constructAlarmFromUpdateMsg(TenantId tenantId, AlarmId alarmId, EntityId originatorId, AlarmUpdateMsg alarmUpdateMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index c476f4e4e6..c699cd6c02 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -73,7 +73,7 @@ public class EntityStateSourcingListener { TenantId tenantId = event.getTenantId(); EntityId entityId = event.getEntityId(); EntityType entityType = entityId.getEntityType(); - boolean isCreated = event.getAdded() != null && event.getAdded(); + boolean isCreated = event.getCreated() != null && event.getCreated(); ComponentLifecycleEvent lifecycleEvent = isCreated ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED; switch (entityType) { diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java index c0274792cb..da7c69050e 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.exception.ThingsboardException; public interface TbAlarmCommentService { + AlarmComment saveAlarmComment(Alarm alarm, AlarmComment alarmComment, User user) throws ThingsboardException; void deleteAlarmComment(Alarm alarm, AlarmComment alarmComment, User user) throws ThingsboardException; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java index 66e68185d0..17d52329b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java @@ -61,7 +61,7 @@ public class DefaultTbTenantService extends AbstractTbEntityService implements T tenantProfileCache.evict(savedTenant.getId()); if (created) { - eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(savedTenant.getId()).entity(savedTenant).added(true).build()); + eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(savedTenant.getId()).entity(savedTenant).created(true).build()); } TenantProfile oldTenantProfile = oldTenant != null ? tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, oldTenant.getTenantProfileId()) : null; diff --git a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java index e020aeef76..7b52c70cf7 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java +++ b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java @@ -182,7 +182,7 @@ public abstract class AbstractOAuth2ClientMapper { installScripts.createDefaultEdgeRuleChains(tenant.getId()); tenantProfileCache.evict(tenant.getId()); - eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(tenant.getId()).entity(tenant).added(true).build()); + eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(tenant.getId()).entity(tenant).created(true).build()); } else { tenant = tenants.get(0); } diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java index 9cbba67b1c..5ef7e1db92 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java @@ -154,24 +154,6 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { Mockito.reset(tbClusterService, auditLogService); } - protected void testNotifyManyEntityManyTimeMsgToEdgeServiceNever(HasName entity, HasName originator, - TenantId tenantId, CustomerId customerId, UserId userId, String userName, - ActionType actionType, int cntTime, Object... additionalInfo) { - EntityId entityId = createEntityId_NULL_UUID(entity); - EntityId originatorId = createEntityId_NULL_UUID(originator); - testNotificationMsgToEdgeServiceNeverWithActionType(entityId, actionType); - ArgumentMatcher matcherEntityClassEquals = argument -> argument.getClass().equals(entity.getClass()); - ArgumentMatcher matcherOriginatorId = argument -> argument.getClass().equals(originatorId.getClass()); - ArgumentMatcher matcherCustomerId = customerId == null ? - argument -> argument.getClass().equals(CustomerId.class) : argument -> argument.equals(customerId); - ArgumentMatcher matcherUserId = userId == null ? - argument -> argument.getClass().equals(UserId.class) : argument -> argument.equals(userId); - testLogEntityActionAdditionalInfo(matcherEntityClassEquals, matcherOriginatorId, tenantId, matcherCustomerId, matcherUserId, userName, actionType, cntTime, - extractMatcherAdditionalInfo(additionalInfo)); - testPushMsgToRuleEngineTime(matcherOriginatorId, tenantId, entity, cntTime); - Mockito.reset(tbClusterService, auditLogService); - } - protected void testNotifyManyEntityManyTimeMsgToEdgeServiceEntityEqAny(HasName entity, HasName originator, TenantId tenantId, CustomerId customerId, UserId userId, String userName, ActionType actionType, @@ -318,8 +300,8 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) { EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? EdgeEventActionType.CREDENTIALS_UPDATED : EdgeUtils.getEdgeEventActionTypeByActionType(actionType); - Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), - Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any()); + Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(), + Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any()); } private void testNotificationMsgToEdgeServiceNever(EntityId entityId) { diff --git a/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java index f5c1fbdae9..1415e38d7e 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java @@ -110,7 +110,7 @@ public class AlarmControllerTest extends AbstractControllerTest { Alarm alarm = createAlarm(TEST_ALARM_TYPE); - testNotifyEntityAllOneTime(alarm, alarm.getId(), alarm.getOriginator(), + testNotifyEntityOneTimeMsgToEdgeServiceNever(alarm, alarm.getId(), alarm.getOriginator(), tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, ActionType.ADDED); } @@ -122,7 +122,7 @@ public class AlarmControllerTest extends AbstractControllerTest { Alarm alarm = createAlarm(TEST_ALARM_TYPE); - testNotifyEntityAllOneTime(alarm, alarm.getId(), alarm.getOriginator(), + testNotifyEntityOneTimeMsgToEdgeServiceNever(alarm, alarm.getId(), alarm.getOriginator(), tenantId, customerId, tenantAdminUserId, TENANT_ADMIN_EMAIL, ActionType.ADDED); } @@ -772,7 +772,7 @@ public class AlarmControllerTest extends AbstractControllerTest { alarm = doPost("/api/alarm", alarm, Alarm.class); Assert.assertNotNull("Saved alarm is null!", alarm); - testNotifyEntityNeverMsgToEdgeServiceOneTime(alarm, alarm.getId(), tenantId, ActionType.ADDED); + testNotifyEntityNever(alarm.getId(), alarm); resetTokens(); diff --git a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java index 5ada0e69d5..239cb87ab4 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java @@ -15,21 +15,28 @@ */ package org.thingsboard.server.edge; +import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.TextNode; import com.google.protobuf.AbstractMessage; import org.junit.Assert; import org.junit.Test; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.common.data.alarm.AlarmCommentInfo; +import org.thingsboard.server.common.data.alarm.AlarmCommentType; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.alarm.AlarmStatus; +import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; @@ -84,26 +91,18 @@ public class AlarmEdgeTest extends AbstractEdgeTest { alarm.setOriginator(device.getId()); alarm.setType("alarm"); alarm.setSeverity(AlarmSeverity.CRITICAL); - edgeImitator.expectMessageAmount(1); Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class); - Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - AlarmUpdateMsg alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; - Alarm alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); - Assert.assertNotNull(alarmMsg); - Assert.assertEquals(savedAlarm, alarmMsg); - Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); + edgeImitator.ignoreType(AlarmCommentUpdateMsg.class); // ack alarm edgeImitator.expectMessageAmount(1); doPost("/api/alarm/" + savedAlarm.getUuidId() + "/ack"); Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; + AlarmUpdateMsg alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; Assert.assertEquals(UpdateMsgType.ALARM_ACK_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); - alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); + Alarm alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); Assert.assertNotNull(alarmMsg); Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); @@ -137,6 +136,99 @@ public class AlarmEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); Assert.assertEquals(AlarmStatus.CLEARED_ACK, alarmMsg.getStatus()); + edgeImitator.allowIgnoredTypes(); + } + + @Test + public void testSendAlarmCommentToCloud() throws Exception { + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + Alarm alarm = buildAlarmForUplinkMsg(device.getId()); + + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + AlarmUpdateMsg.Builder alarmUpdateMgBuilder = AlarmUpdateMsg.newBuilder(); + alarmUpdateMgBuilder.setIdMSB(alarm.getUuidId().getMostSignificantBits()); + alarmUpdateMgBuilder.setIdLSB(alarm.getUuidId().getLeastSignificantBits()); + alarmUpdateMgBuilder.setEntity(JacksonUtil.toString(alarm)); + testAutoGeneratedCodeByProtobuf(alarmUpdateMgBuilder); + uplinkMsgBuilder.addAlarmUpdateMsg(alarmUpdateMgBuilder.build()); + + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + AlarmComment alarmComment = buildAlarmCommentForUplinkMsg(alarm.getId()); + + uplinkMsgBuilder = UplinkMsg.newBuilder(); + AlarmCommentUpdateMsg.Builder alarmCommentUpdateMgBuilder = AlarmCommentUpdateMsg.newBuilder(); + alarmCommentUpdateMgBuilder.setEntity(JacksonUtil.toString(alarmComment)); + alarmCommentUpdateMgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + testAutoGeneratedCodeByProtobuf(alarmCommentUpdateMgBuilder); + uplinkMsgBuilder.addAlarmCommentUpdateMsg(alarmCommentUpdateMgBuilder.build()); + + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + PageData pageData = doGetTyped("/api/alarm/" + alarmComment.getAlarmId().getId() + "/comment" + "?page=0&pageSize=1", new TypeReference<>() {}); + Assert.assertNotNull("Found pageData is null", pageData); + Assert.assertNotEquals("Expected alarms are not found!", 0, pageData.getTotalElements()); + + Assert.assertNotNull(pageData.getData().get(0)); + AlarmCommentInfo alarmInfo = pageData.getData().get(0); + Assert.assertEquals(alarm.getId(), alarmInfo.getAlarmId()); + Assert.assertEquals(alarmComment.getAlarmId(), alarmInfo.getAlarmId()); + } + + @Test + public void testAlarmComments() throws Exception { + Device device = findDeviceByName("Edge Device 1"); + Alarm alarm = new Alarm(); + alarm.setOriginator(device.getId()); + alarm.setType("alarm"); + alarm.setSeverity(AlarmSeverity.MINOR); + Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class); + + // create alarm comment + edgeImitator.expectMessageAmount(1); + AlarmComment alarmComment = new AlarmComment(); + alarmComment.setComment(new TextNode("Test")); + alarmComment.setAlarmId(savedAlarm.getId()); + alarmComment = doPost("/api/alarm/" + savedAlarm.getUuidId() + "/comment", alarmComment, AlarmComment.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + AlarmCommentUpdateMsg alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + AlarmComment alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); + Assert.assertEquals(alarmComment, alarmCommentMsg); + + // update alarm comment + edgeImitator.expectMessageAmount(1); + alarmComment.setComment(JacksonUtil.newObjectNode().set("text", new TextNode("Updated comment"))); + alarmComment = doPost("/api/alarm/" + savedAlarm.getUuidId() + "/comment", alarmComment, AlarmComment.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); + Assert.assertEquals(alarmComment, alarmCommentMsg); + + // delete alarm + edgeImitator.expectMessageAmount(1); + doDelete("/api/alarm/" + savedAlarm.getUuidId() + "/comment/" + alarmComment.getUuidId()) + .andExpect(status().isOk()); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); } private Alarm buildAlarmForUplinkMsg(DeviceId deviceId) { @@ -148,4 +240,16 @@ public class AlarmEdgeTest extends AbstractEdgeTest { alarm.setSeverity(AlarmSeverity.CRITICAL); return alarm; } + + private AlarmComment buildAlarmCommentForUplinkMsg(AlarmId alarmId) { + UUID uuid = Uuids.timeBased(); + AlarmComment alarmComment = new AlarmComment(); + alarmComment.setAlarmId(alarmId); + alarmComment.setType(AlarmCommentType.OTHER); + alarmComment.setUserId(tenantAdminUserId); + alarmComment.setId(new AlarmCommentId(uuid)); + alarmComment.setComment(new TextNode("AlarmComment")); + alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid)); + return alarmComment; + } } diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index f7c464fab9..f5e018ed3a 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -27,6 +27,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.edge.rpc.EdgeGrpcClient; import org.thingsboard.edge.rpc.EdgeRpcClient; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -231,6 +232,11 @@ public class EdgeImitator { result.add(saveDownlinkMsg(alarmUpdateMsg)); } } + if (downlinkMsg.getAlarmCommentUpdateMsgCount() > 0) { + for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : downlinkMsg.getAlarmCommentUpdateMsgList()) { + result.add(saveDownlinkMsg(alarmCommentUpdateMsg)); + } + } if (downlinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : downlinkMsg.getEntityDataList()) { if (randomFailuresOnTimeseriesDownlink) { diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index f494f7c397..5b637c9100 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; @@ -156,6 +157,9 @@ public abstract class BaseEdgeProcessorTest { @MockBean protected AlarmService alarmService; + @MockBean + protected AlarmCommentService alarmCommentService; + @MockBean protected DeviceService deviceService; diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java index 7bee9f6983..c822693a71 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; public interface AlarmCommentService { + AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment); AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java index 042de888f7..680f744c7b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java @@ -37,6 +37,9 @@ public enum EdgeEventActionType { ALARM_CLEAR(ActionType.ALARM_CLEAR), ALARM_ASSIGNED(ActionType.ALARM_ASSIGNED), ALARM_UNASSIGNED(ActionType.ALARM_UNASSIGNED), + ADDED_COMMENT(ActionType.ADDED_COMMENT), + UPDATED_COMMENT(ActionType.UPDATED_COMMENT), + DELETED_COMMENT(ActionType.DELETED_COMMENT), ASSIGNED_TO_EDGE(ActionType.ASSIGNED_TO_EDGE), UNASSIGNED_FROM_EDGE(ActionType.UNASSIGNED_FROM_EDGE), CREDENTIALS_REQUEST(null), diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index a8627d3c0d..6fc255501f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -27,6 +27,7 @@ public enum EdgeEventType { ASSET_PROFILE(true, EntityType.ASSET_PROFILE), ENTITY_VIEW(false, EntityType.ENTITY_VIEW), ALARM(false, EntityType.ALARM), + ALARM_COMMENT(false, null), RULE_CHAIN(false, EntityType.RULE_CHAIN), RULE_CHAIN_METADATA(false, null), EDGE(false, EntityType.EDGE), diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java b/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java index 19c3fc8f4f..8d46a46781 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.page; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -64,8 +65,8 @@ public abstract class BasePageDataIterable implements Iterable, Iterator pageData = fetchPageData(link); currentIdx = 0; - currentItems = pageData.getData(); - hasNextPack = pageData.hasNext(); + currentItems = pageData != null ? pageData.getData() : new ArrayList<>(); + hasNextPack = pageData != null && pageData.hasNext(); nextPackLink = link.nextPageLink(); } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 2f33c21fd8..34a8c6f093 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -325,6 +325,11 @@ message AlarmUpdateMsg { string entity = 18; } +message AlarmCommentUpdateMsg { + UpdateMsgType msgType = 1; + string entity = 2; +} + message CustomerUpdateMsg { UpdateMsgType msgType = 1; int64 idMSB = 2; @@ -618,6 +623,7 @@ message UplinkMsg { repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19; repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20; repeated ResourceUpdateMsg resourceUpdateMsg = 21; + repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 22; } message UplinkResponseMsg { @@ -661,5 +667,6 @@ message DownlinkMsg { repeated TenantUpdateMsg tenantUpdateMsg = 26; repeated TenantProfileUpdateMsg tenantProfileUpdateMsg = 27; repeated ResourceUpdateMsg resourceUpdateMsg = 28; + repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 29; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index 3bc8ceb1a0..5bdd9803b2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -31,6 +31,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; +import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; +import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.service.DataValidator; import java.util.UUID; @@ -50,18 +52,28 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al @Override public AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment) { alarmCommentDataValidator.validate(alarmComment, c -> tenantId); - if (alarmComment.getId() == null) { - return createAlarmComment(tenantId, alarmComment); + boolean isCreated = alarmComment.getId() == null; + AlarmComment result; + if (isCreated) { + result = createAlarmComment(tenantId, alarmComment); } else { - return updateAlarmComment(tenantId, alarmComment); + result = updateAlarmComment(tenantId, alarmComment); } + if (result != null) { + eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entity(result) + .entityId(result.getAlarmId()).created(isCreated).build()); + } + return result; } @Override public AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment) { log.debug("Deleting Alarm Comment: {}", alarmComment); alarmCommentDataValidator.validate(alarmComment, c -> tenantId); - return alarmCommentDao.save(tenantId, alarmComment); + AlarmComment result = alarmCommentDao.save(tenantId, alarmComment); + eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entity(result) + .entityId(result.getAlarmId()).build()); + return result; } @Override @@ -112,5 +124,4 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al } return null; } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index ae9763c59c..72e1b9c7bf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -135,7 +135,7 @@ public class BaseAlarmService extends AbstractCachedEntityService { private final T entity; private final T oldEntity; private final EntityId entityId; - private final Boolean added; + private final Boolean created; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotificationRuleService.java b/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotificationRuleService.java index 9daa3ce4c2..7dbecbb725 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotificationRuleService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotificationRuleService.java @@ -53,7 +53,7 @@ public class DefaultNotificationRuleService extends AbstractEntityService implem try { NotificationRule savedRule = notificationRuleDao.saveAndFlush(tenantId, notificationRule); eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entityId(savedRule.getId()) - .added(notificationRule.getId() == null).build()); + .created(notificationRule.getId() == null).build()); return savedRule; } catch (Exception e) { checkConstraintViolation(e, Map.of( diff --git a/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java b/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java index 24539828c9..53a585c358 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java @@ -84,7 +84,7 @@ public class BaseOtaPackageService extends AbstractCachedEntityService keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {}); + List keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { + }); entityBody.put("keys", keys); entityBody.put(SCOPE, getScope(metadata)); break; @@ -138,9 +140,8 @@ public abstract class AbstractTbMsgPushNode metadata) { @@ -174,7 +175,7 @@ public abstract class AbstractTbMsgPushNodeSupports next originator types:" + - "
DEVICE" + - "
ASSET" + - "
ENTITY_VIEW" + - "
DASHBOARD" + - "
TENANT" + - "
CUSTOMER" + - "
EDGE

" + - "As well node supports next message types:" + + "Supports next message types:" + "
POST_TELEMETRY_REQUEST" + "
POST_ATTRIBUTES_REQUEST" + "
ATTRIBUTES_UPDATED" + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index 6bc3846698..e70c64134d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -31,8 +31,7 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.TbMsg; @@ -52,21 +51,13 @@ import java.util.UUID; "This node used only on cloud instances to push messages from cloud to edge. " + "Once message arrived into this node it’s going to be converted into edge event and saved to the database. " + "Node doesn't push messages directly to edge, but stores event(s) in the edge queue. " + - "
Supports next originator types:" + - "
DEVICE" + - "
ASSET" + - "
ENTITY_VIEW" + - "
DASHBOARD" + - "
TENANT" + - "
CUSTOMER" + - "
EDGE

" + - "As well node supports next message types:" + + "Supports next message types:" + "
POST_TELEMETRY_REQUEST" + "
POST_ATTRIBUTES_REQUEST" + "
ATTRIBUTES_UPDATED" + "
ATTRIBUTES_DELETED" + "
ALARM

" + - "Message will be routed via Failure route if node was not able to save edge event to database or unsupported originator type/message type arrived. " + + "Message will be routed via Failure route if node was not able to save edge event to database or unsupported message type arrived. " + "In case successful storage edge event to database message will be routed via Success route.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodePushToEdgeConfig", @@ -129,21 +120,13 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode pageData; List> futures = new ArrayList<>(); - do { - pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId edgeId : pageData.getData()) { - EdgeEvent edgeEvent = buildEvent(msg, ctx); - futures.add(notifyEdge(ctx, edgeEvent, edgeId)); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); + PageDataIterableByTenantIdEntityId edgeIds = new PageDataIterableByTenantIdEntityId<>( + ctx.getEdgeService()::findRelatedEdgeIdsByEntityId, ctx.getTenantId(), msg.getOriginator(), DEFAULT_PAGE_SIZE); + for (EdgeId edgeId : edgeIds) { + EdgeEvent edgeEvent = buildEvent(msg, ctx); + futures.add(notifyEdge(ctx, edgeEvent, edgeId)); + } if (futures.isEmpty()) { // ack in case no edges are related to provided entity @@ -172,5 +155,4 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode