Merge branch 'develop/3.5' into improvements/notification-system

This commit is contained in:
ViacheslavKlimov 2023-04-06 18:46:06 +03:00
commit 20ec28b0c3
7 changed files with 118 additions and 53 deletions

View File

@ -423,6 +423,7 @@ public final class EdgeGrpcSession implements Closeable {
stopCurrentSendDownlinkMsgsTask(null);
}
} catch (Exception e) {
log.warn("[{}] Failed to send downlink msgs. Error msg {}", this.sessionId, e.getMessage(), e);
stopCurrentSendDownlinkMsgsTask(e);
}
};
@ -669,19 +670,18 @@ public final class EdgeGrpcSession implements Closeable {
}
private void interruptPreviousSendDownlinkMsgsTask() {
String msg = String.format("[%s] Previous send downlink future was not properly completed, stopping it now!", this.sessionId);
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
log.info("[{}]Previous send downlink future was not properly completed, stopping it now!", this.sessionId);
stopCurrentSendDownlinkMsgsTask(new RuntimeException());
}
private void interruptGeneralProcessingOnSync(TenantId tenantId, EdgeId edgeId) {
String msg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId);
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
log.info("[{}][{}][{}] Sync process started. General processing interrupted!", this.sessionId, tenantId, edgeId);
stopCurrentSendDownlinkMsgsTask(new RuntimeException());
}
public void stopCurrentSendDownlinkMsgsTask(Exception e) {
if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) {
if (e != null) {
log.warn(e.getMessage(), e);
sessionState.getSendDownlinkMsgsFuture().setException(e);
} else {
sessionState.getSendDownlinkMsgsFuture().set(null);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc.processor.telemetry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
@ -283,26 +284,31 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg,
String entityType) {
SettableFuture<Void> futureToSet = SettableFuture.create();
String scope = attributeDeleteMsg.getScope();
List<String> attributeKeys = attributeDeleteMsg.getAttributeNamesList();
attributesService.removeAll(tenantId, entityId, scope, attributeKeys);
if (EntityType.DEVICE.name().equals(entityType)) {
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
tenantId, (DeviceId) entityId, scope, attributeKeys), new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
futureToSet.set(null);
}
ListenableFuture<List<String>> removeAllFuture = attributesService.removeAll(tenantId, entityId, scope, attributeKeys);
return Futures.transformAsync(removeAllFuture, removeAttributes -> {
if (EntityType.DEVICE.name().equals(entityType)) {
SettableFuture<Void> futureToSet = SettableFuture.create();
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
tenantId, (DeviceId) entityId, scope, attributeKeys), new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, t);
futureToSet.setException(t);
}
});
}
return futureToSet;
@Override
public void onFailure(Throwable t) {
log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, t);
futureToSet.setException(t);
}
});
return futureToSet;
} else {
return Futures.immediateFuture(null);
}
}, dbCallbackExecutorService);
}
public EntityDataProto convertTelemetryEventToEntityDataProto(EntityType entityType,

View File

@ -15,11 +15,17 @@
*/
package org.thingsboard.server.edge;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.AbstractMessage;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
@ -27,9 +33,11 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.List;
import java.util.concurrent.TimeUnit;
abstract public class BaseTelemetryEdgeTest extends AbstractEdgeTest {
@ -233,4 +241,50 @@ abstract public class BaseTelemetryEdgeTest extends AbstractEdgeTest {
Assert.assertEquals("value1", keyValueProto.getStringV());
}
@Test
public void testSendAttributesDeleteRequestToCloud_nonDeviceEntity() throws Exception {
edgeImitator.expectMessageAmount(2);
Asset savedAsset = saveAsset("Delete Attribute Test");
doPost("/api/edge/" + edge.getUuidId() + "/asset/" + savedAsset.getUuidId(), Asset.class);
Assert.assertTrue(edgeImitator.waitForMessages());
final String attributeKey = "key1";
ObjectNode attributesData = JacksonUtil.OBJECT_MAPPER.createObjectNode();
attributesData.put(attributeKey, "value1");
doPost("/api/plugins/telemetry/ASSET/" + savedAsset.getId() + "/attributes/" + DataConstants.SERVER_SCOPE, attributesData);
// Wait before device attributes saved to database before deleting them
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
String urlTemplate = "/api/plugins/telemetry/ASSET/" + savedAsset.getId() + "/keys/attributes/" + DataConstants.SERVER_SCOPE;
List<String> actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {});
return actualKeys != null && !actualKeys.isEmpty() && actualKeys.contains(attributeKey);
});
EntityDataProto.Builder builder = EntityDataProto.newBuilder()
.setEntityIdMSB(savedAsset.getUuidId().getMostSignificantBits())
.setEntityIdLSB(savedAsset.getUuidId().getLeastSignificantBits())
.setEntityType(savedAsset.getId().getEntityType().name());
AttributeDeleteMsg.Builder attributeDeleteMsg = AttributeDeleteMsg.newBuilder();
attributeDeleteMsg.setScope(DataConstants.SERVER_SCOPE);
attributeDeleteMsg.addAllAttributeNames(List.of(attributeKey));
attributeDeleteMsg.build();
builder.setAttributeDeleteMsg(attributeDeleteMsg);
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
uplinkMsgBuilder.addEntityData(builder.build());
edgeImitator.expectResponsesAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
String urlTemplate = "/api/plugins/telemetry/ASSET/" + savedAsset.getId() + "/keys/attributes/" + DataConstants.SERVER_SCOPE;
List<String> actualKeys = doGetAsyncTyped(urlTemplate, new TypeReference<>() {});
return actualKeys != null && actualKeys.isEmpty();
});
}
}

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.service.Validator;
public class AttributeUtils {
public static void validate(EntityId id, String scope) {
Validator.validateId(id.getId(), "Incorrect id " + id);
Validator.validateString(scope, "Incorrect scope " + scope);

View File

@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -123,6 +124,7 @@ public class CachedAttributesService implements AttributesService {
return result;
} catch (Throwable e) {
cacheTransaction.rollback();
log.debug("Could not find attribute from cache: [{}] [{}] [{}]", entityId, scope, attributeKey, e);
throw e;
}
});
@ -132,6 +134,7 @@ public class CachedAttributesService implements AttributesService {
@Override
public ListenableFuture<List<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, Collection<String> attributeKeys) {
validate(entityId, scope);
attributeKeys = new LinkedHashSet<>(attributeKeys); // deduplicate the attributes
attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
Map<String, TbCacheValueWrapper<AttributeKvEntry>> wrappedCachedAttributes = findCachedAttributes(entityId, scope, attributeKeys);
@ -170,6 +173,7 @@ public class CachedAttributesService implements AttributesService {
return mergedAttributes;
} catch (Throwable e) {
cacheTransaction.rollback();
log.debug("Could not find attributes from cache: [{}] [{}] [{}]", entityId, scope, notFoundAttributeKeys, e);
throw e;
}
});

View File

@ -56,6 +56,7 @@ public class TbRuleChainInputNode implements TbNode {
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = TbNodeUtils.convert(configuration, TbRuleChainInputNodeConfiguration.class);
this.ruleChainId = new RuleChainId(UUID.fromString(config.getRuleChainId()));
ctx.checkTenantEntity(ruleChainId);
}
@Override

View File

@ -16,7 +16,7 @@
import { BehaviorSubject, ReplaySubject } from 'rxjs';
import { CmdUpdate, CmdUpdateMsg, CmdUpdateType, WebsocketCmd } from '@shared/models/telemetry/telemetry.models';
import { first, map } from 'rxjs/operators';
import { map } from 'rxjs/operators';
import { NgZone } from '@angular/core';
import { isDefinedAndNotNull } from '@core/utils';
import { Notification } from '@shared/models/notification.models';
@ -47,12 +47,19 @@ export class NotificationsUpdate extends CmdUpdate {
export class NotificationSubscriber extends WsSubscriber {
private notificationCountSubject = new ReplaySubject<NotificationCountUpdate>(1);
private notificationsSubject = new BehaviorSubject<NotificationsUpdate>(null);
private notificationsSubject = new BehaviorSubject<NotificationsUpdate>({
cmdId: 0,
cmdUpdateType: undefined,
errorCode: 0,
errorMsg: '',
notifications: [],
totalUnreadCount: 0
});
public messageLimit = 10;
public notificationCount$ = this.notificationCountSubject.asObservable().pipe(map(msg => msg.totalUnreadCount));
public notifications$ = this.notificationsSubject.asObservable().pipe(map(msg => msg?.notifications || []));
public notifications$ = this.notificationsSubject.asObservable().pipe(map(msg => msg.notifications ));
public static createNotificationCountSubscription(notificationWsService: NotificationWebsocketService,
zone: NgZone): NotificationSubscriber {
@ -109,35 +116,27 @@ export class NotificationSubscriber extends WsSubscriber {
}
onNotificationsUpdate(message: NotificationsUpdate) {
this.notificationsSubject.asObservable().pipe(
first()
).subscribe((value) => {
let saveMessage;
if (isDefinedAndNotNull(value) && message.update) {
const findIndex = value.notifications.findIndex(item => item.id.id === message.update.id.id);
if (findIndex !== -1) {
value.notifications.push(message.update);
value.notifications.sort((a, b) => b.createdTime - a.createdTime);
if (value.notifications.length > this.messageLimit) {
value.notifications.pop();
}
const currentNotifications = this.notificationsSubject.value;
let processMessage = message;
if (isDefinedAndNotNull(currentNotifications) && message.update) {
currentNotifications.notifications.unshift(message.update);
if (currentNotifications.notifications.length > this.messageLimit) {
currentNotifications.notifications.pop();
}
processMessage = currentNotifications;
processMessage.totalUnreadCount = message.totalUnreadCount;
}
if (this.zone) {
this.zone.run(
() => {
this.notificationsSubject.next(processMessage);
this.notificationCountSubject.next(processMessage);
}
saveMessage = value;
} else {
saveMessage = message;
}
if (this.zone) {
this.zone.run(
() => {
this.notificationsSubject.next(saveMessage);
this.notificationCountSubject.next(saveMessage);
}
);
} else {
this.notificationsSubject.next(saveMessage);
this.notificationCountSubject.next(saveMessage);
}
});
);
} else {
this.notificationsSubject.next(processMessage);
this.notificationCountSubject.next(processMessage);
}
}
}