Merge pull request #9425 from volodymyr-babak/hotfix/edge-remove-redundant-edge-event-save

Optimizations for Edge Queue Event Storage and Server Load Reduction
This commit is contained in:
Andrew Shvayka 2023-10-24 13:12:17 +03:00 committed by GitHub
commit 464eef2315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 152 additions and 79 deletions

View File

@ -15,6 +15,8 @@
*/ */
package org.thingsboard.server.service.edge; package org.thingsboard.server.service.edge;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -23,6 +25,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.OtaPackageInfo;
import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; import org.thingsboard.server.common.data.alarm.AlarmApiCallResult;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.edge.EdgeEventType;
@ -36,6 +39,7 @@ import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent;
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.eventsourcing.RelationActionEvent; import org.thingsboard.server.dao.eventsourcing.RelationActionEvent;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.user.UserServiceImpl;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -75,7 +79,7 @@ public class EdgeEventSourcingListener {
return; return;
} }
try { try {
if (!isValidEdgeEventEntity(event.getEntity())) { if (!isValidSaveEntityEventForEdgeProcessing(event.getEntity(), event.getOldEntity())) {
return; return;
} }
log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
@ -83,7 +87,7 @@ public class EdgeEventSourcingListener {
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
null, null, action); null, null, action);
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event); log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e);
} }
} }
@ -97,7 +101,7 @@ public class EdgeEventSourcingListener {
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED); JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED);
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event); log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e);
} }
} }
@ -111,7 +115,7 @@ public class EdgeEventSourcingListener {
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
event.getBody(), null, edgeTypeByActionType(event.getActionType())); event.getBody(), null, edgeTypeByActionType(event.getActionType()));
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event); log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event, e);
} }
} }
@ -134,11 +138,11 @@ public class EdgeEventSourcingListener {
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null,
JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType())); JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()));
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event); log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event, e);
} }
} }
private boolean isValidEdgeEventEntity(Object entity) { private boolean isValidSaveEntityEventForEdgeProcessing(Object entity, Object oldEntity) {
if (entity instanceof OtaPackageInfo) { if (entity instanceof OtaPackageInfo) {
OtaPackageInfo otaPackageInfo = (OtaPackageInfo) entity; OtaPackageInfo otaPackageInfo = (OtaPackageInfo) entity;
return otaPackageInfo.hasUrl() || otaPackageInfo.isHasData(); return otaPackageInfo.hasUrl() || otaPackageInfo.isHasData();
@ -147,12 +151,36 @@ public class EdgeEventSourcingListener {
return RuleChainType.EDGE.equals(ruleChain.getType()); return RuleChainType.EDGE.equals(ruleChain.getType());
} else if (entity instanceof User) { } else if (entity instanceof User) {
User user = (User) entity; User user = (User) entity;
return !Authority.SYS_ADMIN.equals(user.getAuthority()); if (Authority.SYS_ADMIN.equals(user.getAuthority())) {
} else if (entity instanceof AlarmApiCallResult) { return false;
AlarmApiCallResult alarmApiCallResult = (AlarmApiCallResult) entity; }
return alarmApiCallResult.isModified(); if (oldEntity != null) {
User oldUser = (User) oldEntity;
cleanUpUserAdditionalInfo(oldUser);
cleanUpUserAdditionalInfo(user);
return !user.equals(oldUser);
}
} else if (entity instanceof AlarmApiCallResult || entity instanceof Alarm) {
return false;
} }
// Default: If the entity doesn't match any of the conditions, consider it as valid. // Default: If the entity doesn't match any of the conditions, consider it as valid.
return true; return true;
} }
private void cleanUpUserAdditionalInfo(User user) {
// reset FAILED_LOGIN_ATTEMPTS and LAST_LOGIN_TS - edge is not interested in this information
if (user.getAdditionalInfo() instanceof NullNode) {
user.setAdditionalInfo(null);
}
if (user.getAdditionalInfo() instanceof ObjectNode) {
ObjectNode additionalInfo = ((ObjectNode) user.getAdditionalInfo());
additionalInfo.remove(UserServiceImpl.FAILED_LOGIN_ATTEMPTS);
additionalInfo.remove(UserServiceImpl.LAST_LOGIN_TS);
if (additionalInfo.isEmpty()) {
user.setAdditionalInfo(null);
} else {
user.setAdditionalInfo(additionalInfo);
}
}
}
} }

View File

@ -24,6 +24,7 @@ import org.springframework.context.annotation.Lazy;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EdgeUtils;
@ -46,6 +47,7 @@ import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
@ -110,11 +112,13 @@ import org.thingsboard.server.service.entitiy.TbNotificationEntityService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -305,6 +309,61 @@ public abstract class BaseEdgeProcessor {
EdgeEventActionType action, EdgeEventActionType action,
EntityId entityId, EntityId entityId,
JsonNode body) { JsonNode body) {
ListenableFuture<Optional<AttributeKvEntry>> future =
attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE);
return Futures.transformAsync(future, activeOpt -> {
if (activeOpt.isEmpty()) {
log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);
return Futures.immediateFuture(null);
}
if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
} else {
if (doSaveIfEdgeIsOffline(type, action)) {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
} else {
log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);
return Futures.immediateFuture(null);
}
}
}, dbCallbackExecutorService);
}
private boolean doSaveIfEdgeIsOffline(EdgeEventType type,
EdgeEventActionType action) {
switch (action) {
case TIMESERIES_UPDATED:
case ALARM_ACK:
case ALARM_CLEAR:
case ALARM_ASSIGNED:
case ALARM_UNASSIGNED:
case CREDENTIALS_REQUEST:
return true;
}
switch (type) {
case ALARM:
case RULE_CHAIN:
case RULE_CHAIN_METADATA:
case USER:
case CUSTOMER:
case TENANT:
case TENANT_PROFILE:
case WIDGETS_BUNDLE:
case WIDGET_TYPE:
case ADMIN_SETTINGS:
case OTA_PACKAGE:
case QUEUE:
case RELATION:
return true;
}
return false;
}
private ListenableFuture<Void> doSaveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, EntityId entityId, JsonNode body) {
log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type[{}], " + log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]", "action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body); tenantId, edgeId, type, action, entityId, body);

View File

@ -136,7 +136,7 @@ public class AlarmControllerTest extends AbstractControllerTest {
Assert.assertEquals(AlarmSeverity.MAJOR, updatedAlarm.getSeverity()); Assert.assertEquals(AlarmSeverity.MAJOR, updatedAlarm.getSeverity());
AlarmInfo foundAlarm = doGet("/api/alarm/info/" + updatedAlarm.getId(), AlarmInfo.class); AlarmInfo foundAlarm = doGet("/api/alarm/info/" + updatedAlarm.getId(), AlarmInfo.class);
testNotifyEntityAllOneTime(foundAlarm, updatedAlarm.getId(), updatedAlarm.getOriginator(), testNotifyEntityOneTimeMsgToEdgeServiceNever(foundAlarm, updatedAlarm.getId(), updatedAlarm.getOriginator(),
tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, ActionType.UPDATED); tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, ActionType.UPDATED);
} }
@ -153,7 +153,7 @@ public class AlarmControllerTest extends AbstractControllerTest {
Assert.assertEquals(AlarmSeverity.MAJOR, updatedAlarm.getSeverity()); Assert.assertEquals(AlarmSeverity.MAJOR, updatedAlarm.getSeverity());
AlarmInfo foundAlarm = doGet("/api/alarm/info/" + updatedAlarm.getId(), AlarmInfo.class); AlarmInfo foundAlarm = doGet("/api/alarm/info/" + updatedAlarm.getId(), AlarmInfo.class);
testNotifyEntityAllOneTime(foundAlarm, foundAlarm.getId(), foundAlarm.getOriginator(), testNotifyEntityOneTimeMsgToEdgeServiceNever(foundAlarm, foundAlarm.getId(), foundAlarm.getOriginator(),
tenantId, customerId, tenantAdminUserId, TENANT_ADMIN_EMAIL, ActionType.UPDATED); tenantId, customerId, tenantAdminUserId, TENANT_ADMIN_EMAIL, ActionType.UPDATED);
alarm = updatedAlarm; alarm = updatedAlarm;

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.controller;
import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
@ -34,8 +35,10 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
@ -852,6 +855,11 @@ public class EdgeControllerTest extends AbstractControllerTest {
Edge edge = doPost("/api/edge", constructEdge("Test Sync Edge", "test"), Edge.class); Edge edge = doPost("/api/edge", constructEdge("Test Sync Edge", "test"), Edge.class);
// simulate edge activation
ObjectNode attributes = JacksonUtil.newObjectNode();
attributes.put("active", true);
doPost("/api/plugins/telemetry/EDGE/" + edge.getId() + "/attributes/" + DataConstants.SERVER_SCOPE, attributes);
doPost("/api/edge/" + edge.getId().getId().toString() doPost("/api/edge/" + edge.getId().getId().toString()
+ "/device/" + savedDevice.getId().getId().toString(), Device.class); + "/device/" + savedDevice.getId().getId().toString(), Device.class);
doPost("/api/edge/" + edge.getId().getId().toString() doPost("/api/edge/" + edge.getId().getId().toString()
@ -860,13 +868,12 @@ public class EdgeControllerTest extends AbstractControllerTest {
EdgeImitator edgeImitator = new EdgeImitator(EDGE_HOST, EDGE_PORT, edge.getRoutingKey(), edge.getSecret()); EdgeImitator edgeImitator = new EdgeImitator(EDGE_HOST, EDGE_PORT, edge.getRoutingKey(), edge.getSecret());
edgeImitator.ignoreType(UserCredentialsUpdateMsg.class); edgeImitator.ignoreType(UserCredentialsUpdateMsg.class);
edgeImitator.expectMessageAmount(25); edgeImitator.expectMessageAmount(24);
edgeImitator.connect(); edgeImitator.connect();
assertThat(edgeImitator.waitForMessages()).as("await for messages on first connect").isTrue(); assertThat(edgeImitator.waitForMessages()).as("await for messages on first connect").isTrue();
verifyFetchersMsgs(edgeImitator); verifyFetchersMsgs(edgeImitator);
// verify queue msgs // verify queue msgs
Assert.assertTrue(popRuleChainMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, "Edge Root Rule Chain"));
Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default"));
Assert.assertTrue(popDeviceMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Test Sync Edge Device 1")); Assert.assertTrue(popDeviceMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "Test Sync Edge Device 1"));
Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "test")); Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "test"));

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.controller; package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.After; import org.junit.After;
@ -26,6 +27,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
@ -86,6 +89,11 @@ public class EdgeEventControllerTest extends AbstractControllerTest {
Edge edge = constructEdge("TestEdge", "default"); Edge edge = constructEdge("TestEdge", "default");
edge = doPost("/api/edge", edge, Edge.class); edge = doPost("/api/edge", edge, Edge.class);
// simulate edge activation
ObjectNode attributes = JacksonUtil.newObjectNode();
attributes.put("active", true);
doPost("/api/plugins/telemetry/EDGE/" + edge.getId() + "/attributes/" + DataConstants.SERVER_SCOPE, attributes);
Device device = constructDevice("TestDevice", "default"); Device device = constructDevice("TestDevice", "default");
Device savedDevice = doPost("/api/device", device, Device.class); Device savedDevice = doPost("/api/device", device, Device.class);
@ -99,14 +107,13 @@ public class EdgeEventControllerTest extends AbstractControllerTest {
EntityRelation relation = new EntityRelation(savedAsset.getId(), savedDevice.getId(), EntityRelation.CONTAINS_TYPE); EntityRelation relation = new EntityRelation(savedAsset.getId(), savedDevice.getId(), EntityRelation.CONTAINS_TYPE);
awaitForNumberOfEdgeEvents(edgeId, 3); awaitForNumberOfEdgeEvents(edgeId, 2);
doPost("/api/relation", relation); doPost("/api/relation", relation);
awaitForNumberOfEdgeEvents(edgeId, 4); awaitForNumberOfEdgeEvents(edgeId, 3);
List<EdgeEvent> edgeEvents = findEdgeEvents(edgeId); List<EdgeEvent> edgeEvents = findEdgeEvents(edgeId);
Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.RULE_CHAIN)); // root rule chain
Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.DEVICE)); // TestDevice Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.DEVICE)); // TestDevice
Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.ASSET)); // TestAsset Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.ASSET)); // TestAsset
Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.RELATION)); Assert.assertTrue(popEdgeEvent(edgeEvents, EdgeEventType.RELATION));

View File

@ -130,7 +130,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
installation(); installation();
edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret()); edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
edgeImitator.expectMessageAmount(26); edgeImitator.expectMessageAmount(21);
edgeImitator.connect(); edgeImitator.connect();
requestEdgeRuleChainMetadata(); requestEdgeRuleChainMetadata();
@ -163,9 +163,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
@After @After
public void teardownEdgeTest() { public void teardownEdgeTest() {
try { try {
edgeImitator.expectMessageAmount(2);
loginTenantAdmin(); loginTenantAdmin();
Assert.assertTrue(edgeImitator.waitForMessages());
doDelete("/api/edge/" + edge.getId().toString()) doDelete("/api/edge/" + edge.getId().toString())
.andExpect(status().isOk()); .andExpect(status().isOk());
@ -228,33 +226,33 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
// 1 message from queue fetcher // 1 message from queue fetcher
validateQueues(); validateQueues();
// 2 messages - 1 from rule chain fetcher and 1 from rule chain controller // 1 from rule chain fetcher
UUID ruleChainUUID = validateRuleChains(); UUID ruleChainUUID = validateRuleChains();
// 1 from request message // 1 from request message
validateRuleChainMetadataUpdates(ruleChainUUID); validateRuleChainMetadataUpdates(ruleChainUUID);
// 4 messages - 4 messages from fetcher - 2 from system level ('mail', 'mailTemplates') and 2 from admin level ('mail', 'mailTemplates') // 4 messages
// - 2 from fetcher - system level ('mail', 'mailTemplates')
// - 2 from fetcher - admin level ('mail', 'mailTemplates')
validateAdminSettings(); validateAdminSettings();
// 5 messages
// - 1 from default profile fetcher
// - 2 from device profile fetcher (default and thermostat)
// - 1 from device fetcher
// - 1 from device controller (thermostat)
validateDeviceProfiles();
// 4 messages // 4 messages
// - 1 from default profile fetcher // - 1 from default profile fetcher
// - 2 from device profile fetcher (default and thermostat)
// - 1 from device fetcher
validateDeviceProfiles();
// 3 messages
// - 1 from default profile fetcher
// - 1 message from asset profile fetcher // - 1 message from asset profile fetcher
// - 1 message from asset fetcher // - 1 message from asset fetcher
// - 1 message from asset controller
validateAssetProfiles(); validateAssetProfiles();
// 2 messages - 1 from device fetcher and 1 from device controller // 1 from device fetcher
validateDevices(); validateDevices();
// 2 messages - 1 from asset fetcher and 1 from asset controller // 1 from asset fetcher
validateAssets(); validateAssets();
// 1 message from public customer fetcher // 1 message from public customer fetcher
@ -308,8 +306,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
// default msg device profile from fetcher // default msg device profile from fetcher
// thermostat msg from device profile fetcher // thermostat msg from device profile fetcher
// thermostat msg from device fetcher // thermostat msg from device fetcher
// thermostat msg from creation of device Assert.assertEquals(4, deviceProfileUpdateMsgList.size());
Assert.assertEquals(5, deviceProfileUpdateMsgList.size());
Optional<DeviceProfileUpdateMsg> thermostatProfileUpdateMsgOpt = Optional<DeviceProfileUpdateMsg> thermostatProfileUpdateMsgOpt =
deviceProfileUpdateMsgList.stream().filter(dfum -> THERMOSTAT_DEVICE_PROFILE_NAME.equals(dfum.getName())).findAny(); deviceProfileUpdateMsgList.stream().filter(dfum -> THERMOSTAT_DEVICE_PROFILE_NAME.equals(dfum.getName())).findAny();
Assert.assertTrue(thermostatProfileUpdateMsgOpt.isPresent()); Assert.assertTrue(thermostatProfileUpdateMsgOpt.isPresent());
@ -326,10 +323,9 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
} }
private void validateDevices() throws Exception { private void validateDevices() throws Exception {
List<DeviceUpdateMsg> deviceUpdateMsgs = edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class); Optional<DeviceUpdateMsg> deviceUpdateMsgOpt = edgeImitator.findMessageByType(DeviceUpdateMsg.class);
Assert.assertEquals(2, deviceUpdateMsgs.size()); Assert.assertTrue(deviceUpdateMsgOpt.isPresent());
validateDevice(deviceUpdateMsgs.get(0)); validateDevice(deviceUpdateMsgOpt.get());
validateDevice(deviceUpdateMsgs.get(1));
} }
private void validateDevice(DeviceUpdateMsg deviceUpdateMsg) throws Exception { private void validateDevice(DeviceUpdateMsg deviceUpdateMsg) throws Exception {
@ -345,10 +341,9 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
} }
private void validateAssets() throws Exception { private void validateAssets() throws Exception {
List<AssetUpdateMsg> assetUpdateMsgs = edgeImitator.findAllMessagesByType(AssetUpdateMsg.class); Optional<AssetUpdateMsg> assetUpdateMsgOpt = edgeImitator.findMessageByType(AssetUpdateMsg.class);
Assert.assertEquals(2, assetUpdateMsgs.size()); Assert.assertTrue(assetUpdateMsgOpt.isPresent());
validateAsset(assetUpdateMsgs.get(0)); validateAsset(assetUpdateMsgOpt.get());
validateAsset(assetUpdateMsgs.get(1));
} }
private void validateAsset(AssetUpdateMsg assetUpdateMsg) throws Exception { private void validateAsset(AssetUpdateMsg assetUpdateMsg) throws Exception {
@ -365,12 +360,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
} }
private UUID validateRuleChains() throws Exception { private UUID validateRuleChains() throws Exception {
List<RuleChainUpdateMsg> ruleChainUpdateMsgs = edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class); Optional<RuleChainUpdateMsg> ruleChainUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainUpdateMsg.class);
Assert.assertEquals(2, ruleChainUpdateMsgs.size()); Assert.assertTrue(ruleChainUpdateMsgOpt.isPresent());
RuleChainUpdateMsg ruleChainCreateMsg = ruleChainUpdateMsgs.get(0); RuleChainUpdateMsg ruleChainUpdateMsg = ruleChainUpdateMsgOpt.get();
RuleChainUpdateMsg ruleChainUpdateMsg = ruleChainUpdateMsgs.get(1); validateRuleChain(ruleChainUpdateMsg, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
validateRuleChain(ruleChainCreateMsg, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
validateRuleChain(ruleChainUpdateMsg, UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE);
return new UUID(ruleChainUpdateMsg.getIdMSB(), ruleChainUpdateMsg.getIdLSB()); return new UUID(ruleChainUpdateMsg.getIdMSB(), ruleChainUpdateMsg.getIdLSB());
} }
@ -429,7 +422,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
private void validateAssetProfiles() throws Exception { private void validateAssetProfiles() throws Exception {
List<AssetProfileUpdateMsg> assetProfileUpdateMsgs = edgeImitator.findAllMessagesByType(AssetProfileUpdateMsg.class); List<AssetProfileUpdateMsg> assetProfileUpdateMsgs = edgeImitator.findAllMessagesByType(AssetProfileUpdateMsg.class);
Assert.assertEquals(4, assetProfileUpdateMsgs.size()); Assert.assertEquals(3, assetProfileUpdateMsgs.size());
AssetProfileUpdateMsg assetProfileUpdateMsg = assetProfileUpdateMsgs.get(0); AssetProfileUpdateMsg assetProfileUpdateMsg = assetProfileUpdateMsgs.get(0);
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, assetProfileUpdateMsg.getMsgType()); Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, assetProfileUpdateMsg.getMsgType());
UUID assetProfileUUID = new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB()); UUID assetProfileUUID = new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB());

View File

@ -19,7 +19,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.protobuf.AbstractMessage; import com.google.protobuf.AbstractMessage;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.Alarm;
@ -102,20 +101,6 @@ public class AlarmEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedAlarm.getStatus().name(), alarmUpdateMsg.getStatus()); Assert.assertEquals(savedAlarm.getStatus().name(), alarmUpdateMsg.getStatus());
Assert.assertEquals(savedAlarm.getSeverity().name(), alarmUpdateMsg.getSeverity()); Assert.assertEquals(savedAlarm.getSeverity().name(), alarmUpdateMsg.getSeverity());
// update alarm
String updatedDetails = "{\"testKey\":\"testValue\"}";
savedAlarm.setDetails(JacksonUtil.OBJECT_MAPPER.readTree(updatedDetails));
edgeImitator.expectMessageAmount(1);
savedAlarm = doPost("/api/alarm", savedAlarm, Alarm.class);
Assert.assertTrue(edgeImitator.waitForMessages());
latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg);
alarmUpdateMsg = (AlarmUpdateMsg) latestMessage;
Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, alarmUpdateMsg.getMsgType());
Assert.assertEquals(savedAlarm.getUuidId().getMostSignificantBits(), alarmUpdateMsg.getIdMSB());
Assert.assertEquals(savedAlarm.getUuidId().getLeastSignificantBits(), alarmUpdateMsg.getIdLSB());
Assert.assertEquals(updatedDetails, alarmUpdateMsg.getDetails());
// ack alarm // ack alarm
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
doPost("/api/alarm/" + savedAlarm.getUuidId() + "/ack"); doPost("/api/alarm/" + savedAlarm.getUuidId() + "/ack");

View File

@ -274,9 +274,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
tenantProfile.getProfileData().setConfiguration(profileConfiguration); tenantProfile.getProfileData().setConfiguration(profileConfiguration);
doPost("/api/tenantProfile/", tenantProfile, TenantProfile.class); doPost("/api/tenantProfile/", tenantProfile, TenantProfile.class);
edgeImitator.expectMessageAmount(2);
loginTenantAdmin(); loginTenantAdmin();
Assert.assertTrue(edgeImitator.waitForMessages());
UUID uuid = Uuids.timeBased(); UUID uuid = Uuids.timeBased();

View File

@ -79,9 +79,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedTenantAdmin.getLastName(), userUpdateMsg.getLastName()); Assert.assertEquals(savedTenantAdmin.getLastName(), userUpdateMsg.getLastName());
// update user credentials // update user credentials
edgeImitator.expectMessageAmount(2);
login(savedTenantAdmin.getEmail(), "tenant"); login(savedTenantAdmin.getEmail(), "tenant");
Assert.assertTrue(edgeImitator.waitForMessages());
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
ChangePasswordRequest changePasswordRequest = new ChangePasswordRequest(); ChangePasswordRequest changePasswordRequest = new ChangePasswordRequest();
@ -96,9 +94,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedTenantAdmin.getUuidId().getLeastSignificantBits(), userCredentialsUpdateMsg.getUserIdLSB()); Assert.assertEquals(savedTenantAdmin.getUuidId().getLeastSignificantBits(), userCredentialsUpdateMsg.getUserIdLSB());
Assert.assertTrue(passwordEncoder.matches(changePasswordRequest.getNewPassword(), userCredentialsUpdateMsg.getPassword())); Assert.assertTrue(passwordEncoder.matches(changePasswordRequest.getNewPassword(), userCredentialsUpdateMsg.getPassword()));
edgeImitator.expectMessageAmount(2);
loginTenantAdmin(); loginTenantAdmin();
Assert.assertTrue(edgeImitator.waitForMessages());
// delete user // delete user
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
@ -164,9 +160,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedCustomerUser.getLastName(), userUpdateMsg.getLastName()); Assert.assertEquals(savedCustomerUser.getLastName(), userUpdateMsg.getLastName());
// update user credentials // update user credentials
edgeImitator.expectMessageAmount(2);
login(savedCustomerUser.getEmail(), "customer"); login(savedCustomerUser.getEmail(), "customer");
Assert.assertTrue(edgeImitator.waitForMessages());
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
ChangePasswordRequest changePasswordRequest = new ChangePasswordRequest(); ChangePasswordRequest changePasswordRequest = new ChangePasswordRequest();
@ -181,9 +175,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedCustomerUser.getUuidId().getLeastSignificantBits(), userCredentialsUpdateMsg.getUserIdLSB()); Assert.assertEquals(savedCustomerUser.getUuidId().getLeastSignificantBits(), userCredentialsUpdateMsg.getUserIdLSB());
Assert.assertTrue(passwordEncoder.matches(changePasswordRequest.getNewPassword(), userCredentialsUpdateMsg.getPassword())); Assert.assertTrue(passwordEncoder.matches(changePasswordRequest.getNewPassword(), userCredentialsUpdateMsg.getPassword()));
edgeImitator.expectMessageAmount(2);
loginTenantAdmin(); loginTenantAdmin();
Assert.assertTrue(edgeImitator.waitForMessages());
// delete user // delete user
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
public class SaveEntityEvent<T> { public class SaveEntityEvent<T> {
private final TenantId tenantId; private final TenantId tenantId;
private final T entity; private final T entity;
private final T oldEntity;
private final EntityId entityId; private final EntityId entityId;
private final Boolean added; private final Boolean added;
} }

View File

@ -70,8 +70,8 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
public static final String USER_PASSWORD_HISTORY = "userPasswordHistory"; public static final String USER_PASSWORD_HISTORY = "userPasswordHistory";
private static final String LAST_LOGIN_TS = "lastLoginTs"; public static final String LAST_LOGIN_TS = "lastLoginTs";
private static final String FAILED_LOGIN_ATTEMPTS = "failedLoginAttempts"; public static final String FAILED_LOGIN_ATTEMPTS = "failedLoginAttempts";
private static final int DEFAULT_TOKEN_LENGTH = 30; private static final int DEFAULT_TOKEN_LENGTH = 30;
public static final String INCORRECT_USER_ID = "Incorrect userId "; public static final String INCORRECT_USER_ID = "Incorrect userId ";
@ -126,7 +126,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
@Override @Override
public User saveUser(TenantId tenantId, User user) { public User saveUser(TenantId tenantId, User user) {
log.trace("Executing saveUser [{}]", user); log.trace("Executing saveUser [{}]", user);
userValidator.validate(user, User::getTenantId); User oldUser = userValidator.validate(user, User::getTenantId);
if (!userLoginCaseSensitive) { if (!userLoginCaseSensitive) {
user.setEmail(user.getEmail().toLowerCase()); user.setEmail(user.getEmail().toLowerCase());
} }
@ -143,6 +143,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
eventPublisher.publishEvent(SaveEntityEvent.builder() eventPublisher.publishEvent(SaveEntityEvent.builder()
.tenantId(tenantId == null ? TenantId.SYS_TENANT_ID : tenantId) .tenantId(tenantId == null ? TenantId.SYS_TENANT_ID : tenantId)
.entity(savedUser) .entity(savedUser)
.oldEntity(oldUser)
.entityId(savedUser.getId()) .entityId(savedUser.getId())
.added(user.getId() == null).build()); .added(user.getId() == null).build());
return savedUser; return savedUser;

View File

@ -92,6 +92,7 @@ import { NotificationService } from '@core/http/notification.service';
import { TenantProfileService } from '@core/http/tenant-profile.service'; import { TenantProfileService } from '@core/http/tenant-profile.service';
import { NotificationType } from '@shared/models/notification.models'; import { NotificationType } from '@shared/models/notification.models';
import { UserId } from '@shared/models/id/user-id'; import { UserId } from '@shared/models/id/user-id';
import { AlarmService } from '@core/http/alarm.service';
@Injectable({ @Injectable({
providedIn: 'root' providedIn: 'root'
@ -119,7 +120,8 @@ export class EntityService {
private assetProfileService: AssetProfileService, private assetProfileService: AssetProfileService,
private utils: UtilsService, private utils: UtilsService,
private queueService: QueueService, private queueService: QueueService,
private notificationService: NotificationService private notificationService: NotificationService,
private alarmService: AlarmService
) { } ) { }
private getEntityObservable(entityType: EntityType, entityId: string, private getEntityObservable(entityType: EntityType, entityId: string,
@ -155,7 +157,7 @@ export class EntityService {
observable = this.ruleChainService.getRuleChain(entityId, config); observable = this.ruleChainService.getRuleChain(entityId, config);
break; break;
case EntityType.ALARM: case EntityType.ALARM:
console.error('Get Alarm Entity is not implemented!'); observable = this.alarmService.getAlarm(entityId, config);
break; break;
case EntityType.OTA_PACKAGE: case EntityType.OTA_PACKAGE:
observable = this.otaPackageService.getOtaPackageInfo(entityId, config); observable = this.otaPackageService.getOtaPackageInfo(entityId, config);