Edge: do not sync User login actions. Edge: do not save particular edge events in case edge is not activated or disconnected

This commit is contained in:
Volodymyr Babak 2023-10-16 18:05:12 +03:00
parent 94e935636e
commit 8c94ec5c52
7 changed files with 123 additions and 52 deletions

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.service.edge;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -36,6 +38,7 @@ import org.thingsboard.server.dao.eventsourcing.ActionEntityEvent;
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.eventsourcing.RelationActionEvent;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.user.UserServiceImpl;
import javax.annotation.PostConstruct;
@ -75,7 +78,7 @@ public class EdgeEventSourcingListener {
return;
}
try {
if (!isValidEdgeEventEntity(event.getEntity())) {
if (!isValidEdgeEventEntity(event.getEntity(), event.getOldEntity())) {
return;
}
log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
@ -83,7 +86,7 @@ public class EdgeEventSourcingListener {
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(),
null, null, action);
} catch (Exception e) {
log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event);
log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e);
}
}
@ -97,7 +100,7 @@ public class EdgeEventSourcingListener {
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED);
} catch (Exception e) {
log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event);
log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e);
}
}
@ -111,7 +114,7 @@ public class EdgeEventSourcingListener {
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(),
event.getBody(), null, edgeTypeByActionType(event.getActionType()));
} catch (Exception e) {
log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event);
log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event, e);
}
}
@ -134,11 +137,11 @@ public class EdgeEventSourcingListener {
tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null,
JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()));
} catch (Exception e) {
log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event);
log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event, e);
}
}
private boolean isValidEdgeEventEntity(Object entity) {
private boolean isValidEdgeEventEntity(Object entity, Object oldEntity) {
if (entity instanceof OtaPackageInfo) {
OtaPackageInfo otaPackageInfo = (OtaPackageInfo) entity;
return otaPackageInfo.hasUrl() || otaPackageInfo.isHasData();
@ -147,7 +150,15 @@ public class EdgeEventSourcingListener {
return RuleChainType.EDGE.equals(ruleChain.getType());
} else if (entity instanceof User) {
User user = (User) entity;
return !Authority.SYS_ADMIN.equals(user.getAuthority());
if (Authority.SYS_ADMIN.equals(user.getAuthority())) {
return false;
}
if (oldEntity != null) {
User oldUser = (User) oldEntity;
cleanUpUserAdditionalInfo(oldUser);
cleanUpUserAdditionalInfo(user);
return !user.equals(oldUser);
}
} else if (entity instanceof AlarmApiCallResult) {
AlarmApiCallResult alarmApiCallResult = (AlarmApiCallResult) entity;
return alarmApiCallResult.isModified();
@ -155,4 +166,21 @@ public class EdgeEventSourcingListener {
// Default: If the entity doesn't match any of the conditions, consider it as valid.
return true;
}
private void cleanUpUserAdditionalInfo(User user) {
// reset FAILED_LOGIN_ATTEMPTS and LAST_LOGIN_TS - edge is not interested in this information
if (user.getAdditionalInfo() instanceof NullNode) {
user.setAdditionalInfo(null);
}
if (user.getAdditionalInfo() instanceof ObjectNode) {
ObjectNode additionalInfo = ((ObjectNode) user.getAdditionalInfo());
additionalInfo.remove(UserServiceImpl.FAILED_LOGIN_ATTEMPTS);
additionalInfo.remove(UserServiceImpl.LAST_LOGIN_TS);
if (additionalInfo.isEmpty()) {
user.setAdditionalInfo(null);
} else {
user.setAdditionalInfo(additionalInfo);
}
}
}
}

View File

@ -24,6 +24,7 @@ import org.springframework.context.annotation.Lazy;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils;
@ -46,6 +47,7 @@ import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
@ -110,11 +112,13 @@ import org.thingsboard.server.service.entitiy.TbNotificationEntityService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -305,6 +309,60 @@ public abstract class BaseEdgeProcessor {
EdgeEventActionType action,
EntityId entityId,
JsonNode body) {
ListenableFuture<Optional<AttributeKvEntry>> future = attributesService.find(tenantId, edgeId, DataConstants.SERVER_SCOPE, DefaultDeviceStateService.ACTIVITY_STATE);
return Futures.transformAsync(future, activeOpt -> {
if (activeOpt.isEmpty()) {
log.trace("Edge is not activated. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);
return Futures.immediateFuture(null);
}
if (activeOpt.get().getBooleanValue().isPresent() && activeOpt.get().getBooleanValue().get()) {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
} else {
if (doSaveIfEdgeIsOffline(type, action)) {
return doSaveEdgeEvent(tenantId, edgeId, type, action, entityId, body);
} else {
log.trace("Edge is not active at the moment. Skipping event. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);
return Futures.immediateFuture(null);
}
}
}, dbCallbackExecutorService);
}
private boolean doSaveIfEdgeIsOffline(EdgeEventType type,
EdgeEventActionType action) {
switch (action) {
case TIMESERIES_UPDATED:
case ALARM_ACK:
case ALARM_CLEAR:
case ALARM_ASSIGNED:
case ALARM_UNASSIGNED:
case CREDENTIALS_REQUEST:
return true;
}
switch (type) {
case ALARM:
case RULE_CHAIN:
case RULE_CHAIN_METADATA:
case USER:
case CUSTOMER:
case TENANT:
case TENANT_PROFILE:
case WIDGETS_BUNDLE:
case WIDGET_TYPE:
case ADMIN_SETTINGS:
case OTA_PACKAGE:
case QUEUE:
case RELATION:
return true;
}
return false;
}
private ListenableFuture<Void> doSaveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, EntityId entityId, JsonNode body) {
log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type[{}], " +
"action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);

View File

@ -130,7 +130,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
installation();
edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
edgeImitator.expectMessageAmount(26);
edgeImitator.expectMessageAmount(21);
edgeImitator.connect();
requestEdgeRuleChainMetadata();
@ -163,9 +163,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
@After
public void teardownEdgeTest() {
try {
edgeImitator.expectMessageAmount(2);
loginTenantAdmin();
Assert.assertTrue(edgeImitator.waitForMessages());
doDelete("/api/edge/" + edge.getId().toString())
.andExpect(status().isOk());
@ -228,33 +226,33 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
// 1 message from queue fetcher
validateQueues();
// 2 messages - 1 from rule chain fetcher and 1 from rule chain controller
// 1 from rule chain fetcher
UUID ruleChainUUID = validateRuleChains();
// 1 from request message
validateRuleChainMetadataUpdates(ruleChainUUID);
// 4 messages - 4 messages from fetcher - 2 from system level ('mail', 'mailTemplates') and 2 from admin level ('mail', 'mailTemplates')
// 4 messages
// - 2 from fetcher - system level ('mail', 'mailTemplates')
// - 2 from fetcher - admin level ('mail', 'mailTemplates')
validateAdminSettings();
// 5 messages
// - 1 from default profile fetcher
// - 2 from device profile fetcher (default and thermostat)
// - 1 from device fetcher
// - 1 from device controller (thermostat)
validateDeviceProfiles();
// 4 messages
// - 1 from default profile fetcher
// - 2 from device profile fetcher (default and thermostat)
// - 1 from device fetcher
validateDeviceProfiles();
// 3 messages
// - 1 from default profile fetcher
// - 1 message from asset profile fetcher
// - 1 message from asset fetcher
// - 1 message from asset controller
validateAssetProfiles();
// 2 messages - 1 from device fetcher and 1 from device controller
// 1 from device fetcher
validateDevices();
// 2 messages - 1 from asset fetcher and 1 from asset controller
// 1 from asset fetcher
validateAssets();
// 1 message from public customer fetcher
@ -308,8 +306,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
// default msg device profile from fetcher
// thermostat msg from device profile fetcher
// thermostat msg from device fetcher
// thermostat msg from creation of device
Assert.assertEquals(5, deviceProfileUpdateMsgList.size());
Assert.assertEquals(4, deviceProfileUpdateMsgList.size());
Optional<DeviceProfileUpdateMsg> thermostatProfileUpdateMsgOpt =
deviceProfileUpdateMsgList.stream().filter(dfum -> THERMOSTAT_DEVICE_PROFILE_NAME.equals(dfum.getName())).findAny();
Assert.assertTrue(thermostatProfileUpdateMsgOpt.isPresent());
@ -326,10 +323,9 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
}
private void validateDevices() throws Exception {
List<DeviceUpdateMsg> deviceUpdateMsgs = edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class);
Assert.assertEquals(2, deviceUpdateMsgs.size());
validateDevice(deviceUpdateMsgs.get(0));
validateDevice(deviceUpdateMsgs.get(1));
Optional<DeviceUpdateMsg> deviceUpdateMsgOpt = edgeImitator.findMessageByType(DeviceUpdateMsg.class);
Assert.assertTrue(deviceUpdateMsgOpt.isPresent());
validateDevice(deviceUpdateMsgOpt.get());
}
private void validateDevice(DeviceUpdateMsg deviceUpdateMsg) throws Exception {
@ -345,10 +341,9 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
}
private void validateAssets() throws Exception {
List<AssetUpdateMsg> assetUpdateMsgs = edgeImitator.findAllMessagesByType(AssetUpdateMsg.class);
Assert.assertEquals(2, assetUpdateMsgs.size());
validateAsset(assetUpdateMsgs.get(0));
validateAsset(assetUpdateMsgs.get(1));
Optional<AssetUpdateMsg> assetUpdateMsgOpt = edgeImitator.findMessageByType(AssetUpdateMsg.class);
Assert.assertTrue(assetUpdateMsgOpt.isPresent());
validateAsset(assetUpdateMsgOpt.get());
}
private void validateAsset(AssetUpdateMsg assetUpdateMsg) throws Exception {
@ -365,12 +360,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
}
private UUID validateRuleChains() throws Exception {
List<RuleChainUpdateMsg> ruleChainUpdateMsgs = edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class);
Assert.assertEquals(2, ruleChainUpdateMsgs.size());
RuleChainUpdateMsg ruleChainCreateMsg = ruleChainUpdateMsgs.get(0);
RuleChainUpdateMsg ruleChainUpdateMsg = ruleChainUpdateMsgs.get(1);
validateRuleChain(ruleChainCreateMsg, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
validateRuleChain(ruleChainUpdateMsg, UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE);
Optional<RuleChainUpdateMsg> ruleChainUpdateMsgOpt = edgeImitator.findMessageByType(RuleChainUpdateMsg.class);
Assert.assertTrue(ruleChainUpdateMsgOpt.isPresent());
RuleChainUpdateMsg ruleChainUpdateMsg = ruleChainUpdateMsgOpt.get();
validateRuleChain(ruleChainUpdateMsg, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
return new UUID(ruleChainUpdateMsg.getIdMSB(), ruleChainUpdateMsg.getIdLSB());
}
@ -429,7 +422,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
private void validateAssetProfiles() throws Exception {
List<AssetProfileUpdateMsg> assetProfileUpdateMsgs = edgeImitator.findAllMessagesByType(AssetProfileUpdateMsg.class);
Assert.assertEquals(4, assetProfileUpdateMsgs.size());
Assert.assertEquals(3, assetProfileUpdateMsgs.size());
AssetProfileUpdateMsg assetProfileUpdateMsg = assetProfileUpdateMsgs.get(0);
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, assetProfileUpdateMsg.getMsgType());
UUID assetProfileUUID = new UUID(assetProfileUpdateMsg.getIdMSB(), assetProfileUpdateMsg.getIdLSB());

View File

@ -274,9 +274,7 @@ public class DeviceEdgeTest extends AbstractEdgeTest {
tenantProfile.getProfileData().setConfiguration(profileConfiguration);
doPost("/api/tenantProfile/", tenantProfile, TenantProfile.class);
edgeImitator.expectMessageAmount(2);
loginTenantAdmin();
Assert.assertTrue(edgeImitator.waitForMessages());
UUID uuid = Uuids.timeBased();

View File

@ -79,9 +79,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedTenantAdmin.getLastName(), userUpdateMsg.getLastName());
// update user credentials
edgeImitator.expectMessageAmount(2);
login(savedTenantAdmin.getEmail(), "tenant");
Assert.assertTrue(edgeImitator.waitForMessages());
edgeImitator.expectMessageAmount(1);
ChangePasswordRequest changePasswordRequest = new ChangePasswordRequest();
@ -96,9 +94,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedTenantAdmin.getUuidId().getLeastSignificantBits(), userCredentialsUpdateMsg.getUserIdLSB());
Assert.assertTrue(passwordEncoder.matches(changePasswordRequest.getNewPassword(), userCredentialsUpdateMsg.getPassword()));
edgeImitator.expectMessageAmount(2);
loginTenantAdmin();
Assert.assertTrue(edgeImitator.waitForMessages());
// delete user
edgeImitator.expectMessageAmount(1);
@ -164,9 +160,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedCustomerUser.getLastName(), userUpdateMsg.getLastName());
// update user credentials
edgeImitator.expectMessageAmount(2);
login(savedCustomerUser.getEmail(), "customer");
Assert.assertTrue(edgeImitator.waitForMessages());
edgeImitator.expectMessageAmount(1);
ChangePasswordRequest changePasswordRequest = new ChangePasswordRequest();
@ -181,9 +175,7 @@ public class UserEdgeTest extends AbstractEdgeTest {
Assert.assertEquals(savedCustomerUser.getUuidId().getLeastSignificantBits(), userCredentialsUpdateMsg.getUserIdLSB());
Assert.assertTrue(passwordEncoder.matches(changePasswordRequest.getNewPassword(), userCredentialsUpdateMsg.getPassword()));
edgeImitator.expectMessageAmount(2);
loginTenantAdmin();
Assert.assertTrue(edgeImitator.waitForMessages());
// delete user
edgeImitator.expectMessageAmount(1);

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId;
public class SaveEntityEvent<T> {
private final TenantId tenantId;
private final T entity;
private final T oldEntity;
private final EntityId entityId;
private final Boolean added;
}

View File

@ -70,8 +70,8 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
public static final String USER_PASSWORD_HISTORY = "userPasswordHistory";
private static final String LAST_LOGIN_TS = "lastLoginTs";
private static final String FAILED_LOGIN_ATTEMPTS = "failedLoginAttempts";
public static final String LAST_LOGIN_TS = "lastLoginTs";
public static final String FAILED_LOGIN_ATTEMPTS = "failedLoginAttempts";
private static final int DEFAULT_TOKEN_LENGTH = 30;
public static final String INCORRECT_USER_ID = "Incorrect userId ";
@ -126,7 +126,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
@Override
public User saveUser(TenantId tenantId, User user) {
log.trace("Executing saveUser [{}]", user);
userValidator.validate(user, User::getTenantId);
User oldUser = userValidator.validate(user, User::getTenantId);
if (!userLoginCaseSensitive) {
user.setEmail(user.getEmail().toLowerCase());
}
@ -143,6 +143,7 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
eventPublisher.publishEvent(SaveEntityEvent.builder()
.tenantId(tenantId == null ? TenantId.SYS_TENANT_ID : tenantId)
.entity(savedUser)
.oldEntity(oldUser)
.entityId(savedUser.getId())
.added(user.getId() == null).build());
return savedUser;