Merge pull request #7684 from ShvaykaD/bugfix/7683
[3.4.2] Missed logic for sending attributes deleted notification to device in the Delete Attributes node
This commit is contained in:
commit
90238ffaa8
@ -661,10 +661,8 @@ public class TelemetryController extends BaseController {
|
|||||||
logAttributesDeleted(user, entityId, scope, keys, null);
|
logAttributesDeleted(user, entityId, scope, keys, null);
|
||||||
if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) {
|
if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) {
|
||||||
DeviceId deviceId = new DeviceId(entityId.getId());
|
DeviceId deviceId = new DeviceId(entityId.getId());
|
||||||
Set<AttributeKey> keysToNotify = new HashSet<>();
|
|
||||||
keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key)));
|
|
||||||
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
|
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
|
||||||
user.getTenantId(), deviceId, keysToNotify), null);
|
user.getTenantId(), deviceId, scope, keys), null);
|
||||||
}
|
}
|
||||||
result.setResult(new ResponseEntity<>(HttpStatus.OK));
|
result.setResult(new ResponseEntity<>(HttpStatus.OK));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -287,12 +287,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();
|
List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();
|
||||||
attributesService.removeAll(tenantId, entityId, scope, attributeNames);
|
attributesService.removeAll(tenantId, entityId, scope, attributeNames);
|
||||||
if (EntityType.DEVICE.name().equals(entityType)) {
|
if (EntityType.DEVICE.name().equals(entityType)) {
|
||||||
Set<AttributeKey> attributeKeys = new HashSet<>();
|
|
||||||
for (String attributeName : attributeNames) {
|
|
||||||
attributeKeys.add(new AttributeKey(scope, attributeName));
|
|
||||||
}
|
|
||||||
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
|
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
|
||||||
tenantId, (DeviceId) entityId, attributeKeys), new TbQueueCallback() {
|
tenantId, (DeviceId) entityId, scope, attributeNames), new TbQueueCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||||
futureToSet.set(null);
|
futureToSet.set(null);
|
||||||
|
|||||||
@ -360,9 +360,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
|
|||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable Void tmp) {
|
public void onSuccess(@Nullable Void tmp) {
|
||||||
log.trace("[{}] Success remove target {} attributes!", device.getId(), otaPackageType);
|
log.trace("[{}] Success remove target {} attributes!", device.getId(), otaPackageType);
|
||||||
Set<AttributeKey> keysToNotify = new HashSet<>();
|
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), device.getId(), DataConstants.SHARED_SCOPE, attributesKeys), null);
|
||||||
attributesKeys.forEach(key -> keysToNotify.add(new AttributeKey(DataConstants.SHARED_SCOPE, key)));
|
|
||||||
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), device.getId(), keysToNotify), null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -490,7 +490,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
|||||||
subscriptionManagerService.onAttributesDelete(
|
subscriptionManagerService.onAttributesDelete(
|
||||||
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
||||||
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
|
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
|
||||||
proto.getScope(), proto.getKeysList(), callback);
|
proto.getScope(), proto.getKeysList(), proto.getNotifyDevice(), callback);
|
||||||
} else if (msg.hasTsDelete()) {
|
} else if (msg.hasTsDelete()) {
|
||||||
TbTimeSeriesDeleteProto proto = msg.getTsDelete();
|
TbTimeSeriesDeleteProto proto = msg.getTsDelete();
|
||||||
subscriptionManagerService.onTimeSeriesDelete(
|
subscriptionManagerService.onTimeSeriesDelete(
|
||||||
|
|||||||
@ -326,7 +326,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback callback) {
|
public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, TbCallback callback) {
|
||||||
onLocalTelemetrySubUpdate(entityId,
|
onLocalTelemetrySubUpdate(entityId,
|
||||||
s -> {
|
s -> {
|
||||||
if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
|
if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
|
||||||
@ -349,7 +349,13 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
return subscriptionUpdate;
|
return subscriptionUpdate;
|
||||||
}, false);
|
}, false);
|
||||||
if (entityId.getEntityType() == EntityType.DEVICE) {
|
if (entityId.getEntityType() == EntityType.DEVICE) {
|
||||||
|
if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)
|
||||||
|
|| TbAttributeSubscriptionScope.ANY_SCOPE.name().equalsIgnoreCase(scope)) {
|
||||||
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
|
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
|
||||||
|
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
|
||||||
|
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(tenantId,
|
||||||
|
new DeviceId(entityId.getId()), scope, keys), null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -38,7 +38,7 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
|
|||||||
|
|
||||||
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, TbCallback callback);
|
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, TbCallback callback);
|
||||||
|
|
||||||
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
|
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, TbCallback empty);
|
||||||
|
|
||||||
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
|
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
|
||||||
|
|
||||||
|
|||||||
@ -236,7 +236,7 @@ public class TbSubscriptionUtils {
|
|||||||
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
|
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ToCoreMsg toAttributesDeleteProto(TenantId tenantId, EntityId entityId, String scope, List<String> keys) {
|
public static ToCoreMsg toAttributesDeleteProto(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice) {
|
||||||
TbAttributeDeleteProto.Builder builder = TbAttributeDeleteProto.newBuilder();
|
TbAttributeDeleteProto.Builder builder = TbAttributeDeleteProto.newBuilder();
|
||||||
builder.setEntityType(entityId.getEntityType().name());
|
builder.setEntityType(entityId.getEntityType().name());
|
||||||
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
|
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
|
||||||
@ -245,6 +245,7 @@ public class TbSubscriptionUtils {
|
|||||||
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
||||||
builder.setScope(scope);
|
builder.setScope(scope);
|
||||||
builder.addAllKeys(keys);
|
builder.addAllKeys(keys);
|
||||||
|
builder.setNotifyDevice(notifyDevice);
|
||||||
|
|
||||||
SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
|
SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
|
||||||
msgBuilder.setAttrDelete(builder);
|
msgBuilder.setAttrDelete(builder);
|
||||||
|
|||||||
@ -271,14 +271,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
@Override
|
@Override
|
||||||
public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
|
public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
|
||||||
checkInternalEntity(entityId);
|
checkInternalEntity(entityId);
|
||||||
deleteAndNotifyInternal(tenantId, entityId, scope, keys, callback);
|
deleteAndNotifyInternal(tenantId, entityId, scope, keys, false, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
|
public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback) {
|
||||||
|
checkInternalEntity(entityId);
|
||||||
|
deleteAndNotifyInternal(tenantId, entityId, scope, keys, notifyDevice, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback) {
|
||||||
ListenableFuture<List<String>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
|
ListenableFuture<List<String>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
|
||||||
addVoidCallback(deleteFuture, callback);
|
addVoidCallback(deleteFuture, callback);
|
||||||
addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys));
|
addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys, notifyDevice));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -382,16 +388,16 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys) {
|
private void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
|
||||||
if (currentPartitions.contains(tpi)) {
|
if (currentPartitions.contains(tpi)) {
|
||||||
if (subscriptionManagerService.isPresent()) {
|
if (subscriptionManagerService.isPresent()) {
|
||||||
subscriptionManagerService.get().onAttributesDelete(tenantId, entityId, scope, keys, TbCallback.EMPTY);
|
subscriptionManagerService.get().onAttributesDelete(tenantId, entityId, scope, keys, notifyDevice, TbCallback.EMPTY);
|
||||||
} else {
|
} else {
|
||||||
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
|
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesDeleteProto(tenantId, entityId, scope, keys);
|
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesDeleteProto(tenantId, entityId, scope, keys, notifyDevice);
|
||||||
clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
|
clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -37,7 +37,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService {
|
|||||||
|
|
||||||
void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
|
void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
|
||||||
|
|
||||||
void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback);
|
void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback);
|
||||||
|
|
||||||
void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
|
void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
|
||||||
|
|
||||||
|
|||||||
@ -600,6 +600,7 @@ message TbAttributeDeleteProto {
|
|||||||
int64 tenantIdLSB = 5;
|
int64 tenantIdLSB = 5;
|
||||||
string scope = 6;
|
string scope = 6;
|
||||||
repeated string keys = 7;
|
repeated string keys = 7;
|
||||||
|
bool notifyDevice = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
message TbTimeSeriesDeleteProto {
|
message TbTimeSeriesDeleteProto {
|
||||||
|
|||||||
@ -28,6 +28,7 @@ public class DataConstants {
|
|||||||
public static final String CLIENT_SCOPE = "CLIENT_SCOPE";
|
public static final String CLIENT_SCOPE = "CLIENT_SCOPE";
|
||||||
public static final String SERVER_SCOPE = "SERVER_SCOPE";
|
public static final String SERVER_SCOPE = "SERVER_SCOPE";
|
||||||
public static final String SHARED_SCOPE = "SHARED_SCOPE";
|
public static final String SHARED_SCOPE = "SHARED_SCOPE";
|
||||||
|
public static final String NOTIFY_DEVICE_METADATA_KEY = "notifyDevice";
|
||||||
public static final String LATEST_TS = "LATEST_TS";
|
public static final String LATEST_TS = "LATEST_TS";
|
||||||
public static final String IS_NEW_ALARM = "isNewAlarm";
|
public static final String IS_NEW_ALARM = "isNewAlarm";
|
||||||
public static final String IS_EXISTING_ALARM = "isExistingAlarm";
|
public static final String IS_EXISTING_ALARM = "isExistingAlarm";
|
||||||
|
|||||||
@ -32,6 +32,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
|
|||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||||
import org.thingsboard.server.common.data.ApiUsageState;
|
import org.thingsboard.server.common.data.ApiUsageState;
|
||||||
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.Device;
|
import org.thingsboard.server.common.data.Device;
|
||||||
import org.thingsboard.server.common.data.DeviceProfile;
|
import org.thingsboard.server.common.data.DeviceProfile;
|
||||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||||
@ -585,7 +586,7 @@ public class DefaultTransportService implements TransportService {
|
|||||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||||
metaData.putValue("deviceName", sessionInfo.getDeviceName());
|
metaData.putValue("deviceName", sessionInfo.getDeviceName());
|
||||||
metaData.putValue("deviceType", sessionInfo.getDeviceType());
|
metaData.putValue("deviceType", sessionInfo.getDeviceType());
|
||||||
metaData.putValue("notifyDevice", "false");
|
metaData.putValue(DataConstants.NOTIFY_DEVICE_METADATA_KEY, "false");
|
||||||
CustomerId customerId = getCustomerId(sessionInfo);
|
CustomerId customerId = getCustomerId(sessionInfo);
|
||||||
sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST,
|
sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST,
|
||||||
new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
|
new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
|
||||||
|
|||||||
@ -64,6 +64,8 @@ public interface RuleEngineTelemetryService {
|
|||||||
|
|
||||||
void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback);
|
void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback);
|
||||||
|
|
||||||
|
void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback);
|
||||||
|
|
||||||
void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
|
void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
|
||||||
|
|
||||||
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
|
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
|
||||||
|
|||||||
@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
|||||||
import org.thingsboard.server.common.msg.MsgType;
|
import org.thingsboard.server.common.msg.MsgType;
|
||||||
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
|
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
@ -52,8 +53,10 @@ public class DeviceAttributesEventNotificationMsg implements ToDeviceActorNotifi
|
|||||||
return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, scope, values, false);
|
return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, scope, values, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static DeviceAttributesEventNotificationMsg onDelete(TenantId tenantId, DeviceId deviceId, Set<AttributeKey> keys) {
|
public static DeviceAttributesEventNotificationMsg onDelete(TenantId tenantId, DeviceId deviceId, String scope, List<String> keys) {
|
||||||
return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keys, null, null, true);
|
Set<AttributeKey> keysToNotify = new HashSet<>();
|
||||||
|
keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key)));
|
||||||
|
return new DeviceAttributesEventNotificationMsg(tenantId, deviceId, keysToNotify, null, null, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -17,7 +17,6 @@ package org.thingsboard.rule.engine.telemetry;
|
|||||||
|
|
||||||
import com.google.gson.JsonParser;
|
import com.google.gson.JsonParser;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
|
||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.rule.engine.api.RuleNode;
|
import org.thingsboard.rule.engine.api.RuleNode;
|
||||||
import org.thingsboard.rule.engine.api.TbContext;
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
@ -34,6 +33,10 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.CLIENT_SCOPE;
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY;
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RuleNode(
|
@RuleNode(
|
||||||
type = ComponentType.ACTION,
|
type = ComponentType.ACTION,
|
||||||
@ -52,8 +55,6 @@ public class TbMsgAttributesNode implements TbNode {
|
|||||||
|
|
||||||
private TbMsgAttributesNodeConfiguration config;
|
private TbMsgAttributesNodeConfiguration config;
|
||||||
|
|
||||||
private static final String SCOPE = "scope";
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
this.config = TbNodeUtils.convert(configuration, TbMsgAttributesNodeConfiguration.class);
|
this.config = TbNodeUtils.convert(configuration, TbMsgAttributesNodeConfiguration.class);
|
||||||
@ -74,26 +75,33 @@ public class TbMsgAttributesNode implements TbNode {
|
|||||||
ctx.tellSuccess(msg);
|
ctx.tellSuccess(msg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String scope = msg.getMetaData().getValue(SCOPE);
|
String scope = getScope(msg.getMetaData().getValue(SCOPE));
|
||||||
if (StringUtils.isEmpty(scope)) {
|
|
||||||
scope = config.getScope();
|
|
||||||
}
|
|
||||||
String notifyDeviceStr = msg.getMetaData().getValue("notifyDevice");
|
|
||||||
boolean sendAttributesUpdateNotification = checkSendNotification(scope);
|
boolean sendAttributesUpdateNotification = checkSendNotification(scope);
|
||||||
ctx.getTelemetryService().saveAndNotify(
|
ctx.getTelemetryService().saveAndNotify(
|
||||||
ctx.getTenantId(),
|
ctx.getTenantId(),
|
||||||
msg.getOriginator(),
|
msg.getOriginator(),
|
||||||
scope,
|
scope,
|
||||||
attributes,
|
attributes,
|
||||||
config.getNotifyDevice() || StringUtils.isEmpty(notifyDeviceStr) || Boolean.parseBoolean(notifyDeviceStr),
|
checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY)),
|
||||||
sendAttributesUpdateNotification ?
|
sendAttributesUpdateNotification ?
|
||||||
new AttributesUpdateNodeCallback(ctx, msg, config.getScope(), attributes) :
|
new AttributesUpdateNodeCallback(ctx, msg, scope, attributes) :
|
||||||
new TelemetryNodeCallback(ctx, msg)
|
new TelemetryNodeCallback(ctx, msg)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean checkSendNotification(String scope){
|
private boolean checkSendNotification(String scope) {
|
||||||
return config.isSendAttributesUpdatedNotification() && !DataConstants.CLIENT_SCOPE.equals(scope);
|
return config.isSendAttributesUpdatedNotification() && !CLIENT_SCOPE.equals(scope);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkNotifyDevice(String notifyDeviceMdValue) {
|
||||||
|
return config.getNotifyDevice() || StringUtils.isEmpty(notifyDeviceMdValue) || Boolean.parseBoolean(notifyDeviceMdValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getScope(String mdScopeValue) {
|
||||||
|
if (StringUtils.isNotEmpty(mdScopeValue)) {
|
||||||
|
return mdScopeValue;
|
||||||
|
}
|
||||||
|
return config.getScope();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -30,11 +30,15 @@ import java.util.List;
|
|||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY;
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RuleNode(
|
@RuleNode(
|
||||||
type = ComponentType.ACTION,
|
type = ComponentType.ACTION,
|
||||||
name = "delete attributes",
|
name = "delete attributes",
|
||||||
configClazz = TbMsgDeleteAttributesConfiguration.class,
|
configClazz = TbMsgDeleteAttributesNodeConfiguration.class,
|
||||||
nodeDescription = "Delete attributes for Message Originator.",
|
nodeDescription = "Delete attributes for Message Originator.",
|
||||||
nodeDetails = "Attempt to remove attributes by selected keys. If msg originator doesn't have an attribute with " +
|
nodeDetails = "Attempt to remove attributes by selected keys. If msg originator doesn't have an attribute with " +
|
||||||
" a key selected in the configuration, it will be ignored. If delete operation is completed successfully, " +
|
" a key selected in the configuration, it will be ignored. If delete operation is completed successfully, " +
|
||||||
@ -44,16 +48,14 @@ import java.util.stream.Collectors;
|
|||||||
configDirective = "tbActionNodeDeleteAttributesConfig",
|
configDirective = "tbActionNodeDeleteAttributesConfig",
|
||||||
icon = "remove_circle"
|
icon = "remove_circle"
|
||||||
)
|
)
|
||||||
public class TbMsgDeleteAttributes implements TbNode {
|
public class TbMsgDeleteAttributesNode implements TbNode {
|
||||||
|
|
||||||
private TbMsgDeleteAttributesConfiguration config;
|
private TbMsgDeleteAttributesNodeConfiguration config;
|
||||||
private String scope;
|
|
||||||
private List<String> keys;
|
private List<String> keys;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
this.config = TbNodeUtils.convert(configuration, TbMsgDeleteAttributesConfiguration.class);
|
this.config = TbNodeUtils.convert(configuration, TbMsgDeleteAttributesNodeConfiguration.class);
|
||||||
this.scope = config.getScope();
|
|
||||||
this.keys = config.getKeys();
|
this.keys = config.getKeys();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,15 +69,29 @@ public class TbMsgDeleteAttributes implements TbNode {
|
|||||||
if (keysToDelete.isEmpty()) {
|
if (keysToDelete.isEmpty()) {
|
||||||
ctx.tellSuccess(msg);
|
ctx.tellSuccess(msg);
|
||||||
} else {
|
} else {
|
||||||
|
String scope = getScope(msg.getMetaData().getValue(SCOPE));
|
||||||
ctx.getTelemetryService().deleteAndNotify(
|
ctx.getTelemetryService().deleteAndNotify(
|
||||||
ctx.getTenantId(),
|
ctx.getTenantId(),
|
||||||
msg.getOriginator(),
|
msg.getOriginator(),
|
||||||
scope,
|
scope,
|
||||||
keysToDelete,
|
keysToDelete,
|
||||||
|
checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope),
|
||||||
config.isSendAttributesDeletedNotification() ?
|
config.isSendAttributesDeletedNotification() ?
|
||||||
new AttributesDeleteNodeCallback(ctx, msg, scope, keysToDelete) :
|
new AttributesDeleteNodeCallback(ctx, msg, scope, keysToDelete) :
|
||||||
new TelemetryNodeCallback(ctx, msg)
|
new TelemetryNodeCallback(ctx, msg)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getScope(String mdScopeValue) {
|
||||||
|
if (StringUtils.isNotEmpty(mdScopeValue)) {
|
||||||
|
return mdScopeValue;
|
||||||
|
}
|
||||||
|
return config.getScope();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkNotifyDevice(String notifyDeviceMdValue, String scope) {
|
||||||
|
return SHARED_SCOPE.equals(scope) && (config.isNotifyDevice() || Boolean.parseBoolean(notifyDeviceMdValue));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -23,18 +23,20 @@ import java.util.Collections;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public class TbMsgDeleteAttributesConfiguration implements NodeConfiguration<TbMsgDeleteAttributesConfiguration> {
|
public class TbMsgDeleteAttributesNodeConfiguration implements NodeConfiguration<TbMsgDeleteAttributesNodeConfiguration> {
|
||||||
|
|
||||||
private String scope;
|
private String scope;
|
||||||
private List<String> keys;
|
private List<String> keys;
|
||||||
private boolean sendAttributesDeletedNotification;
|
private boolean sendAttributesDeletedNotification;
|
||||||
|
private boolean notifyDevice;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbMsgDeleteAttributesConfiguration defaultConfiguration() {
|
public TbMsgDeleteAttributesNodeConfiguration defaultConfiguration() {
|
||||||
TbMsgDeleteAttributesConfiguration configuration = new TbMsgDeleteAttributesConfiguration();
|
TbMsgDeleteAttributesNodeConfiguration configuration = new TbMsgDeleteAttributesNodeConfiguration();
|
||||||
configuration.setScope(DataConstants.SERVER_SCOPE);
|
configuration.setScope(DataConstants.SERVER_SCOPE);
|
||||||
configuration.setKeys(Collections.emptyList());
|
configuration.setKeys(Collections.emptyList());
|
||||||
configuration.setSendAttributesDeletedNotification(false);
|
configuration.setSendAttributesDeletedNotification(false);
|
||||||
|
configuration.setNotifyDevice(false);
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -20,15 +20,11 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.runner.RunWith;
|
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Captor;
|
|
||||||
import org.mockito.junit.MockitoJUnitRunner;
|
|
||||||
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
|
||||||
import org.thingsboard.rule.engine.api.TbContext;
|
import org.thingsboard.rule.engine.api.TbContext;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
@ -42,8 +38,10 @@ import java.util.function.Consumer;
|
|||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||||
import static org.mockito.ArgumentMatchers.anyList;
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.BDDMockito.willAnswer;
|
import static org.mockito.BDDMockito.willAnswer;
|
||||||
import static org.mockito.BDDMockito.willReturn;
|
import static org.mockito.BDDMockito.willReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
@ -51,14 +49,18 @@ import static org.mockito.Mockito.never;
|
|||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY;
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
||||||
|
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class TbMsgDeleteAttributesTest {
|
public class TbMsgDeleteAttributesNodeTest {
|
||||||
final ObjectMapper mapper = new ObjectMapper();
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
DeviceId deviceId;
|
DeviceId deviceId;
|
||||||
TbMsgDeleteAttributes node;
|
TbMsgDeleteAttributesNode node;
|
||||||
TbMsgDeleteAttributesConfiguration config;
|
TbMsgDeleteAttributesNodeConfiguration config;
|
||||||
TbNodeConfiguration nodeConfiguration;
|
TbNodeConfiguration nodeConfiguration;
|
||||||
TbContext ctx;
|
TbContext ctx;
|
||||||
TbMsgCallback callback;
|
TbMsgCallback callback;
|
||||||
@ -70,20 +72,20 @@ public class TbMsgDeleteAttributesTest {
|
|||||||
deviceId = new DeviceId(UUID.randomUUID());
|
deviceId = new DeviceId(UUID.randomUUID());
|
||||||
callback = mock(TbMsgCallback.class);
|
callback = mock(TbMsgCallback.class);
|
||||||
ctx = mock(TbContext.class);
|
ctx = mock(TbContext.class);
|
||||||
config = new TbMsgDeleteAttributesConfiguration().defaultConfiguration();
|
config = new TbMsgDeleteAttributesNodeConfiguration().defaultConfiguration();
|
||||||
config.setKeys(List.of("${TestAttribute_1}", "$[TestAttribute_2]", "$[TestAttribute_3]", "TestAttribute_4"));
|
config.setKeys(List.of("${TestAttribute_1}", "$[TestAttribute_2]", "$[TestAttribute_3]", "TestAttribute_4"));
|
||||||
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
|
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
|
||||||
node = spy(new TbMsgDeleteAttributes());
|
node = spy(new TbMsgDeleteAttributesNode());
|
||||||
node.init(ctx, nodeConfiguration);
|
node.init(ctx, nodeConfiguration);
|
||||||
telemetryService = mock(RuleEngineTelemetryService.class);
|
telemetryService = mock(RuleEngineTelemetryService.class);
|
||||||
|
|
||||||
willReturn(telemetryService).given(ctx).getTelemetryService();
|
willReturn(telemetryService).given(ctx).getTelemetryService();
|
||||||
willAnswer(invocation -> {
|
willAnswer(invocation -> {
|
||||||
TelemetryNodeCallback callBack = invocation.getArgument(4);
|
TelemetryNodeCallback callBack = invocation.getArgument(5);
|
||||||
callBack.onSuccess(null);
|
callBack.onSuccess(null);
|
||||||
return null;
|
return null;
|
||||||
}).given(telemetryService).deleteAndNotify(
|
}).given(telemetryService).deleteAndNotify(
|
||||||
any(), any(), anyString(), anyList(), any());
|
any(), any(), anyString(), anyList(), anyBoolean(), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
@ -93,30 +95,51 @@ public class TbMsgDeleteAttributesTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
void givenDefaultConfig_whenVerify_thenOK() {
|
void givenDefaultConfig_whenVerify_thenOK() {
|
||||||
TbMsgDeleteAttributesConfiguration defaultConfig = new TbMsgDeleteAttributesConfiguration().defaultConfiguration();
|
TbMsgDeleteAttributesNodeConfiguration defaultConfig = new TbMsgDeleteAttributesNodeConfiguration().defaultConfiguration();
|
||||||
assertThat(defaultConfig.getScope()).isEqualTo(DataConstants.SERVER_SCOPE);
|
assertThat(defaultConfig.getScope()).isEqualTo(SERVER_SCOPE);
|
||||||
assertThat(defaultConfig.getKeys()).isEqualTo(Collections.emptyList());
|
assertThat(defaultConfig.getKeys()).isEqualTo(Collections.emptyList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void givenMsg_whenOnMsg_thenVerifyOutput_NoSendAttributesDeletedNotification() throws Exception {
|
void givenMsg_whenOnMsg_thenVerifyOutput_NoSendAttributesDeletedNotification_NoNotifyDevice() throws Exception {
|
||||||
onMsg_thenVerifyOutput(false);
|
onMsg_thenVerifyOutput(false, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void givenMsg_whenOnMsg_thenVerifyOutput_SendAttributesDeletedNotification() throws Exception {
|
void givenMsg_whenOnMsg_thenVerifyOutput_SendAttributesDeletedNotification_NoNotifyDevice() throws Exception {
|
||||||
config.setSendAttributesDeletedNotification(true);
|
config.setSendAttributesDeletedNotification(true);
|
||||||
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
|
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
|
||||||
node.init(ctx, nodeConfiguration);
|
node.init(ctx, nodeConfiguration);
|
||||||
onMsg_thenVerifyOutput(true);
|
onMsg_thenVerifyOutput(true, false, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void onMsg_thenVerifyOutput(boolean sendAttributesDeletedNotification) throws Exception {
|
@Test
|
||||||
|
void givenMsg_whenOnMsg_thenVerifyOutput_SendAttributesDeletedNotification_NotifyDevice() throws Exception {
|
||||||
|
config.setSendAttributesDeletedNotification(true);
|
||||||
|
config.setNotifyDevice(true);
|
||||||
|
config.setScope(SHARED_SCOPE);
|
||||||
|
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
|
||||||
|
node.init(ctx, nodeConfiguration);
|
||||||
|
onMsg_thenVerifyOutput(true, true, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenMsg_whenOnMsg_thenVerifyOutput_NoSendAttributesDeletedNotification_NotifyDeviceMetadata() throws Exception {
|
||||||
|
nodeConfiguration = new TbNodeConfiguration(mapper.valueToTree(config));
|
||||||
|
node.init(ctx, nodeConfiguration);
|
||||||
|
onMsg_thenVerifyOutput(false, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
void onMsg_thenVerifyOutput(boolean sendAttributesDeletedNotification, boolean notifyDevice, boolean notifyDeviceMetadata) throws Exception {
|
||||||
final Map<String, String> mdMap = Map.of(
|
final Map<String, String> mdMap = Map.of(
|
||||||
"TestAttribute_1", "temperature",
|
"TestAttribute_1", "temperature",
|
||||||
"city", "NY"
|
"city", "NY"
|
||||||
);
|
);
|
||||||
final TbMsgMetaData metaData = new TbMsgMetaData(mdMap);
|
TbMsgMetaData metaData = new TbMsgMetaData(mdMap);
|
||||||
|
if (notifyDeviceMetadata) {
|
||||||
|
metaData.putValue(NOTIFY_DEVICE_METADATA_KEY, "true");
|
||||||
|
metaData.putValue(SCOPE, SHARED_SCOPE);
|
||||||
|
}
|
||||||
final String data = "{\"TestAttribute_2\": \"humidity\", \"TestAttribute_3\": \"voltage\"}";
|
final String data = "{\"TestAttribute_2\": \"humidity\", \"TestAttribute_3\": \"voltage\"}";
|
||||||
|
|
||||||
TbMsg msg = TbMsg.newMsg("POST_ATTRIBUTES_REQUEST", deviceId, metaData, data, callback);
|
TbMsg msg = TbMsg.newMsg("POST_ATTRIBUTES_REQUEST", deviceId, metaData, data, callback);
|
||||||
@ -133,6 +156,6 @@ public class TbMsgDeleteAttributesTest {
|
|||||||
}
|
}
|
||||||
verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture());
|
verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture());
|
||||||
verify(ctx, never()).tellFailure(any(), any());
|
verify(ctx, never()).tellFailure(any(), any());
|
||||||
verify(telemetryService, times(1)).deleteAndNotify(any(), any(), anyString(), anyList(), any());
|
verify(telemetryService, times(1)).deleteAndNotify(any(), any(), anyString(), anyList(), eq(notifyDevice || notifyDeviceMetadata), any());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user