Merge remote-tracking branch 'upstream/master' into develop/3.7
This commit is contained in:
		
						commit
						b3f5d9885b
					
				@ -216,6 +216,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
 | 
			
		||||
                        case ALARM:
 | 
			
		||||
                            alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                            break;
 | 
			
		||||
                        case ALARM_COMMENT:
 | 
			
		||||
                            alarmProcessor.processAlarmCommentNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                            break;
 | 
			
		||||
                        case RELATION:
 | 
			
		||||
                            relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg);
 | 
			
		||||
                            break;
 | 
			
		||||
 | 
			
		||||
@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.OtaPackageInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.User;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmApiCallResult;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.common.data.audit.ActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
 | 
			
		||||
@ -80,9 +81,12 @@ public class EdgeEventSourcingListener {
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
 | 
			
		||||
            EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
 | 
			
		||||
            boolean isCreated = Boolean.TRUE.equals(event.getCreated());
 | 
			
		||||
            String body = getBodyMsgForEntityEvent(event.getEntity());
 | 
			
		||||
            EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity());
 | 
			
		||||
            EdgeEventActionType action = getActionForEntityEvent(event.getEntity(), isCreated);
 | 
			
		||||
            tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
 | 
			
		||||
                    null, null, action, edgeSynchronizationManager.getEdgeId().get());
 | 
			
		||||
                    body, type, action, edgeSynchronizationManager.getEdgeId().get());
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e);
 | 
			
		||||
        }
 | 
			
		||||
@ -96,14 +100,23 @@ public class EdgeEventSourcingListener {
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
            log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event);
 | 
			
		||||
            EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity());
 | 
			
		||||
            EdgeEventActionType actionType = getEdgeEventActionTypeForEntityEvent(event.getEntity());
 | 
			
		||||
            tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
 | 
			
		||||
                    JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED,
 | 
			
		||||
                    JacksonUtil.toString(event.getEntity()), type, actionType,
 | 
			
		||||
                    edgeSynchronizationManager.getEdgeId().get());
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EdgeEventActionType getEdgeEventActionTypeForEntityEvent(Object entity) {
 | 
			
		||||
        if (entity instanceof AlarmComment) {
 | 
			
		||||
            return EdgeEventActionType.DELETED_COMMENT;
 | 
			
		||||
        }
 | 
			
		||||
        return EdgeEventActionType.DELETED;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @TransactionalEventListener(fallbackExecution = true)
 | 
			
		||||
    public void handleEvent(ActionEntityEvent<?> event) {
 | 
			
		||||
        if (EntityType.DEVICE.equals(event.getEntityId().getEntityType())
 | 
			
		||||
@ -177,7 +190,7 @@ public class EdgeEventSourcingListener {
 | 
			
		||||
                }
 | 
			
		||||
                break;
 | 
			
		||||
            case TENANT:
 | 
			
		||||
                return !event.getAdded();
 | 
			
		||||
                return !event.getCreated();
 | 
			
		||||
            case API_USAGE_STATE:
 | 
			
		||||
            case EDGE:
 | 
			
		||||
                return false;
 | 
			
		||||
@ -202,4 +215,25 @@ public class EdgeEventSourcingListener {
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EdgeEventType getEdgeEventTypeForEntityEvent(Object entity) {
 | 
			
		||||
        if (entity instanceof AlarmComment) {
 | 
			
		||||
            return EdgeEventType.ALARM_COMMENT;
 | 
			
		||||
        }
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String getBodyMsgForEntityEvent(Object entity) {
 | 
			
		||||
        if (entity instanceof AlarmComment) {
 | 
			
		||||
            return JacksonUtil.toString(entity);
 | 
			
		||||
        }
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private EdgeEventActionType getActionForEntityEvent(Object entity, boolean isCreated) {
 | 
			
		||||
        if (entity instanceof AlarmComment) {
 | 
			
		||||
            return isCreated ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT;
 | 
			
		||||
        }
 | 
			
		||||
        return isCreated ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.page.SortOrder;
 | 
			
		||||
import org.thingsboard.server.common.data.page.TimePageLink;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg;
 | 
			
		||||
@ -531,6 +532,9 @@ public final class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                    case RPC_CALL:
 | 
			
		||||
                    case ASSIGNED_TO_CUSTOMER:
 | 
			
		||||
                    case UNASSIGNED_FROM_CUSTOMER:
 | 
			
		||||
                    case ADDED_COMMENT:
 | 
			
		||||
                    case UPDATED_COMMENT:
 | 
			
		||||
                    case DELETED_COMMENT:
 | 
			
		||||
                        downlinkMsg = convertEntityEventToDownlink(edgeEvent);
 | 
			
		||||
                        log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg);
 | 
			
		||||
                        break;
 | 
			
		||||
@ -645,6 +649,8 @@ public final class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                return ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, this.edgeVersion);
 | 
			
		||||
            case ALARM:
 | 
			
		||||
                return ctx.getAlarmProcessor().convertAlarmEventToDownlink(edgeEvent, this.edgeVersion);
 | 
			
		||||
            case ALARM_COMMENT:
 | 
			
		||||
                return ctx.getAlarmProcessor().convertAlarmCommentEventToDownlink(edgeEvent, this.edgeVersion);
 | 
			
		||||
            case USER:
 | 
			
		||||
                return ctx.getUserProcessor().convertUserEventToDownlink(edgeEvent, this.edgeVersion);
 | 
			
		||||
            case RELATION:
 | 
			
		||||
@ -715,6 +721,12 @@ public final class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                            .processAlarmMsgFromEdge(edge.getTenantId(), edge.getId(), alarmUpdateMsg));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if (uplinkMsg.getAlarmCommentUpdateMsgCount() > 0) {
 | 
			
		||||
                for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : uplinkMsg.getAlarmCommentUpdateMsgList()) {
 | 
			
		||||
                    result.add(((AlarmProcessor) ctx.getAlarmEdgeProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion))
 | 
			
		||||
                            .processAlarmCommentMsgFromEdge(edge.getTenantId(), edge.getId(), alarmCommentUpdateMsg));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) {
 | 
			
		||||
                for (EntityViewUpdateMsg entityViewUpdateMsg : uplinkMsg.getEntityViewUpdateMsgList()) {
 | 
			
		||||
                    result.add(((EntityViewProcessor) ctx.getEntityViewProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion))
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,8 @@
 | 
			
		||||
package org.thingsboard.server.service.edge.rpc.constructor.alarm;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
 | 
			
		||||
import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor;
 | 
			
		||||
@ -23,4 +25,6 @@ import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor;
 | 
			
		||||
public interface AlarmMsgConstructor extends MsgConstructor {
 | 
			
		||||
 | 
			
		||||
    AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName);
 | 
			
		||||
 | 
			
		||||
    AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
public class AlarmMsgConstructorV1 implements AlarmMsgConstructor {
 | 
			
		||||
public class AlarmMsgConstructorV1 extends BaseAlarmMsgConstructor {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) {
 | 
			
		||||
 | 
			
		||||
@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@TbCoreComponent
 | 
			
		||||
public class AlarmMsgConstructorV2 implements AlarmMsgConstructor {
 | 
			
		||||
public class AlarmMsgConstructorV2 extends BaseAlarmMsgConstructor {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) {
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,29 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2024 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.edge.rpc.constructor.alarm;
 | 
			
		||||
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
 | 
			
		||||
 | 
			
		||||
public abstract class BaseAlarmMsgConstructor implements AlarmMsgConstructor {
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment) {
 | 
			
		||||
        return AlarmCommentUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(alarmComment)).build();
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -52,6 +52,7 @@ import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
 | 
			
		||||
import org.thingsboard.server.common.data.msg.TbMsgType;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.EntityRelation;
 | 
			
		||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
 | 
			
		||||
@ -60,6 +61,7 @@ import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgDataType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmCommentService;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmService;
 | 
			
		||||
import org.thingsboard.server.dao.asset.AssetProfileService;
 | 
			
		||||
import org.thingsboard.server.dao.asset.AssetService;
 | 
			
		||||
@ -146,6 +148,9 @@ public abstract class BaseEdgeProcessor {
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected AlarmService alarmService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected AlarmCommentService alarmCommentService;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected DeviceService deviceService;
 | 
			
		||||
 | 
			
		||||
@ -351,10 +356,13 @@ public abstract class BaseEdgeProcessor {
 | 
			
		||||
            case ALARM_ASSIGNED:
 | 
			
		||||
            case ALARM_UNASSIGNED:
 | 
			
		||||
            case CREDENTIALS_REQUEST:
 | 
			
		||||
            case ADDED_COMMENT:
 | 
			
		||||
            case UPDATED_COMMENT:
 | 
			
		||||
                return true;
 | 
			
		||||
        }
 | 
			
		||||
        switch (type) {
 | 
			
		||||
            case ALARM:
 | 
			
		||||
            case ALARM_COMMENT:
 | 
			
		||||
            case RULE_CHAIN:
 | 
			
		||||
            case RULE_CHAIN_METADATA:
 | 
			
		||||
            case USER:
 | 
			
		||||
@ -439,14 +447,17 @@ public abstract class BaseEdgeProcessor {
 | 
			
		||||
            case CREDENTIALS_UPDATED:
 | 
			
		||||
            case ASSIGNED_TO_CUSTOMER:
 | 
			
		||||
            case UNASSIGNED_FROM_CUSTOMER:
 | 
			
		||||
            case UPDATED_COMMENT:
 | 
			
		||||
                return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
 | 
			
		||||
            case ADDED:
 | 
			
		||||
            case ASSIGNED_TO_EDGE:
 | 
			
		||||
            case RELATION_ADD_OR_UPDATE:
 | 
			
		||||
            case ADDED_COMMENT:
 | 
			
		||||
                return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
 | 
			
		||||
            case DELETED:
 | 
			
		||||
            case UNASSIGNED_FROM_EDGE:
 | 
			
		||||
            case RELATION_DELETED:
 | 
			
		||||
            case DELETED_COMMENT:
 | 
			
		||||
                return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
 | 
			
		||||
            case ALARM_ACK:
 | 
			
		||||
                return UpdateMsgType.ALARM_ACK_RPC_MESSAGE;
 | 
			
		||||
@ -515,22 +526,14 @@ public abstract class BaseEdgeProcessor {
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<Void> processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type,
 | 
			
		||||
                                                                     EdgeEventActionType actionType, EdgeId sourceEdgeId) {
 | 
			
		||||
        PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
 | 
			
		||||
        PageData<EdgeId> pageData;
 | 
			
		||||
        List<ListenableFuture<Void>> futures = new ArrayList<>();
 | 
			
		||||
        do {
 | 
			
		||||
            pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink);
 | 
			
		||||
            if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
 | 
			
		||||
                for (EdgeId relatedEdgeId : pageData.getData()) {
 | 
			
		||||
                    if (!relatedEdgeId.equals(sourceEdgeId)) {
 | 
			
		||||
                        futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                if (pageData.hasNext()) {
 | 
			
		||||
                    pageLink = pageLink.nextPageLink();
 | 
			
		||||
                }
 | 
			
		||||
        PageDataIterableByTenantIdEntityId<EdgeId> edgeIds =
 | 
			
		||||
                new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE);
 | 
			
		||||
        for (EdgeId relatedEdgeId : edgeIds) {
 | 
			
		||||
            if (!relatedEdgeId.equals(sourceEdgeId)) {
 | 
			
		||||
                futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null));
 | 
			
		||||
            }
 | 
			
		||||
        } while (pageData != null && pageData.hasNext());
 | 
			
		||||
        }
 | 
			
		||||
        return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -713,19 +716,13 @@ public abstract class BaseEdgeProcessor {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private boolean isEntityNotAssignedToEdge(TenantId tenantId, EntityId entityId, EdgeId edgeId) {
 | 
			
		||||
        PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
 | 
			
		||||
        PageData<EdgeId> pageData;
 | 
			
		||||
        do {
 | 
			
		||||
            pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink);
 | 
			
		||||
            if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
 | 
			
		||||
                if (pageData.getData().contains(edgeId)) {
 | 
			
		||||
                    return false;
 | 
			
		||||
                }
 | 
			
		||||
                if (pageData.hasNext()) {
 | 
			
		||||
                    pageLink = pageLink.nextPageLink();
 | 
			
		||||
                }
 | 
			
		||||
        PageDataIterableByTenantIdEntityId<EdgeId> edgeIds =
 | 
			
		||||
                new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE);
 | 
			
		||||
        for (EdgeId edgeId1 : edgeIds) {
 | 
			
		||||
            if (edgeId1.equals(edgeId)) {
 | 
			
		||||
                return false;
 | 
			
		||||
            }
 | 
			
		||||
        } while (pageData != null && pageData.hasNext());
 | 
			
		||||
        }
 | 
			
		||||
        return true;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.EdgeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
 | 
			
		||||
@ -29,12 +30,14 @@ import org.thingsboard.server.common.data.id.AlarmId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructor;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
@ -67,58 +70,90 @@ public abstract class AlarmEdgeProcessor extends BaseAlarmProcessor implements A
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Void> processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) {
 | 
			
		||||
        log.trace("[{}] processAlarmCommentMsgFromEdge [{}]", tenantId, alarmCommentUpdateMsg);
 | 
			
		||||
        try {
 | 
			
		||||
            edgeSynchronizationManager.getEdgeId().set(edgeId);
 | 
			
		||||
            return processAlarmCommentMsg(tenantId, alarmCommentUpdateMsg);
 | 
			
		||||
        } finally {
 | 
			
		||||
            edgeSynchronizationManager.getEdgeId().remove();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) {
 | 
			
		||||
        UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
 | 
			
		||||
        switch (edgeEvent.getAction()) {
 | 
			
		||||
            case ADDED_COMMENT:
 | 
			
		||||
            case UPDATED_COMMENT:
 | 
			
		||||
            case DELETED_COMMENT:
 | 
			
		||||
                AlarmComment alarmComment = JacksonUtil.convertValue(edgeEvent.getBody(), AlarmComment.class);
 | 
			
		||||
                if (alarmComment != null) {
 | 
			
		||||
                    return DownlinkMsg.newBuilder()
 | 
			
		||||
                            .setDownlinkMsgId(EdgeUtils.nextPositiveInt())
 | 
			
		||||
                            .addAlarmCommentUpdateMsg(((AlarmMsgConstructor) alarmMsgConstructorFactory
 | 
			
		||||
                                    .getMsgConstructorByEdgeVersion(edgeVersion)).constructAlarmCommentUpdatedMsg(msgType, alarmComment))
 | 
			
		||||
                            .build();
 | 
			
		||||
                }
 | 
			
		||||
            default:
 | 
			
		||||
                return null;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
 | 
			
		||||
        EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
 | 
			
		||||
        AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
 | 
			
		||||
        EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB());
 | 
			
		||||
        switch (actionType) {
 | 
			
		||||
            case DELETED:
 | 
			
		||||
                Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class);
 | 
			
		||||
                if (deletedAlarm == null) {
 | 
			
		||||
                    return Futures.immediateFuture(null);
 | 
			
		||||
                }
 | 
			
		||||
                List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(),
 | 
			
		||||
                        alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId);
 | 
			
		||||
                return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService);
 | 
			
		||||
            default:
 | 
			
		||||
                ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
 | 
			
		||||
                return Futures.transformAsync(alarmFuture, alarm -> {
 | 
			
		||||
                    if (alarm == null) {
 | 
			
		||||
                        return Futures.immediateFuture(null);
 | 
			
		||||
                    }
 | 
			
		||||
                    EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType());
 | 
			
		||||
                    if (type == null) {
 | 
			
		||||
                        return Futures.immediateFuture(null);
 | 
			
		||||
                    }
 | 
			
		||||
                    List<ListenableFuture<Void>> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(),
 | 
			
		||||
                            alarmId, actionType, null, originatorEdgeId);
 | 
			
		||||
                    return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
 | 
			
		||||
                }, dbCallbackExecutorService);
 | 
			
		||||
        if (EdgeEventActionType.DELETED.equals(actionType)) {
 | 
			
		||||
            Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class);
 | 
			
		||||
            if (deletedAlarm == null) {
 | 
			
		||||
                return Futures.immediateFuture(null);
 | 
			
		||||
            }
 | 
			
		||||
            List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(),
 | 
			
		||||
                    alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId, EdgeEventType.ALARM);
 | 
			
		||||
            return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService);
 | 
			
		||||
        }
 | 
			
		||||
        ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
 | 
			
		||||
        return Futures.transformAsync(alarmFuture, alarm -> {
 | 
			
		||||
            if (alarm == null) {
 | 
			
		||||
                return Futures.immediateFuture(null);
 | 
			
		||||
            }
 | 
			
		||||
            EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType());
 | 
			
		||||
            if (type == null) {
 | 
			
		||||
                return Futures.immediateFuture(null);
 | 
			
		||||
            }
 | 
			
		||||
            List<ListenableFuture<Void>> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(),
 | 
			
		||||
                    alarmId, actionType, null, originatorEdgeId, EdgeEventType.ALARM);
 | 
			
		||||
            return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
 | 
			
		||||
        }, dbCallbackExecutorService);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<ListenableFuture<Void>> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId) {
 | 
			
		||||
        PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
 | 
			
		||||
        PageData<EdgeId> pageData;
 | 
			
		||||
    public ListenableFuture<Void> processAlarmCommentNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
 | 
			
		||||
        EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
 | 
			
		||||
        AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
 | 
			
		||||
        EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB());
 | 
			
		||||
        AlarmComment alarmComment = JacksonUtil.fromString(edgeNotificationMsg.getBody(), AlarmComment.class);
 | 
			
		||||
        if (alarmComment == null) {
 | 
			
		||||
            return Futures.immediateFuture(null);
 | 
			
		||||
        }
 | 
			
		||||
        Alarm alarmById = alarmService.findAlarmById(tenantId, new AlarmId(alarmComment.getAlarmId().getId()));
 | 
			
		||||
        List<ListenableFuture<Void>> delFutures = pushEventToAllRelatedEdges(tenantId, alarmById.getOriginator(),
 | 
			
		||||
                alarmId, actionType, JacksonUtil.valueToTree(alarmComment), originatorEdgeId, EdgeEventType.ALARM_COMMENT);
 | 
			
		||||
        return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private List<ListenableFuture<Void>> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId,
 | 
			
		||||
                                                                    EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId,
 | 
			
		||||
                                                                    EdgeEventType edgeEventType) {
 | 
			
		||||
        List<ListenableFuture<Void>> futures = new ArrayList<>();
 | 
			
		||||
        do {
 | 
			
		||||
            pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink);
 | 
			
		||||
            if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
 | 
			
		||||
                for (EdgeId relatedEdgeId : pageData.getData()) {
 | 
			
		||||
                    if (!relatedEdgeId.equals(sourceEdgeId)) {
 | 
			
		||||
                        futures.add(saveEdgeEvent(tenantId,
 | 
			
		||||
                                relatedEdgeId,
 | 
			
		||||
                                EdgeEventType.ALARM,
 | 
			
		||||
                                actionType,
 | 
			
		||||
                                alarmId,
 | 
			
		||||
                                body));
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                if (pageData.hasNext()) {
 | 
			
		||||
                    pageLink = pageLink.nextPageLink();
 | 
			
		||||
                }
 | 
			
		||||
        PageDataIterableByTenantIdEntityId<EdgeId> edgeIds =
 | 
			
		||||
                new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, originatorId, DEFAULT_PAGE_SIZE);
 | 
			
		||||
        for (EdgeId relatedEdgeId : edgeIds) {
 | 
			
		||||
            if (!relatedEdgeId.equals(sourceEdgeId)) {
 | 
			
		||||
                futures.add(saveEdgeEvent(tenantId, relatedEdgeId, edgeEventType, actionType, alarmId, body));
 | 
			
		||||
            }
 | 
			
		||||
        } while (pageData != null && pageData.hasNext());
 | 
			
		||||
        }
 | 
			
		||||
        return futures;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
 | 
			
		||||
@ -29,4 +30,8 @@ public interface AlarmProcessor extends EdgeProcessor {
 | 
			
		||||
    ListenableFuture<Void> processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg);
 | 
			
		||||
 | 
			
		||||
    DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Void> processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg);
 | 
			
		||||
 | 
			
		||||
    DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -19,10 +19,12 @@ import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityView;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmCreateOrUpdateActiveRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmUpdateRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.asset.Asset;
 | 
			
		||||
@ -33,6 +35,8 @@ import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityViewId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmCommentDao;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
 | 
			
		||||
@ -44,6 +48,9 @@ import java.util.UUID;
 | 
			
		||||
@Slf4j
 | 
			
		||||
public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    protected AlarmCommentDao alarmCommentDao;
 | 
			
		||||
 | 
			
		||||
    public ListenableFuture<Void> processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
 | 
			
		||||
        log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg);
 | 
			
		||||
        AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB()));
 | 
			
		||||
@ -93,6 +100,42 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
 | 
			
		||||
        return Futures.immediateFuture(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public ListenableFuture<Void> processAlarmCommentMsg(TenantId tenantId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) {
 | 
			
		||||
        log.trace("[{}] processAlarmCommentMsg [{}]", tenantId, alarmCommentUpdateMsg);
 | 
			
		||||
        AlarmComment alarmComment = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true);
 | 
			
		||||
        if (alarmComment == null) {
 | 
			
		||||
            throw new RuntimeException("[{" + tenantId + "}] alarmCommentUpdateMsg {" + alarmCommentUpdateMsg + "} cannot be converted to alarm comment");
 | 
			
		||||
        }
 | 
			
		||||
        try {
 | 
			
		||||
            Alarm alarm = alarmService.findAlarmById(tenantId, new AlarmId(alarmComment.getAlarmId().getId()));
 | 
			
		||||
            if (alarm == null) {
 | 
			
		||||
                return Futures.immediateFuture(null);
 | 
			
		||||
            }
 | 
			
		||||
            switch (alarmCommentUpdateMsg.getMsgType()) {
 | 
			
		||||
                case ENTITY_CREATED_RPC_MESSAGE:
 | 
			
		||||
                    alarmCommentDao.createAlarmComment(tenantId, alarmComment);
 | 
			
		||||
                    break;
 | 
			
		||||
                case ENTITY_UPDATED_RPC_MESSAGE:
 | 
			
		||||
                    alarmCommentService.createOrUpdateAlarmComment(tenantId, alarmComment);
 | 
			
		||||
                    break;
 | 
			
		||||
                case ENTITY_DELETED_RPC_MESSAGE:
 | 
			
		||||
                    AlarmComment alarmCommentToDelete = alarmCommentService.findAlarmCommentById(tenantId, alarmComment.getId());
 | 
			
		||||
                    if (alarmCommentToDelete != null) {
 | 
			
		||||
                        alarmCommentService.saveAlarmComment(tenantId, alarmCommentToDelete);
 | 
			
		||||
                    }
 | 
			
		||||
                    break;
 | 
			
		||||
                case UNRECOGNIZED:
 | 
			
		||||
                default:
 | 
			
		||||
                    return handleUnsupportedMsgType(alarmCommentUpdateMsg.getMsgType());
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}] Failed to process alarm comment update msg [{}]", tenantId, alarmCommentUpdateMsg, e);
 | 
			
		||||
            return Futures.immediateFailedFuture(e);
 | 
			
		||||
        }
 | 
			
		||||
        return Futures.immediateFuture(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    protected abstract EntityId getAlarmOriginatorFromMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg);
 | 
			
		||||
 | 
			
		||||
    protected abstract Alarm constructAlarmFromUpdateMsg(TenantId tenantId, AlarmId alarmId, EntityId originatorId, AlarmUpdateMsg alarmUpdateMsg);
 | 
			
		||||
 | 
			
		||||
@ -73,7 +73,7 @@ public class EntityStateSourcingListener {
 | 
			
		||||
        TenantId tenantId = event.getTenantId();
 | 
			
		||||
        EntityId entityId = event.getEntityId();
 | 
			
		||||
        EntityType entityType = entityId.getEntityType();
 | 
			
		||||
        boolean isCreated = event.getAdded() != null && event.getAdded();
 | 
			
		||||
        boolean isCreated = event.getCreated() != null && event.getCreated();
 | 
			
		||||
        ComponentLifecycleEvent lifecycleEvent = isCreated ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED;
 | 
			
		||||
 | 
			
		||||
        switch (entityType) {
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
 | 
			
		||||
 | 
			
		||||
public interface TbAlarmCommentService {
 | 
			
		||||
 | 
			
		||||
    AlarmComment saveAlarmComment(Alarm alarm, AlarmComment alarmComment, User user) throws ThingsboardException;
 | 
			
		||||
 | 
			
		||||
    void deleteAlarmComment(Alarm alarm, AlarmComment alarmComment, User user) throws ThingsboardException;
 | 
			
		||||
 | 
			
		||||
@ -61,7 +61,7 @@ public class DefaultTbTenantService extends AbstractTbEntityService implements T
 | 
			
		||||
        tenantProfileCache.evict(savedTenant.getId());
 | 
			
		||||
 | 
			
		||||
        if (created) {
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(savedTenant.getId()).entity(savedTenant).added(true).build());
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(savedTenant.getId()).entity(savedTenant).created(true).build());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        TenantProfile oldTenantProfile = oldTenant != null ? tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, oldTenant.getTenantProfileId()) : null;
 | 
			
		||||
 | 
			
		||||
@ -182,7 +182,7 @@ public abstract class AbstractOAuth2ClientMapper {
 | 
			
		||||
            installScripts.createDefaultEdgeRuleChains(tenant.getId());
 | 
			
		||||
            tenantProfileCache.evict(tenant.getId());
 | 
			
		||||
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(tenant.getId()).entity(tenant).added(true).build());
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(tenant.getId()).entity(tenant).created(true).build());
 | 
			
		||||
        } else {
 | 
			
		||||
            tenant = tenants.get(0);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -154,24 +154,6 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
 | 
			
		||||
        Mockito.reset(tbClusterService, auditLogService);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void testNotifyManyEntityManyTimeMsgToEdgeServiceNever(HasName entity, HasName originator,
 | 
			
		||||
                                                                     TenantId tenantId, CustomerId customerId, UserId userId, String userName,
 | 
			
		||||
                                                                     ActionType actionType, int cntTime, Object... additionalInfo) {
 | 
			
		||||
        EntityId entityId = createEntityId_NULL_UUID(entity);
 | 
			
		||||
        EntityId originatorId = createEntityId_NULL_UUID(originator);
 | 
			
		||||
        testNotificationMsgToEdgeServiceNeverWithActionType(entityId, actionType);
 | 
			
		||||
        ArgumentMatcher<HasName> matcherEntityClassEquals = argument -> argument.getClass().equals(entity.getClass());
 | 
			
		||||
        ArgumentMatcher<EntityId> matcherOriginatorId = argument -> argument.getClass().equals(originatorId.getClass());
 | 
			
		||||
        ArgumentMatcher<CustomerId> matcherCustomerId = customerId == null ?
 | 
			
		||||
                argument -> argument.getClass().equals(CustomerId.class) : argument -> argument.equals(customerId);
 | 
			
		||||
        ArgumentMatcher<UserId> matcherUserId = userId == null ?
 | 
			
		||||
                argument -> argument.getClass().equals(UserId.class) : argument -> argument.equals(userId);
 | 
			
		||||
        testLogEntityActionAdditionalInfo(matcherEntityClassEquals, matcherOriginatorId, tenantId, matcherCustomerId, matcherUserId, userName, actionType, cntTime,
 | 
			
		||||
                extractMatcherAdditionalInfo(additionalInfo));
 | 
			
		||||
        testPushMsgToRuleEngineTime(matcherOriginatorId, tenantId, entity, cntTime);
 | 
			
		||||
        Mockito.reset(tbClusterService, auditLogService);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected void testNotifyManyEntityManyTimeMsgToEdgeServiceEntityEqAny(HasName entity, HasName originator,
 | 
			
		||||
                                                                           TenantId tenantId, CustomerId customerId, UserId userId, String userName,
 | 
			
		||||
                                                                           ActionType actionType,
 | 
			
		||||
@ -318,8 +300,8 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
 | 
			
		||||
    private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) {
 | 
			
		||||
        EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ?
 | 
			
		||||
                EdgeEventActionType.CREDENTIALS_UPDATED : EdgeUtils.getEdgeEventActionTypeByActionType(actionType);
 | 
			
		||||
        Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(),
 | 
			
		||||
                Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any());
 | 
			
		||||
        Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(),
 | 
			
		||||
                Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void testNotificationMsgToEdgeServiceNever(EntityId entityId) {
 | 
			
		||||
 | 
			
		||||
@ -110,7 +110,7 @@ public class AlarmControllerTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
        Alarm alarm = createAlarm(TEST_ALARM_TYPE);
 | 
			
		||||
 | 
			
		||||
        testNotifyEntityAllOneTime(alarm, alarm.getId(), alarm.getOriginator(),
 | 
			
		||||
        testNotifyEntityOneTimeMsgToEdgeServiceNever(alarm, alarm.getId(), alarm.getOriginator(),
 | 
			
		||||
                tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, ActionType.ADDED);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -122,7 +122,7 @@ public class AlarmControllerTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
        Alarm alarm = createAlarm(TEST_ALARM_TYPE);
 | 
			
		||||
 | 
			
		||||
        testNotifyEntityAllOneTime(alarm, alarm.getId(), alarm.getOriginator(),
 | 
			
		||||
        testNotifyEntityOneTimeMsgToEdgeServiceNever(alarm, alarm.getId(), alarm.getOriginator(),
 | 
			
		||||
                tenantId, customerId, tenantAdminUserId, TENANT_ADMIN_EMAIL, ActionType.ADDED);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -772,7 +772,7 @@ public class AlarmControllerTest extends AbstractControllerTest {
 | 
			
		||||
        alarm = doPost("/api/alarm", alarm, Alarm.class);
 | 
			
		||||
        Assert.assertNotNull("Saved alarm is null!", alarm);
 | 
			
		||||
 | 
			
		||||
        testNotifyEntityNeverMsgToEdgeServiceOneTime(alarm, alarm.getId(), tenantId, ActionType.ADDED);
 | 
			
		||||
        testNotifyEntityNever(alarm.getId(), alarm);
 | 
			
		||||
 | 
			
		||||
        resetTokens();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -15,21 +15,28 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.edge;
 | 
			
		||||
 | 
			
		||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
			
		||||
import com.fasterxml.jackson.core.type.TypeReference;
 | 
			
		||||
import com.fasterxml.jackson.databind.node.TextNode;
 | 
			
		||||
import com.google.protobuf.AbstractMessage;
 | 
			
		||||
import org.junit.Assert;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.Device;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmComment;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmCommentInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmCommentType;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.AlarmStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AlarmCommentId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.AlarmId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.dao.service.DaoSqlTest;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
 | 
			
		||||
@ -84,26 +91,18 @@ public class AlarmEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        alarm.setOriginator(device.getId());
 | 
			
		||||
        alarm.setType("alarm");
 | 
			
		||||
        alarm.setSeverity(AlarmSeverity.CRITICAL);
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class);
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages());
 | 
			
		||||
        AbstractMessage latestMessage = edgeImitator.getLatestMessage();
 | 
			
		||||
        Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg);
 | 
			
		||||
        AlarmUpdateMsg alarmUpdateMsg = (AlarmUpdateMsg) latestMessage;
 | 
			
		||||
        Alarm alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true);
 | 
			
		||||
        Assert.assertNotNull(alarmMsg);
 | 
			
		||||
        Assert.assertEquals(savedAlarm, alarmMsg);
 | 
			
		||||
        Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, alarmUpdateMsg.getMsgType());
 | 
			
		||||
        edgeImitator.ignoreType(AlarmCommentUpdateMsg.class);
 | 
			
		||||
 | 
			
		||||
        // ack alarm
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        doPost("/api/alarm/" + savedAlarm.getUuidId() + "/ack");
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages());
 | 
			
		||||
        latestMessage = edgeImitator.getLatestMessage();
 | 
			
		||||
        AbstractMessage latestMessage = edgeImitator.getLatestMessage();
 | 
			
		||||
        Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg);
 | 
			
		||||
        alarmUpdateMsg = (AlarmUpdateMsg) latestMessage;
 | 
			
		||||
        AlarmUpdateMsg alarmUpdateMsg = (AlarmUpdateMsg) latestMessage;
 | 
			
		||||
        Assert.assertEquals(UpdateMsgType.ALARM_ACK_RPC_MESSAGE, alarmUpdateMsg.getMsgType());
 | 
			
		||||
        alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true);
 | 
			
		||||
        Alarm alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true);
 | 
			
		||||
        Assert.assertNotNull(alarmMsg);
 | 
			
		||||
        Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType());
 | 
			
		||||
        Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName());
 | 
			
		||||
@ -137,6 +136,99 @@ public class AlarmEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType());
 | 
			
		||||
        Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName());
 | 
			
		||||
        Assert.assertEquals(AlarmStatus.CLEARED_ACK, alarmMsg.getStatus());
 | 
			
		||||
        edgeImitator.allowIgnoredTypes();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testSendAlarmCommentToCloud() throws Exception {
 | 
			
		||||
        Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge();
 | 
			
		||||
 | 
			
		||||
        Alarm alarm = buildAlarmForUplinkMsg(device.getId());
 | 
			
		||||
 | 
			
		||||
        UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
 | 
			
		||||
        AlarmUpdateMsg.Builder alarmUpdateMgBuilder = AlarmUpdateMsg.newBuilder();
 | 
			
		||||
        alarmUpdateMgBuilder.setIdMSB(alarm.getUuidId().getMostSignificantBits());
 | 
			
		||||
        alarmUpdateMgBuilder.setIdLSB(alarm.getUuidId().getLeastSignificantBits());
 | 
			
		||||
        alarmUpdateMgBuilder.setEntity(JacksonUtil.toString(alarm));
 | 
			
		||||
        testAutoGeneratedCodeByProtobuf(alarmUpdateMgBuilder);
 | 
			
		||||
        uplinkMsgBuilder.addAlarmUpdateMsg(alarmUpdateMgBuilder.build());
 | 
			
		||||
 | 
			
		||||
        testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
 | 
			
		||||
 | 
			
		||||
        edgeImitator.expectResponsesAmount(1);
 | 
			
		||||
        edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForResponses());
 | 
			
		||||
 | 
			
		||||
        AlarmComment alarmComment = buildAlarmCommentForUplinkMsg(alarm.getId());
 | 
			
		||||
 | 
			
		||||
        uplinkMsgBuilder = UplinkMsg.newBuilder();
 | 
			
		||||
        AlarmCommentUpdateMsg.Builder alarmCommentUpdateMgBuilder = AlarmCommentUpdateMsg.newBuilder();
 | 
			
		||||
        alarmCommentUpdateMgBuilder.setEntity(JacksonUtil.toString(alarmComment));
 | 
			
		||||
        alarmCommentUpdateMgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
 | 
			
		||||
        testAutoGeneratedCodeByProtobuf(alarmCommentUpdateMgBuilder);
 | 
			
		||||
        uplinkMsgBuilder.addAlarmCommentUpdateMsg(alarmCommentUpdateMgBuilder.build());
 | 
			
		||||
 | 
			
		||||
        testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
 | 
			
		||||
 | 
			
		||||
        edgeImitator.expectResponsesAmount(1);
 | 
			
		||||
        edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForResponses());
 | 
			
		||||
 | 
			
		||||
        PageData<AlarmCommentInfo> pageData = doGetTyped("/api/alarm/" + alarmComment.getAlarmId().getId() + "/comment" + "?page=0&pageSize=1", new TypeReference<>() {});
 | 
			
		||||
        Assert.assertNotNull("Found pageData is null", pageData);
 | 
			
		||||
        Assert.assertNotEquals("Expected alarms are not found!", 0, pageData.getTotalElements());
 | 
			
		||||
 | 
			
		||||
        Assert.assertNotNull(pageData.getData().get(0));
 | 
			
		||||
        AlarmCommentInfo alarmInfo = pageData.getData().get(0);
 | 
			
		||||
        Assert.assertEquals(alarm.getId(), alarmInfo.getAlarmId());
 | 
			
		||||
        Assert.assertEquals(alarmComment.getAlarmId(), alarmInfo.getAlarmId());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testAlarmComments() throws Exception {
 | 
			
		||||
        Device device = findDeviceByName("Edge Device 1");
 | 
			
		||||
        Alarm alarm = new Alarm();
 | 
			
		||||
        alarm.setOriginator(device.getId());
 | 
			
		||||
        alarm.setType("alarm");
 | 
			
		||||
        alarm.setSeverity(AlarmSeverity.MINOR);
 | 
			
		||||
        Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class);
 | 
			
		||||
 | 
			
		||||
        // create alarm comment
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        AlarmComment alarmComment = new AlarmComment();
 | 
			
		||||
        alarmComment.setComment(new TextNode("Test"));
 | 
			
		||||
        alarmComment.setAlarmId(savedAlarm.getId());
 | 
			
		||||
        alarmComment = doPost("/api/alarm/" + savedAlarm.getUuidId() + "/comment", alarmComment, AlarmComment.class);
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages());
 | 
			
		||||
        AbstractMessage latestMessage = edgeImitator.getLatestMessage();
 | 
			
		||||
        AlarmCommentUpdateMsg alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage;
 | 
			
		||||
        Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType());
 | 
			
		||||
        AlarmComment alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true);
 | 
			
		||||
        Assert.assertNotNull(alarmCommentMsg);
 | 
			
		||||
        Assert.assertEquals(alarmComment, alarmCommentMsg);
 | 
			
		||||
 | 
			
		||||
        // update alarm comment
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        alarmComment.setComment(JacksonUtil.newObjectNode().set("text", new TextNode("Updated comment")));
 | 
			
		||||
        alarmComment = doPost("/api/alarm/" + savedAlarm.getUuidId() + "/comment", alarmComment, AlarmComment.class);
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages());
 | 
			
		||||
        latestMessage = edgeImitator.getLatestMessage();
 | 
			
		||||
        alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage;
 | 
			
		||||
        Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType());
 | 
			
		||||
        alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true);
 | 
			
		||||
        Assert.assertNotNull(alarmCommentMsg);
 | 
			
		||||
        Assert.assertEquals(alarmComment, alarmCommentMsg);
 | 
			
		||||
 | 
			
		||||
        // delete alarm
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        doDelete("/api/alarm/" + savedAlarm.getUuidId() + "/comment/" + alarmComment.getUuidId())
 | 
			
		||||
                .andExpect(status().isOk());
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages());
 | 
			
		||||
        latestMessage = edgeImitator.getLatestMessage();
 | 
			
		||||
        alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage;
 | 
			
		||||
        Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType());
 | 
			
		||||
        alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true);
 | 
			
		||||
        Assert.assertNotNull(alarmCommentMsg);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Alarm buildAlarmForUplinkMsg(DeviceId deviceId) {
 | 
			
		||||
@ -148,4 +240,16 @@ public class AlarmEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        alarm.setSeverity(AlarmSeverity.CRITICAL);
 | 
			
		||||
        return alarm;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private AlarmComment buildAlarmCommentForUplinkMsg(AlarmId alarmId) {
 | 
			
		||||
        UUID uuid = Uuids.timeBased();
 | 
			
		||||
        AlarmComment alarmComment = new AlarmComment();
 | 
			
		||||
        alarmComment.setAlarmId(alarmId);
 | 
			
		||||
        alarmComment.setType(AlarmCommentType.OTHER);
 | 
			
		||||
        alarmComment.setUserId(tenantAdminUserId);
 | 
			
		||||
        alarmComment.setId(new AlarmCommentId(uuid));
 | 
			
		||||
        alarmComment.setComment(new TextNode("AlarmComment"));
 | 
			
		||||
        alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid));
 | 
			
		||||
        return alarmComment;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -27,6 +27,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 | 
			
		||||
import org.thingsboard.edge.rpc.EdgeGrpcClient;
 | 
			
		||||
import org.thingsboard.edge.rpc.EdgeRpcClient;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
 | 
			
		||||
import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg;
 | 
			
		||||
@ -231,6 +232,11 @@ public class EdgeImitator {
 | 
			
		||||
                result.add(saveDownlinkMsg(alarmUpdateMsg));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        if (downlinkMsg.getAlarmCommentUpdateMsgCount() > 0) {
 | 
			
		||||
            for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : downlinkMsg.getAlarmCommentUpdateMsgList()) {
 | 
			
		||||
                result.add(saveDownlinkMsg(alarmCommentUpdateMsg));
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        if (downlinkMsg.getEntityDataCount() > 0) {
 | 
			
		||||
            for (EntityDataProto entityData : downlinkMsg.getEntityDataList()) {
 | 
			
		||||
                if (randomFailuresOnTimeseriesDownlink) {
 | 
			
		||||
 | 
			
		||||
@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DashboardId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.RuleChainId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmCommentService;
 | 
			
		||||
import org.thingsboard.server.dao.alarm.AlarmService;
 | 
			
		||||
import org.thingsboard.server.dao.asset.AssetProfileService;
 | 
			
		||||
import org.thingsboard.server.dao.asset.AssetService;
 | 
			
		||||
@ -156,6 +157,9 @@ public abstract class BaseEdgeProcessorTest {
 | 
			
		||||
    @MockBean
 | 
			
		||||
    protected AlarmService alarmService;
 | 
			
		||||
 | 
			
		||||
    @MockBean
 | 
			
		||||
    protected AlarmCommentService alarmCommentService;
 | 
			
		||||
 | 
			
		||||
    @MockBean
 | 
			
		||||
    protected DeviceService deviceService;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
 | 
			
		||||
public interface AlarmCommentService {
 | 
			
		||||
 | 
			
		||||
    AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment);
 | 
			
		||||
 | 
			
		||||
    AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment);
 | 
			
		||||
 | 
			
		||||
@ -37,6 +37,9 @@ public enum EdgeEventActionType {
 | 
			
		||||
    ALARM_CLEAR(ActionType.ALARM_CLEAR),
 | 
			
		||||
    ALARM_ASSIGNED(ActionType.ALARM_ASSIGNED),
 | 
			
		||||
    ALARM_UNASSIGNED(ActionType.ALARM_UNASSIGNED),
 | 
			
		||||
    ADDED_COMMENT(ActionType.ADDED_COMMENT),
 | 
			
		||||
    UPDATED_COMMENT(ActionType.UPDATED_COMMENT),
 | 
			
		||||
    DELETED_COMMENT(ActionType.DELETED_COMMENT),
 | 
			
		||||
    ASSIGNED_TO_EDGE(ActionType.ASSIGNED_TO_EDGE),
 | 
			
		||||
    UNASSIGNED_FROM_EDGE(ActionType.UNASSIGNED_FROM_EDGE),
 | 
			
		||||
    CREDENTIALS_REQUEST(null),
 | 
			
		||||
 | 
			
		||||
@ -27,6 +27,7 @@ public enum EdgeEventType {
 | 
			
		||||
    ASSET_PROFILE(true, EntityType.ASSET_PROFILE),
 | 
			
		||||
    ENTITY_VIEW(false, EntityType.ENTITY_VIEW),
 | 
			
		||||
    ALARM(false, EntityType.ALARM),
 | 
			
		||||
    ALARM_COMMENT(false, null),
 | 
			
		||||
    RULE_CHAIN(false, EntityType.RULE_CHAIN),
 | 
			
		||||
    RULE_CHAIN_METADATA(false, null),
 | 
			
		||||
    EDGE(false, EntityType.EDGE),
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.page;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Iterator;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.NoSuchElementException;
 | 
			
		||||
@ -64,8 +65,8 @@ public abstract class BasePageDataIterable<T> implements Iterable<T>, Iterator<T
 | 
			
		||||
    private void fetch(PageLink link) {
 | 
			
		||||
        PageData<T> pageData = fetchPageData(link);
 | 
			
		||||
        currentIdx = 0;
 | 
			
		||||
        currentItems = pageData.getData();
 | 
			
		||||
        hasNextPack = pageData.hasNext();
 | 
			
		||||
        currentItems = pageData != null ? pageData.getData() : new ArrayList<>();
 | 
			
		||||
        hasNextPack = pageData != null && pageData.hasNext();
 | 
			
		||||
        nextPackLink = link.nextPageLink();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -325,6 +325,11 @@ message AlarmUpdateMsg {
 | 
			
		||||
  string entity = 18;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message AlarmCommentUpdateMsg {
 | 
			
		||||
  UpdateMsgType msgType = 1;
 | 
			
		||||
  string entity = 2;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message CustomerUpdateMsg {
 | 
			
		||||
  UpdateMsgType msgType = 1;
 | 
			
		||||
  int64 idMSB = 2;
 | 
			
		||||
@ -618,6 +623,7 @@ message UplinkMsg {
 | 
			
		||||
  repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19;
 | 
			
		||||
  repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20;
 | 
			
		||||
  repeated ResourceUpdateMsg resourceUpdateMsg = 21;
 | 
			
		||||
  repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 22;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message UplinkResponseMsg {
 | 
			
		||||
@ -661,5 +667,6 @@ message DownlinkMsg {
 | 
			
		||||
  repeated TenantUpdateMsg tenantUpdateMsg = 26;
 | 
			
		||||
  repeated TenantProfileUpdateMsg tenantProfileUpdateMsg = 27;
 | 
			
		||||
  repeated ResourceUpdateMsg resourceUpdateMsg = 28;
 | 
			
		||||
  repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 29;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -31,6 +31,8 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.dao.entity.AbstractEntityService;
 | 
			
		||||
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
 | 
			
		||||
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
 | 
			
		||||
import org.thingsboard.server.dao.service.DataValidator;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
@ -50,18 +52,28 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al
 | 
			
		||||
    @Override
 | 
			
		||||
    public AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment) {
 | 
			
		||||
        alarmCommentDataValidator.validate(alarmComment, c -> tenantId);
 | 
			
		||||
        if (alarmComment.getId() == null) {
 | 
			
		||||
            return createAlarmComment(tenantId, alarmComment);
 | 
			
		||||
        boolean isCreated = alarmComment.getId() == null;
 | 
			
		||||
        AlarmComment result;
 | 
			
		||||
        if (isCreated) {
 | 
			
		||||
            result = createAlarmComment(tenantId, alarmComment);
 | 
			
		||||
        } else {
 | 
			
		||||
            return updateAlarmComment(tenantId, alarmComment);
 | 
			
		||||
            result = updateAlarmComment(tenantId, alarmComment);
 | 
			
		||||
        }
 | 
			
		||||
        if (result != null) {
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entity(result)
 | 
			
		||||
                    .entityId(result.getAlarmId()).created(isCreated).build());
 | 
			
		||||
        }
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment) {
 | 
			
		||||
        log.debug("Deleting Alarm Comment: {}", alarmComment);
 | 
			
		||||
        alarmCommentDataValidator.validate(alarmComment, c -> tenantId);
 | 
			
		||||
        return alarmCommentDao.save(tenantId, alarmComment);
 | 
			
		||||
        AlarmComment result = alarmCommentDao.save(tenantId, alarmComment);
 | 
			
		||||
        eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entity(result)
 | 
			
		||||
                .entityId(result.getAlarmId()).build());
 | 
			
		||||
        return result;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -112,5 +124,4 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al
 | 
			
		||||
        }
 | 
			
		||||
        return null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -135,7 +135,7 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
 | 
			
		||||
        }
 | 
			
		||||
        if (result.getAlarm() != null) {
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(result.getAlarm().getTenantId())
 | 
			
		||||
                    .entityId(result.getAlarm().getId()).added(true).build());
 | 
			
		||||
                    .entityId(result.getAlarm().getId()).entity(result).created(true).build());
 | 
			
		||||
            publishEvictEvent(new AlarmTypesCacheEvictEvent(request.getTenantId()));
 | 
			
		||||
        }
 | 
			
		||||
        return withPropagated(result);
 | 
			
		||||
 | 
			
		||||
@ -150,7 +150,7 @@ public class AssetProfileServiceImpl extends AbstractCachedEntityService<AssetPr
 | 
			
		||||
                    oldAssetProfile != null ? oldAssetProfile.getName() : null, savedAssetProfile.getId(), savedAssetProfile.isDefault()));
 | 
			
		||||
            if (publishSaveEvent) {
 | 
			
		||||
                eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedAssetProfile.getTenantId()).entity(savedAssetProfile)
 | 
			
		||||
                        .entityId(savedAssetProfile.getId()).added(oldAssetProfile == null).build());
 | 
			
		||||
                        .entityId(savedAssetProfile.getId()).created(oldAssetProfile == null).build());
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            handleEvictEvent(new AssetProfileEvictEvent(assetProfile.getTenantId(), assetProfile.getName(),
 | 
			
		||||
 | 
			
		||||
@ -168,7 +168,7 @@ public class BaseAssetService extends AbstractCachedEntityService<AssetCacheKey,
 | 
			
		||||
            savedAsset = assetDao.saveAndFlush(asset.getTenantId(), asset);
 | 
			
		||||
            publishEvictEvent(evictEvent);
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedAsset.getTenantId())
 | 
			
		||||
                    .entityId(savedAsset.getId()).added(asset.getId() == null).build());
 | 
			
		||||
                    .entityId(savedAsset.getId()).created(asset.getId() == null).build());
 | 
			
		||||
            if (asset.getId() == null) {
 | 
			
		||||
                countService.publishCountEntityEvictEvent(savedAsset.getTenantId(), EntityType.ASSET);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -113,7 +113,7 @@ public class CustomerServiceImpl extends AbstractEntityService implements Custom
 | 
			
		||||
                countService.publishCountEntityEvictEvent(savedCustomer.getTenantId(), EntityType.CUSTOMER);
 | 
			
		||||
            }
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedCustomer.getTenantId())
 | 
			
		||||
                    .entityId(savedCustomer.getId()).added(customer.getId() == null).build());
 | 
			
		||||
                    .entityId(savedCustomer.getId()).created(customer.getId() == null).build());
 | 
			
		||||
            return savedCustomer;
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            checkConstraintViolation(e, "customer_external_id_unq_key", "Customer with such external id already exists!");
 | 
			
		||||
 | 
			
		||||
@ -157,7 +157,7 @@ public class DashboardServiceImpl extends AbstractEntityService implements Dashb
 | 
			
		||||
            var saved = dashboardDao.save(dashboard.getTenantId(), dashboard);
 | 
			
		||||
            publishEvictEvent(new DashboardTitleEvictEvent(saved.getId()));
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(saved.getTenantId())
 | 
			
		||||
                    .entityId(saved.getId()).added(dashboard.getId() == null).build());
 | 
			
		||||
                    .entityId(saved.getId()).created(dashboard.getId() == null).build());
 | 
			
		||||
            if (dashboard.getId() == null) {
 | 
			
		||||
                countService.publishCountEntityEvictEvent(saved.getTenantId(), EntityType.DASHBOARD);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
@ -188,7 +188,7 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService<Device
 | 
			
		||||
                    oldDeviceProfile != null ? oldDeviceProfile.getProvisionDeviceKey() : null));
 | 
			
		||||
            if (publishSaveEvent) {
 | 
			
		||||
                eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedDeviceProfile.getTenantId()).entityId(savedDeviceProfile.getId())
 | 
			
		||||
                        .entity(savedDeviceProfile).oldEntity(oldDeviceProfile).added(oldDeviceProfile == null).build());
 | 
			
		||||
                        .entity(savedDeviceProfile).oldEntity(oldDeviceProfile).created(oldDeviceProfile == null).build());
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            handleEvictEvent(new DeviceProfileEvictEvent(deviceProfile.getTenantId(), deviceProfile.getName(),
 | 
			
		||||
 | 
			
		||||
@ -243,7 +243,7 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
 | 
			
		||||
                countService.publishCountEntityEvictEvent(savedDevice.getTenantId(), EntityType.DEVICE);
 | 
			
		||||
            }
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedDevice.getTenantId()).entityId(savedDevice.getId())
 | 
			
		||||
                    .entity(savedDevice).oldEntity(oldDevice).added(device.getId() == null).build());
 | 
			
		||||
                    .entity(savedDevice).oldEntity(oldDevice).created(device.getId() == null).build());
 | 
			
		||||
            return savedDevice;
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            handleEvictEvent(deviceCacheEvictEvent);
 | 
			
		||||
 | 
			
		||||
@ -167,7 +167,7 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
 | 
			
		||||
            Edge savedEdge = edgeDao.save(edge.getTenantId(), edge);
 | 
			
		||||
            publishEvictEvent(evictEvent);
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedEdge.getTenantId())
 | 
			
		||||
                    .entityId(savedEdge.getId()).entity(savedEdge).added(edge.getId() == null).build());
 | 
			
		||||
                    .entityId(savedEdge.getId()).entity(savedEdge).created(edge.getId() == null).build());
 | 
			
		||||
            return savedEdge;
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            handleEvictEvent(evictEvent);
 | 
			
		||||
 | 
			
		||||
@ -119,7 +119,7 @@ public class EntityViewServiceImpl extends AbstractCachedEntityService<EntityVie
 | 
			
		||||
            EntityView saved = entityViewDao.save(entityView.getTenantId(), entityView);
 | 
			
		||||
            publishEvictEvent(new EntityViewEvictEvent(saved.getTenantId(), saved.getId(), saved.getEntityId(), old != null ? old.getEntityId() : null, saved.getName(), old != null ? old.getName() : null));
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(saved.getTenantId())
 | 
			
		||||
                    .entityId(saved.getId()).added(entityView.getId() == null).build());
 | 
			
		||||
                    .entityId(saved.getId()).created(entityView.getId() == null).build());
 | 
			
		||||
            return saved;
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            checkConstraintViolation(t,
 | 
			
		||||
 | 
			
		||||
@ -27,5 +27,5 @@ public class SaveEntityEvent<T> {
 | 
			
		||||
    private final T entity;
 | 
			
		||||
    private final T oldEntity;
 | 
			
		||||
    private final EntityId entityId;
 | 
			
		||||
    private final Boolean added;
 | 
			
		||||
    private final Boolean created;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -53,7 +53,7 @@ public class DefaultNotificationRuleService extends AbstractEntityService implem
 | 
			
		||||
        try {
 | 
			
		||||
            NotificationRule savedRule = notificationRuleDao.saveAndFlush(tenantId, notificationRule);
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entityId(savedRule.getId())
 | 
			
		||||
                    .added(notificationRule.getId() == null).build());
 | 
			
		||||
                    .created(notificationRule.getId() == null).build());
 | 
			
		||||
            return savedRule;
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            checkConstraintViolation(e, Map.of(
 | 
			
		||||
 | 
			
		||||
@ -84,7 +84,7 @@ public class BaseOtaPackageService extends AbstractCachedEntityService<OtaPackag
 | 
			
		||||
                publishEvictEvent(new OtaPackageCacheEvictEvent(otaPackageId));
 | 
			
		||||
            }
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(result.getTenantId()).entity(result)
 | 
			
		||||
                    .entityId(result.getId()).added(otaPackageId == null).build());
 | 
			
		||||
                    .entityId(result.getId()).created(otaPackageId == null).build());
 | 
			
		||||
            return result;
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            if (otaPackageId != null) {
 | 
			
		||||
@ -110,7 +110,7 @@ public class BaseOtaPackageService extends AbstractCachedEntityService<OtaPackag
 | 
			
		||||
                publishEvictEvent(new OtaPackageCacheEvictEvent(otaPackageId));
 | 
			
		||||
            }
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(result.getTenantId())
 | 
			
		||||
                    .entityId(result.getId()).added(otaPackageId == null).build());
 | 
			
		||||
                    .entityId(result.getId()).created(otaPackageId == null).build());
 | 
			
		||||
            return result;
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            if (otaPackageId != null) {
 | 
			
		||||
 | 
			
		||||
@ -63,7 +63,7 @@ public class BaseQueueService extends AbstractEntityService implements QueueServ
 | 
			
		||||
        queueValidator.validate(queue, Queue::getTenantId);
 | 
			
		||||
        Queue savedQueue = queueDao.save(queue.getTenantId(), queue);
 | 
			
		||||
        eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedQueue.getTenantId())
 | 
			
		||||
                .entityId(savedQueue.getId()).entity(savedQueue).added(queue.getId() == null).build());
 | 
			
		||||
                .entityId(savedQueue.getId()).entity(savedQueue).created(queue.getId() == null).build());
 | 
			
		||||
        return savedQueue;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -90,7 +90,7 @@ public class BaseResourceService extends AbstractCachedEntityService<ResourceInf
 | 
			
		||||
            }
 | 
			
		||||
            publishEvictEvent(new ResourceInfoEvictEvent(tenantId, resource.getId()));
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(saved.getTenantId()).entityId(saved.getId())
 | 
			
		||||
                    .entity(saved).added(resource.getId() == null).build());
 | 
			
		||||
                    .entity(saved).created(resource.getId() == null).build());
 | 
			
		||||
            return saved;
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            publishEvictEvent(new ResourceInfoEvictEvent(tenantId, resource.getId()));
 | 
			
		||||
 | 
			
		||||
@ -125,7 +125,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
 | 
			
		||||
            }
 | 
			
		||||
            if (publishSaveEvent) {
 | 
			
		||||
                eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedRuleChain.getTenantId())
 | 
			
		||||
                        .entity(savedRuleChain).entityId(savedRuleChain.getId()).added(ruleChain.getId() == null).build());
 | 
			
		||||
                        .entity(savedRuleChain).entityId(savedRuleChain.getId()).created(ruleChain.getId() == null).build());
 | 
			
		||||
            }
 | 
			
		||||
            return savedRuleChain;
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
@ -147,7 +147,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
 | 
			
		||||
                previousRootRuleChain.setRoot(false);
 | 
			
		||||
                ruleChainDao.save(tenantId, previousRootRuleChain);
 | 
			
		||||
                eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId)
 | 
			
		||||
                        .entityId(previousRootRuleChain.getId()).entity(previousRootRuleChain).added(false).build());
 | 
			
		||||
                        .entityId(previousRootRuleChain.getId()).entity(previousRootRuleChain).created(false).build());
 | 
			
		||||
                setRootAndSave(tenantId, ruleChain);
 | 
			
		||||
                return true;
 | 
			
		||||
            }
 | 
			
		||||
@ -158,7 +158,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
 | 
			
		||||
    private void setRootAndSave(TenantId tenantId, RuleChain ruleChain) {
 | 
			
		||||
        ruleChain.setRoot(true);
 | 
			
		||||
        ruleChainDao.save(tenantId, ruleChain);
 | 
			
		||||
        eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entityId(ruleChain.getId()).entity(ruleChain).added(false).build());
 | 
			
		||||
        eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entityId(ruleChain.getId()).entity(ruleChain).created(false).build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -95,7 +95,7 @@ public class TenantProfileServiceImpl extends AbstractCachedEntityService<Tenant
 | 
			
		||||
            savedTenantProfile = tenantProfileDao.save(tenantId, tenantProfile);
 | 
			
		||||
            publishEvictEvent(new TenantProfileEvictEvent(savedTenantProfile.getId(), savedTenantProfile.isDefault()));
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entity(savedTenantProfile)
 | 
			
		||||
                    .entityId(savedTenantProfile.getId()).added(tenantProfile.getId() == null).build());
 | 
			
		||||
                    .entityId(savedTenantProfile.getId()).created(tenantProfile.getId() == null).build());
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            handleEvictEvent(new TenantProfileEvictEvent(null, tenantProfile.isDefault()));
 | 
			
		||||
            ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
 | 
			
		||||
 | 
			
		||||
@ -205,7 +205,7 @@ public class TenantServiceImpl extends AbstractCachedEntityService<TenantId, Ten
 | 
			
		||||
        publishEvictEvent(new TenantEvictEvent(savedTenant.getId(), create));
 | 
			
		||||
        if (publishSaveEvent) {
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedTenant.getId())
 | 
			
		||||
                    .entityId(savedTenant.getId()).entity(savedTenant).added(create).build());
 | 
			
		||||
                    .entityId(savedTenant.getId()).entity(savedTenant).created(create).build());
 | 
			
		||||
        }
 | 
			
		||||
        if (tenant.getId() == null) {
 | 
			
		||||
            deviceProfileService.createDefaultDeviceProfile(savedTenant.getId());
 | 
			
		||||
 | 
			
		||||
@ -145,7 +145,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
 | 
			
		||||
                .entity(savedUser)
 | 
			
		||||
                .oldEntity(oldUser)
 | 
			
		||||
                .entityId(savedUser.getId())
 | 
			
		||||
                .added(user.getId() == null).build());
 | 
			
		||||
                .created(user.getId() == null).build());
 | 
			
		||||
        return savedUser;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -97,7 +97,7 @@ public class WidgetTypeServiceImpl implements WidgetTypeService {
 | 
			
		||||
            imageService.replaceBase64WithImageUrl(widgetTypeDetails);
 | 
			
		||||
            WidgetTypeDetails result = widgetTypeDao.save(widgetTypeDetails.getTenantId(), widgetTypeDetails);
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(result.getTenantId())
 | 
			
		||||
                    .entityId(result.getId()).added(widgetTypeDetails.getId() == null).build());
 | 
			
		||||
                    .entityId(result.getId()).created(widgetTypeDetails.getId() == null).build());
 | 
			
		||||
            return result;
 | 
			
		||||
        } catch (Exception t) {
 | 
			
		||||
            AbstractCachedEntityService.checkConstraintViolation(t,
 | 
			
		||||
@ -209,7 +209,7 @@ public class WidgetTypeServiceImpl implements WidgetTypeService {
 | 
			
		||||
            widgetTypeDao.saveWidgetsBundleWidget(widgetsBundleWidget);
 | 
			
		||||
        }
 | 
			
		||||
        eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId)
 | 
			
		||||
                .entityId(widgetsBundleId).added(false).build());
 | 
			
		||||
                .entityId(widgetsBundleId).created(false).build());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -78,7 +78,7 @@ public class WidgetsBundleServiceImpl implements WidgetsBundleService {
 | 
			
		||||
            imageService.replaceBase64WithImageUrl(widgetsBundle, "bundle");
 | 
			
		||||
            WidgetsBundle result = widgetsBundleDao.save(widgetsBundle.getTenantId(), widgetsBundle);
 | 
			
		||||
            eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(result.getTenantId())
 | 
			
		||||
                    .entityId(result.getId()).added(widgetsBundle.getId() == null).build());
 | 
			
		||||
                    .entityId(result.getId()).created(widgetsBundle.getId() == null).build());
 | 
			
		||||
            return result;
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            AbstractCachedEntityService.checkConstraintViolation(e,
 | 
			
		||||
 | 
			
		||||
@ -27,6 +27,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.DataConstants;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.StringUtils;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
@ -94,7 +95,8 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
 | 
			
		||||
                    }
 | 
			
		||||
                    break;
 | 
			
		||||
                case ATTRIBUTES_DELETED:
 | 
			
		||||
                    List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {});
 | 
			
		||||
                    List<String> keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {
 | 
			
		||||
                    });
 | 
			
		||||
                    entityBody.put("keys", keys);
 | 
			
		||||
                    entityBody.put(SCOPE, getScope(metadata));
 | 
			
		||||
                    break;
 | 
			
		||||
@ -138,9 +140,8 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
 | 
			
		||||
    abstract void processMsg(TbContext ctx, TbMsg msg);
 | 
			
		||||
 | 
			
		||||
    protected UUID getUUIDFromMsgData(TbMsg msg) {
 | 
			
		||||
        JsonNode data = JacksonUtil.toJsonNode(msg.getData()).get("id");
 | 
			
		||||
        String id = JacksonUtil.convertValue(data.get("id"), String.class);
 | 
			
		||||
        return UUID.fromString(id);
 | 
			
		||||
        Alarm alarm = JacksonUtil.fromString(msg.getData(), Alarm.class);
 | 
			
		||||
        return alarm != null ? alarm.getUuidId() : null;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected String getScope(Map<String, String> metadata) {
 | 
			
		||||
@ -174,7 +175,7 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected boolean isSupportedMsgType(TbMsg msg) {
 | 
			
		||||
        return msg.isTypeOneOf(POST_TELEMETRY_REQUEST, POST_ATTRIBUTES_REQUEST, ATTRIBUTES_UPDATED,
 | 
			
		||||
                ATTRIBUTES_DELETED, TIMESERIES_UPDATED, ALARM, CONNECT_EVENT, DISCONNECT_EVENT, ACTIVITY_EVENT, INACTIVITY_EVENT);
 | 
			
		||||
        return msg.isTypeOneOf(POST_TELEMETRY_REQUEST, POST_ATTRIBUTES_REQUEST, ATTRIBUTES_UPDATED, ATTRIBUTES_DELETED, TIMESERIES_UPDATED,
 | 
			
		||||
                ALARM, CONNECT_EVENT, DISCONNECT_EVENT, ACTIVITY_EVENT, INACTIVITY_EVENT);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -38,15 +38,7 @@ import java.util.UUID;
 | 
			
		||||
                "This node used only on edge to push messages from edge to cloud. " +
 | 
			
		||||
                "Once message arrived into this node it’s going to be converted into cloud event and saved to the local database. " +
 | 
			
		||||
                "Node doesn't push messages directly to cloud, but stores event(s) in the cloud queue. " +
 | 
			
		||||
                "<br>Supports next originator types:" +
 | 
			
		||||
                "<br><code>DEVICE</code>" +
 | 
			
		||||
                "<br><code>ASSET</code>" +
 | 
			
		||||
                "<br><code>ENTITY_VIEW</code>" +
 | 
			
		||||
                "<br><code>DASHBOARD</code>" +
 | 
			
		||||
                "<br><code>TENANT</code>" +
 | 
			
		||||
                "<br><code>CUSTOMER</code>" +
 | 
			
		||||
                "<br><code>EDGE</code><br><br>" +
 | 
			
		||||
                "As well node supports next message types:" +
 | 
			
		||||
                "Supports next message types:" +
 | 
			
		||||
                "<br><code>POST_TELEMETRY_REQUEST</code>" +
 | 
			
		||||
                "<br><code>POST_ATTRIBUTES_REQUEST</code>" +
 | 
			
		||||
                "<br><code>ATTRIBUTES_UPDATED</code>" +
 | 
			
		||||
 | 
			
		||||
@ -31,8 +31,7 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType;
 | 
			
		||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EdgeId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.data.rule.RuleChainType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
@ -52,21 +51,13 @@ import java.util.UUID;
 | 
			
		||||
                "This node used only on cloud instances to push messages from cloud to edge. " +
 | 
			
		||||
                "Once message arrived into this node it’s going to be converted into edge event and saved to the database. " +
 | 
			
		||||
                "Node doesn't push messages directly to edge, but stores event(s) in the edge queue. " +
 | 
			
		||||
                "<br>Supports next originator types:" +
 | 
			
		||||
                "<br><code>DEVICE</code>" +
 | 
			
		||||
                "<br><code>ASSET</code>" +
 | 
			
		||||
                "<br><code>ENTITY_VIEW</code>" +
 | 
			
		||||
                "<br><code>DASHBOARD</code>" +
 | 
			
		||||
                "<br><code>TENANT</code>" +
 | 
			
		||||
                "<br><code>CUSTOMER</code>" +
 | 
			
		||||
                "<br><code>EDGE</code><br><br>" +
 | 
			
		||||
                "As well node supports next message types:" +
 | 
			
		||||
                "Supports next message types:" +
 | 
			
		||||
                "<br><code>POST_TELEMETRY_REQUEST</code>" +
 | 
			
		||||
                "<br><code>POST_ATTRIBUTES_REQUEST</code>" +
 | 
			
		||||
                "<br><code>ATTRIBUTES_UPDATED</code>" +
 | 
			
		||||
                "<br><code>ATTRIBUTES_DELETED</code>" +
 | 
			
		||||
                "<br><code>ALARM</code><br><br>" +
 | 
			
		||||
                "Message will be routed via <b>Failure</b> route if node was not able to save edge event to database or unsupported originator type/message type arrived. " +
 | 
			
		||||
                "Message will be routed via <b>Failure</b> route if node was not able to save edge event to database or unsupported message type arrived. " +
 | 
			
		||||
                "In case successful storage edge event to database message will be routed via <b>Success</b> route.",
 | 
			
		||||
        uiResources = {"static/rulenode/rulenode-core-config.js"},
 | 
			
		||||
        configDirective = "tbActionNodePushToEdgeConfig",
 | 
			
		||||
@ -129,21 +120,13 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode<TbMsgPushToEdgeNo
 | 
			
		||||
                };
 | 
			
		||||
                Futures.addCallback(future, futureCallback, ctx.getDbCallbackExecutor());
 | 
			
		||||
            } else {
 | 
			
		||||
                PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
 | 
			
		||||
                PageData<EdgeId> pageData;
 | 
			
		||||
                List<ListenableFuture<Void>> futures = new ArrayList<>();
 | 
			
		||||
                do {
 | 
			
		||||
                    pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink);
 | 
			
		||||
                    if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
 | 
			
		||||
                        for (EdgeId edgeId : pageData.getData()) {
 | 
			
		||||
                            EdgeEvent edgeEvent = buildEvent(msg, ctx);
 | 
			
		||||
                            futures.add(notifyEdge(ctx, edgeEvent, edgeId));
 | 
			
		||||
                        }
 | 
			
		||||
                        if (pageData.hasNext()) {
 | 
			
		||||
                            pageLink = pageLink.nextPageLink();
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                } while (pageData != null && pageData.hasNext());
 | 
			
		||||
                PageDataIterableByTenantIdEntityId<EdgeId> edgeIds = new PageDataIterableByTenantIdEntityId<>(
 | 
			
		||||
                        ctx.getEdgeService()::findRelatedEdgeIdsByEntityId, ctx.getTenantId(), msg.getOriginator(), DEFAULT_PAGE_SIZE);
 | 
			
		||||
                for (EdgeId edgeId : edgeIds) {
 | 
			
		||||
                    EdgeEvent edgeEvent = buildEvent(msg, ctx);
 | 
			
		||||
                    futures.add(notifyEdge(ctx, edgeEvent, edgeId));
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                if (futures.isEmpty()) {
 | 
			
		||||
                    // ack in case no edges are related to provided entity
 | 
			
		||||
@ -172,5 +155,4 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode<TbMsgPushToEdgeNo
 | 
			
		||||
        edgeEvent.setEdgeId(edgeId);
 | 
			
		||||
        return ctx.getEdgeEventService().saveAsync(edgeEvent);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user