From 8c94ec5c5277610c1dd494c75c1c677898885210 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 16 Oct 2023 18:05:12 +0300 Subject: [PATCH 1/5] Edge: do not sync User login actions. Edge: do not save particular edge events in case edge is not activated or disconnected --- .../edge/EdgeEventSourcingListener.java | 42 +++++++++++--- .../edge/rpc/processor/BaseEdgeProcessor.java | 58 +++++++++++++++++++ .../server/edge/AbstractEdgeTest.java | 57 ++++++++---------- .../server/edge/DeviceEdgeTest.java | 2 - .../thingsboard/server/edge/UserEdgeTest.java | 8 --- .../dao/eventsourcing/SaveEntityEvent.java | 1 + .../server/dao/user/UserServiceImpl.java | 7 ++- 7 files changed, 123 insertions(+), 52 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 43b05094a4..707dbec964 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.service.edge; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -36,6 +38,7 @@ import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent; import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; import org.thingsboard.server.dao.eventsourcing.RelationActionEvent; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; +import org.thingsboard.server.dao.user.UserServiceImpl; import javax.annotation.PostConstruct; @@ -75,7 +78,7 @@ public class EdgeEventSourcingListener { return; } try { - if (!isValidEdgeEventEntity(event.getEntity())) { + if (!isValidEdgeEventEntity(event.getEntity(), event.getOldEntity())) { return; } log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); @@ -83,7 +86,7 @@ public class EdgeEventSourcingListener { tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), null, null, action); } catch (Exception e) { - log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event); + log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e); } } @@ -97,7 +100,7 @@ public class EdgeEventSourcingListener { tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED); } catch (Exception e) { - log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event); + log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e); } } @@ -111,7 +114,7 @@ public class EdgeEventSourcingListener { tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), event.getBody(), null, edgeTypeByActionType(event.getActionType())); } catch (Exception e) { - log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event); + log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event, e); } } @@ -134,11 +137,11 @@ public class EdgeEventSourcingListener { tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType())); } catch (Exception e) { - log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event); + log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event, e); } } - private boolean isValidEdgeEventEntity(Object entity) { + private boolean isValidEdgeEventEntity(Object entity, Object oldEntity) { if (entity instanceof OtaPackageInfo) { OtaPackageInfo otaPackageInfo = (OtaPackageInfo) entity; return otaPackageInfo.hasUrl() || otaPackageInfo.isHasData(); @@ -147,7 +150,15 @@ public class EdgeEventSourcingListener { return RuleChainType.EDGE.equals(ruleChain.getType()); } else if (entity instanceof User) { User user = (User) entity; - return !Authority.SYS_ADMIN.equals(user.getAuthority()); + if (Authority.SYS_ADMIN.equals(user.getAuthority())) { + return false; + } + if (oldEntity != null) { + User oldUser = (User) oldEntity; + cleanUpUserAdditionalInfo(oldUser); + cleanUpUserAdditionalInfo(user); + return !user.equals(oldUser); + } } else if (entity instanceof AlarmApiCallResult) { AlarmApiCallResult alarmApiCallResult = (AlarmApiCallResult) entity; return alarmApiCallResult.isModified(); @@ -155,4 +166,21 @@ public class EdgeEventSourcingListener { // Default: If the entity doesn't match any of the conditions, consider it as valid. return true; } + + private void cleanUpUserAdditionalInfo(User user) { + // reset FAILED_LOGIN_ATTEMPTS and LAST_LOGIN_TS - edge is not interested in this information + if (user.getAdditionalInfo() instanceof NullNode) { + user.setAdditionalInfo(null); + } + if (user.getAdditionalInfo() instanceof ObjectNode) { + ObjectNode additionalInfo = ((ObjectNode) user.getAdditionalInfo()); + additionalInfo.remove(UserServiceImpl.FAILED_LOGIN_ATTEMPTS); + additionalInfo.remove(UserServiceImpl.LAST_LOGIN_TS); + if (additionalInfo.isEmpty()) { + user.setAdditionalInfo(null); + } else { + user.setAdditionalInfo(additionalInfo); + } + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 328a64efe2..2d86c1e184 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -24,6 +24,7 @@ import org.springframework.context.annotation.Lazy; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Dashboard; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; @@ -46,6 +47,7 @@ import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; @@ -110,11 +112,13 @@ import org.thingsboard.server.service.entitiy.TbNotificationEntityService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import org.thingsboard.server.service.state.DefaultDeviceStateService; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -305,6 +309,60 @@ public abstract class BaseEdgeProcessor { EdgeEventActionType action, EntityId entityId, JsonNode body) { + ListenableFuture> future = attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE); + return Futures.transformAsync(future, activeOpt -> { + if (activeOpt.isEmpty()) { + log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + + "action [{}], entityId [{}], body [{}]", + tenantId, edgeId, type, action, entityId, body); + return Futures.immediateFuture(null); + } + if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) { + return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); + } else { + if (doSaveIfEdgeIsOffline(type, action)) { + return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body); + } else { + log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + + "action [{}], entityId [{}], body [{}]", + tenantId, edgeId, type, action, entityId, body); + return Futures.immediateFuture(null); + } + } + }, dbCallbackExecutorService); + } + + private boolean doSaveIfEdgeIsOffline(EdgeEventType type, + EdgeEventActionType action) { + switch (action) { + case TIMESERIES_UPDATED: + case ALARM_ACK: + case ALARM_CLEAR: + case ALARM_ASSIGNED: + case ALARM_UNASSIGNED: + case CREDENTIALS_REQUEST: + return true; + } + switch (type) { + case ALARM: + case RULE_CHAIN: + case RULE_CHAIN_METADATA: + case USER: + case CUSTOMER: + case TENANT: + case TENANT_PROFILE: + case WIDGETS_BUNDLE: + case WIDGET_TYPE: + case ADMIN_SETTINGS: + case OTA_PACKAGE: + case QUEUE: + case RELATION: + return true; + } + return false; + } + + private ListenableFuture doSaveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, EntityId entityId, JsonNode body) { log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type[{}], " + "action [{}], entityId [{}], body [{}]", tenantId, edgeId, type, action, entityId, body); diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index 689847416c..43544dd5cf 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -130,7 +130,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { installation(); edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret()); - edgeImitator.expectMessageAmount(26); + edgeImitator.expectMessageAmount(21); edgeImitator.connect(); requestEdgeRuleChainMetadata(); @@ -163,9 +163,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { @After public void teardownEdgeTest() { try { - edgeImitator.expectMessageAmount(2); loginTenantAdmin(); - Assert.assertTrue(edgeImitator.waitForMessages()); doDelete("/api/edge/" + edge.getId().toString()) .andExpect(status().isOk()); @@ -228,33 +226,33 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { // 1 message from queue fetcher validateQueues(); - // 2 messages - 1 from rule chain fetcher and 1 from rule chain controller + // 1 from rule chain fetcher UUID ruleChainUUID = validateRuleChains(); // 1 from request message validateRuleChainMetadataUpdates(ruleChainUUID); - // 4 messages - 4 messages from fetcher - 2 from system level ('mail', 'mailTemplates') and 2 from admin level ('mail', 'mailTemplates') + // 4 messages + // - 2 from fetcher - system level ('mail', 'mailTemplates') + // - 2 from fetcher - admin level ('mail', 'mailTemplates') validateAdminSettings(); - // 5 messages - // - 1 from default profile fetcher - // - 2 from device profile fetcher (default and thermostat) - // - 1 from device fetcher - // - 1 from device controller (thermostat) - validateDeviceProfiles(); - // 4 messages // - 1 from default profile fetcher + // - 2 from device profile fetcher (default and thermostat) + // - 1 from device fetcher + validateDeviceProfiles(); + + // 3 messages + // - 1 from default profile fetcher // - 1 message from asset profile fetcher // - 1 message from asset fetcher - // - 1 message from asset controller validateAssetProfiles(); - // 2 messages - 1 from device fetcher and 1 from device controller + // 1 from device fetcher validateDevices(); - // 2 messages - 1 from asset fetcher and 1 from asset controller + // 1 from asset fetcher validateAssets(); // 1 message from public customer fetcher @@ -308,8 +306,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { // default msg device profile from fetcher // thermostat msg from device profile fetcher // thermostat msg from device fetcher - // thermostat msg from creation of device - Assert.assertEquals(5, deviceProfileUpdateMsgList.size()); + Assert.assertEquals(4, deviceProfileUpdateMsgList.size()); Optional thermostatProfileUpdateMsgOpt = deviceProfileUpdateMsgList.stream().filter(dfum -> THERMOSTAT_DEVICE_PROFILE_NAME.equals(dfum.getName())).findAny(); Assert.assertTrue(thermostatProfileUpdateMsgOpt.isPresent()); @@ -326,10 +323,9 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { } private void validateDevices() throws Exception { - List deviceUpdateMsgs = edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class); - Assert.assertEquals(2, deviceUpdateMsgs.size()); - validateDevice(deviceUpdateMsgs.get(0)); - validateDevice(deviceUpdateMsgs.get(1)); + Optional deviceUpdateMsgOpt = edgeImitator.findMessageByType(DeviceUpdateMsg.class); + Assert.assertTrue(deviceUpdateMsgOpt.isPresent()); + validateDevice(deviceUpdateMsgOpt.get()); } private void validateDevice(DeviceUpdateMsg deviceUpdateMsg) throws Exception { @@ -345,10 +341,9 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { } private void validateAssets() throws Exception { - List assetUpdateMsgs = edgeImitator.findAllMessagesByType(AssetUpdateMsg.class); - Assert.assertEquals(2, assetUpdateMsgs.size()); - validateAsset(assetUpdateMsgs.get(0)); - validateAsset(assetUpdateMsgs.get(1)); + Optional assetUpdateMsgOpt = edgeImitator.findMessageByType(AssetUpdateMsg.class); + Assert.assertTrue(assetUpdateMsgOpt.isPresent()); + validateAsset(assetUpdateMsgOpt.get()); } private void validateAsset(AssetUpdateMsg assetUpdateMsg) throws Exception { @@ -365,12 +360,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { } private UUID validateRuleChains() throws Exception { - List ruleChainUpdateMsgs = edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class); - Assert.assertEquals(2, ruleChainUpdateMsgs.size()); - RuleChainUpdateMsg ruleChainCreateMsg = ruleChainUpdateMsgs.get(0); - RuleChainUpdateMsg ruleChainUpdateMsg = ruleChainUpdateMsgs.get(1); - validateRuleChain(ruleChainCreateMsg, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); - validateRuleChain(ruleChainUpdateMsg, UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE); + Optional ruleChainUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainUpdateMsg.class); + Assert.assertTrue(ruleChainUpdateMsgOpt.isPresent()); + RuleChainUpdateMsg ruleChainUpdateMsg = ruleChainUpdateMsgOpt.get(); + validateRuleChain(ruleChainUpdateMsg, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); return new UUID(ruleChainUpdateMsg.getIdMSB(), ruleChainUpdateMsg.getIdLSB()); } @@ -429,7 +422,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { private void validateAssetProfiles() throws Exception { List assetProfileUpdateMsgs = edgeImitator.findAllMessagesByType(AssetProfileUpdateMsg.class); - Assert.assertEquals(4, assetProfileUpdateMsgs.size()); + Assert.assertEquals(3, assetProfileUpdateMsgs.size()); AssetProfileUpdateMsg assetProfileUpdateMsg = assetProfileUpdateMsgs.get(0); Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, assetProfileUpdateMsg.getMsgType()); UUID assetProfileUUID = new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB()); diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index aed99b4cba..ca8dd5dbf0 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -274,9 +274,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest { tenantProfile.getProfileData().setConfiguration(profileConfiguration); doPost("/api/tenantProfile/", tenantProfile, TenantProfile.class); - edgeImitator.expectMessageAmount(2); loginTenantAdmin(); - Assert.assertTrue(edgeImitator.waitForMessages()); UUID uuid = Uuids.timeBased(); diff --git a/application/src/test/java/org/thingsboard/server/edge/UserEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/UserEdgeTest.java index 3fd39328d5..3dc398b9f3 100644 --- a/application/src/test/java/org/thingsboard/server/edge/UserEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/UserEdgeTest.java @@ -79,9 +79,7 @@ public class UserEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedTenantAdmin.getLastName(), userUpdateMsg.getLastName()); // update user credentials - edgeImitator.expectMessageAmount(2); login(savedTenantAdmin.getEmail(), "tenant"); - Assert.assertTrue(edgeImitator.waitForMessages()); edgeImitator.expectMessageAmount(1); ChangePasswordRequest changePasswordRequest = new ChangePasswordRequest(); @@ -96,9 +94,7 @@ public class UserEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedTenantAdmin.getUuidId().getLeastSignificantBits(), userCredentialsUpdateMsg.getUserIdLSB()); Assert.assertTrue(passwordEncoder.matches(changePasswordRequest.getNewPassword(), userCredentialsUpdateMsg.getPassword())); - edgeImitator.expectMessageAmount(2); loginTenantAdmin(); - Assert.assertTrue(edgeImitator.waitForMessages()); // delete user edgeImitator.expectMessageAmount(1); @@ -164,9 +160,7 @@ public class UserEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedCustomerUser.getLastName(), userUpdateMsg.getLastName()); // update user credentials - edgeImitator.expectMessageAmount(2); login(savedCustomerUser.getEmail(), "customer"); - Assert.assertTrue(edgeImitator.waitForMessages()); edgeImitator.expectMessageAmount(1); ChangePasswordRequest changePasswordRequest = new ChangePasswordRequest(); @@ -181,9 +175,7 @@ public class UserEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedCustomerUser.getUuidId().getLeastSignificantBits(), userCredentialsUpdateMsg.getUserIdLSB()); Assert.assertTrue(passwordEncoder.matches(changePasswordRequest.getNewPassword(), userCredentialsUpdateMsg.getPassword())); - edgeImitator.expectMessageAmount(2); loginTenantAdmin(); - Assert.assertTrue(edgeImitator.waitForMessages()); // delete user edgeImitator.expectMessageAmount(1); diff --git a/dao/src/main/java/org/thingsboard/server/dao/eventsourcing/SaveEntityEvent.java b/dao/src/main/java/org/thingsboard/server/dao/eventsourcing/SaveEntityEvent.java index 205f592d43..cc2e854f59 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/eventsourcing/SaveEntityEvent.java +++ b/dao/src/main/java/org/thingsboard/server/dao/eventsourcing/SaveEntityEvent.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId; public class SaveEntityEvent { private final TenantId tenantId; private final T entity; + private final T oldEntity; private final EntityId entityId; private final Boolean added; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java index 4db1eb668a..a584fef86c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java @@ -70,8 +70,8 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic public static final String USER_PASSWORD_HISTORY = "userPasswordHistory"; - private static final String LAST_LOGIN_TS = "lastLoginTs"; - private static final String FAILED_LOGIN_ATTEMPTS = "failedLoginAttempts"; + public static final String LAST_LOGIN_TS = "lastLoginTs"; + public static final String FAILED_LOGIN_ATTEMPTS = "failedLoginAttempts"; private static final int DEFAULT_TOKEN_LENGTH = 30; public static final String INCORRECT_USER_ID = "Incorrect userId "; @@ -126,7 +126,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic @Override public User saveUser(TenantId tenantId, User user) { log.trace("Executing saveUser [{}]", user); - userValidator.validate(user, User::getTenantId); + User oldUser = userValidator.validate(user, User::getTenantId); if (!userLoginCaseSensitive) { user.setEmail(user.getEmail().toLowerCase()); } @@ -143,6 +143,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic eventPublisher.publishEvent(SaveEntityEvent.builder() .tenantId(tenantId == null ? TenantId.SYS_TENANT_ID : tenantId) .entity(savedUser) + .oldEntity(oldUser) .entityId(savedUser.getId()) .added(user.getId() == null).build()); return savedUser; From 3946b058cdda60fbdfb03d92d0c60fb3d1399605 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 17 Oct 2023 18:09:38 +0300 Subject: [PATCH 2/5] Do not push ALARM update to edge automatically - only using push to edge rule node --- .../service/edge/EdgeEventSourcingListener.java | 10 +++++----- .../thingsboard/server/edge/AlarmEdgeTest.java | 15 --------------- ui-ngx/src/app/core/http/entity.service.ts | 10 +++++++--- 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 707dbec964..006af577c9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -25,6 +25,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -78,7 +79,7 @@ public class EdgeEventSourcingListener { return; } try { - if (!isValidEdgeEventEntity(event.getEntity(), event.getOldEntity())) { + if (!isValidSaveEntityEventForEdgeProcessing(event.getEntity(), event.getOldEntity())) { return; } log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); @@ -141,7 +142,7 @@ public class EdgeEventSourcingListener { } } - private boolean isValidEdgeEventEntity(Object entity, Object oldEntity) { + private boolean isValidSaveEntityEventForEdgeProcessing(Object entity, Object oldEntity) { if (entity instanceof OtaPackageInfo) { OtaPackageInfo otaPackageInfo = (OtaPackageInfo) entity; return otaPackageInfo.hasUrl() || otaPackageInfo.isHasData(); @@ -159,9 +160,8 @@ public class EdgeEventSourcingListener { cleanUpUserAdditionalInfo(user); return !user.equals(oldUser); } - } else if (entity instanceof AlarmApiCallResult) { - AlarmApiCallResult alarmApiCallResult = (AlarmApiCallResult) entity; - return alarmApiCallResult.isModified(); + } else if (entity instanceof AlarmApiCallResult || entity instanceof Alarm) { + return false; } // Default: If the entity doesn't match any of the conditions, consider it as valid. return true; diff --git a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java index afccc634ed..259572851c 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.protobuf.AbstractMessage; import org.junit.Assert; import org.junit.Test; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.alarm.Alarm; @@ -102,20 +101,6 @@ public class AlarmEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedAlarm.getStatus().name(), alarmUpdateMsg.getStatus()); Assert.assertEquals(savedAlarm.getSeverity().name(), alarmUpdateMsg.getSeverity()); - // update alarm - String updatedDetails = "{\"testKey\":\"testValue\"}"; - savedAlarm.setDetails(JacksonUtil.OBJECT_MAPPER.readTree(updatedDetails)); - edgeImitator.expectMessageAmount(1); - savedAlarm = doPost("/api/alarm", savedAlarm, Alarm.class); - Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; - Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); - Assert.assertEquals(savedAlarm.getUuidId().getMostSignificantBits(), alarmUpdateMsg.getIdMSB()); - Assert.assertEquals(savedAlarm.getUuidId().getLeastSignificantBits(), alarmUpdateMsg.getIdLSB()); - Assert.assertEquals(updatedDetails, alarmUpdateMsg.getDetails()); - // ack alarm edgeImitator.expectMessageAmount(1); doPost("/api/alarm/" + savedAlarm.getUuidId() + "/ack"); diff --git a/ui-ngx/src/app/core/http/entity.service.ts b/ui-ngx/src/app/core/http/entity.service.ts index 12f6300b93..cc1dd13345 100644 --- a/ui-ngx/src/app/core/http/entity.service.ts +++ b/ui-ngx/src/app/core/http/entity.service.ts @@ -92,6 +92,7 @@ import { NotificationService } from '@core/http/notification.service'; import { TenantProfileService } from '@core/http/tenant-profile.service'; import { NotificationType } from '@shared/models/notification.models'; import { UserId } from '@shared/models/id/user-id'; +import { AlarmService } from '@core/http/alarm.service'; @Injectable({ providedIn: 'root' @@ -119,7 +120,8 @@ export class EntityService { private assetProfileService: AssetProfileService, private utils: UtilsService, private queueService: QueueService, - private notificationService: NotificationService + private notificationService: NotificationService, + private alarmService: AlarmService ) { } private getEntityObservable(entityType: EntityType, entityId: string, @@ -155,7 +157,7 @@ export class EntityService { observable = this.ruleChainService.getRuleChain(entityId, config); break; case EntityType.ALARM: - console.error('Get Alarm Entity is not implemented!'); + observable = this.alarmService.getAlarm(entityId, config); break; case EntityType.OTA_PACKAGE: observable = this.otaPackageService.getOtaPackageInfo(entityId, config); @@ -238,7 +240,9 @@ export class EntityService { entityIds); break; case EntityType.ALARM: - console.error('Get Alarm Entity is not implemented!'); + observable = this.getEntitiesByIdsObservable( + (id) => this.alarmService.getAlarm(id, config), + entityIds); break; case EntityType.DEVICE_PROFILE: observable = this.getEntitiesByIdsObservable( From 04cfa2021dcc989d6130a43334a435dfaeb2f0d9 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 17 Oct 2023 18:50:42 +0300 Subject: [PATCH 3/5] Fixed edge controller tests --- .../edge/rpc/processor/BaseEdgeProcessor.java | 3 ++- .../server/controller/EdgeControllerTest.java | 11 +++++++++-- .../server/controller/EdgeEventControllerTest.java | 13 ++++++++++--- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 2d86c1e184..a888ddc27b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -309,7 +309,8 @@ public abstract class BaseEdgeProcessor { EdgeEventActionType action, EntityId entityId, JsonNode body) { - ListenableFuture> future = attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE); + ListenableFuture> future = + attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE); return Futures.transformAsync(future, activeOpt -> { if (activeOpt.isEmpty()) { log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " + diff --git a/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java index 1afeaff0cb..b9e0b8cd6c 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.controller; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -34,8 +35,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.StringUtils; @@ -852,6 +855,11 @@ public class EdgeControllerTest extends AbstractControllerTest { Edge edge = doPost("/api/edge", constructEdge("Test Sync Edge", "test"), Edge.class); + // simulate edge activation + ObjectNode attributes = JacksonUtil.newObjectNode(); + attributes.put("active", true); + doPost("/api/plugins/telemetry/EDGE/" + edge.getId() + "/attributes/" + DataConstants.SERVER_SCOPE, attributes); + doPost("/api/edge/" + edge.getId().getId().toString() + "/device/" + savedDevice.getId().getId().toString(), Device.class); doPost("/api/edge/" + edge.getId().getId().toString() @@ -860,13 +868,12 @@ public class EdgeControllerTest extends AbstractControllerTest { EdgeImitator edgeImitator = new EdgeImitator(EDGE_HOST, EDGE_PORT, edge.getRoutingKey(), edge.getSecret()); edgeImitator.ignoreType(UserCredentialsUpdateMsg.class); - edgeImitator.expectMessageAmount(25); + edgeImitator.expectMessageAmount(24); edgeImitator.connect(); assertThat(edgeImitator.waitForMessages()).as("await for messages on first connect").isTrue(); verifyFetchersMsgs(edgeImitator); // verify queue msgs - Assert.assertTrue(popRuleChainMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, "Edge Root Rule Chain")); Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); Assert.assertTrue(popDeviceMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Test Sync Edge Device 1")); Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "test")); diff --git a/application/src/test/java/org/thingsboard/server/controller/EdgeEventControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdgeEventControllerTest.java index 04842df6a0..3303c64c27 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdgeEventControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdgeEventControllerTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.controller; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; import org.junit.After; @@ -26,6 +27,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; @@ -86,6 +89,11 @@ public class EdgeEventControllerTest extends AbstractControllerTest { Edge edge = constructEdge("TestEdge", "default"); edge = doPost("/api/edge", edge, Edge.class); + // simulate edge activation + ObjectNode attributes = JacksonUtil.newObjectNode(); + attributes.put("active", true); + doPost("/api/plugins/telemetry/EDGE/" + edge.getId() + "/attributes/" + DataConstants.SERVER_SCOPE, attributes); + Device device = constructDevice("TestDevice", "default"); Device savedDevice = doPost("/api/device", device, Device.class); @@ -99,14 +107,13 @@ public class EdgeEventControllerTest extends AbstractControllerTest { EntityRelation relation = new EntityRelation(savedAsset.getId(), savedDevice.getId(), EntityRelation.CONTAINS_TYPE); - awaitForNumberOfEdgeEvents(edgeId, 3); + awaitForNumberOfEdgeEvents(edgeId, 2); doPost("/api/relation", relation); - awaitForNumberOfEdgeEvents(edgeId, 4); + awaitForNumberOfEdgeEvents(edgeId, 3); List edgeEvents = findEdgeEvents(edgeId); - Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.RULE_CHAIN)); // root rule chain Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.DEVICE)); // TestDevice Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.ASSET)); // TestAsset Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.RELATION)); From c5c037b874a32519c79490e7d1f75ad8e832dd52 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 18 Oct 2023 08:47:03 +0300 Subject: [PATCH 4/5] Update AlarmControllerTest with latest business logic - no edge notification during alarm udpate --- .../thingsboard/server/controller/AlarmControllerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java index 0e6eb16713..2f859055bd 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java @@ -136,7 +136,7 @@ public class AlarmControllerTest extends AbstractControllerTest { Assert.assertEquals(AlarmSeverity.MAJOR, updatedAlarm.getSeverity()); AlarmInfo foundAlarm = doGet("/api/alarm/info/" + updatedAlarm.getId(), AlarmInfo.class); - testNotifyEntityAllOneTime(foundAlarm, updatedAlarm.getId(), updatedAlarm.getOriginator(), + testNotifyEntityOneTimeMsgToEdgeServiceNever(foundAlarm, updatedAlarm.getId(), updatedAlarm.getOriginator(), tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, ActionType.UPDATED); } @@ -153,7 +153,7 @@ public class AlarmControllerTest extends AbstractControllerTest { Assert.assertEquals(AlarmSeverity.MAJOR, updatedAlarm.getSeverity()); AlarmInfo foundAlarm = doGet("/api/alarm/info/" + updatedAlarm.getId(), AlarmInfo.class); - testNotifyEntityAllOneTime(foundAlarm, foundAlarm.getId(), foundAlarm.getOriginator(), + testNotifyEntityOneTimeMsgToEdgeServiceNever(foundAlarm, foundAlarm.getId(), foundAlarm.getOriginator(), tenantId, customerId, tenantAdminUserId, TENANT_ADMIN_EMAIL, ActionType.UPDATED); alarm = updatedAlarm; From 9b6b161d37e79506f9895c53f4abd58d5dfdeda5 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 18 Oct 2023 10:15:51 +0300 Subject: [PATCH 5/5] Revert changes for getting alarms by ids --- ui-ngx/src/app/core/http/entity.service.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ui-ngx/src/app/core/http/entity.service.ts b/ui-ngx/src/app/core/http/entity.service.ts index cc1dd13345..b4eab83b08 100644 --- a/ui-ngx/src/app/core/http/entity.service.ts +++ b/ui-ngx/src/app/core/http/entity.service.ts @@ -240,9 +240,7 @@ export class EntityService { entityIds); break; case EntityType.ALARM: - observable = this.getEntitiesByIdsObservable( - (id) => this.alarmService.getAlarm(id, config), - entityIds); + console.error('Get Alarm Entity is not implemented!'); break; case EntityType.DEVICE_PROFILE: observable = this.getEntitiesByIdsObservable(