Save attributes strategies: preserve backwards compatibility when deprecating 'notifyDevice'
This commit is contained in:
parent
9d9fd93c89
commit
264a767efe
@ -542,10 +542,19 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
|||||||
proto.getScope(), KvProtoUtil.toAttributeKvList(proto.getDataList()), callback);
|
proto.getScope(), KvProtoUtil.toAttributeKvList(proto.getDataList()), callback);
|
||||||
} else if (msg.hasAttrDelete()) {
|
} else if (msg.hasAttrDelete()) {
|
||||||
TbAttributeDeleteProto proto = msg.getAttrDelete();
|
TbAttributeDeleteProto proto = msg.getAttrDelete();
|
||||||
subscriptionManagerService.onAttributesDelete(
|
if (proto.hasNotifyDevice()) {
|
||||||
toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()),
|
// handles old messages with deprecated 'notifyDevice'
|
||||||
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
|
subscriptionManagerService.onAttributesDelete(
|
||||||
proto.getScope(), proto.getKeysList(), callback);
|
toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()),
|
||||||
|
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
|
||||||
|
proto.getScope(), proto.getKeysList(), proto.getNotifyDevice(), callback);
|
||||||
|
} else {
|
||||||
|
// handles new messages without 'notifyDevice'
|
||||||
|
subscriptionManagerService.onAttributesDelete(
|
||||||
|
toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()),
|
||||||
|
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
|
||||||
|
proto.getScope(), proto.getKeysList(), callback);
|
||||||
|
}
|
||||||
} else if (msg.hasTsDelete()) {
|
} else if (msg.hasTsDelete()) {
|
||||||
TbTimeSeriesDeleteProto proto = msg.getTsDelete();
|
TbTimeSeriesDeleteProto proto = msg.getTsDelete();
|
||||||
subscriptionManagerService.onTimeSeriesDelete(
|
subscriptionManagerService.onTimeSeriesDelete(
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.context.event.EventListener;
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
@ -35,6 +36,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
|
|||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
|
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||||
import org.thingsboard.server.queue.TbQueueProducer;
|
import org.thingsboard.server.queue.TbQueueProducer;
|
||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
@ -74,6 +76,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
private final TbQueueProducerProvider producerProvider;
|
private final TbQueueProducerProvider producerProvider;
|
||||||
private final TbLocalSubscriptionService localSubscriptionService;
|
private final TbLocalSubscriptionService localSubscriptionService;
|
||||||
private final DeviceStateService deviceStateService;
|
private final DeviceStateService deviceStateService;
|
||||||
|
private final TbClusterService clusterService;
|
||||||
private final SubscriptionSchedulerComponent scheduler;
|
private final SubscriptionSchedulerComponent scheduler;
|
||||||
|
|
||||||
private final Lock subsLock = new ReentrantLock();
|
private final Lock subsLock = new ReentrantLock();
|
||||||
@ -215,12 +218,20 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback callback) {
|
public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback callback) {
|
||||||
|
onAttributesDelete(tenantId, entityId, scope, keys, false, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, TbCallback callback) {
|
||||||
processAttributesUpdate(entityId, scope,
|
processAttributesUpdate(entityId, scope,
|
||||||
keys.stream().map(key -> new BaseAttributeKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList()));
|
keys.stream().map(key -> new BaseAttributeKvEntry(0, new StringDataEntry(key, ""))).collect(Collectors.toList()));
|
||||||
if (entityId.getEntityType() == EntityType.DEVICE) {
|
if (entityId.getEntityType() == EntityType.DEVICE) {
|
||||||
if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)
|
if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)
|
||||||
|| TbAttributeSubscriptionScope.ANY_SCOPE.name().equalsIgnoreCase(scope)) {
|
|| TbAttributeSubscriptionScope.ANY_SCOPE.name().equalsIgnoreCase(scope)) {
|
||||||
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
|
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
|
||||||
|
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
|
||||||
|
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(tenantId,
|
||||||
|
new DeviceId(entityId.getId()), scope, keys), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
|
|||||||
@ -41,6 +41,15 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
|
|||||||
|
|
||||||
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
|
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is retained solely for backwards compatibility, specifically to handle
|
||||||
|
* legacy proto messages that include the notifyDevice field.
|
||||||
|
*
|
||||||
|
* @deprecated as of 4.0, this method will be removed in future releases.
|
||||||
|
*/
|
||||||
|
@Deprecated(forRemoval = true, since = "4.0")
|
||||||
|
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, TbCallback empty);
|
||||||
|
|
||||||
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
|
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
|
||||||
|
|
||||||
void onAlarmUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback);
|
void onAlarmUpdate(TenantId tenantId, EntityId entityId, AlarmInfo alarm, TbCallback callback);
|
||||||
|
|||||||
@ -1079,8 +1079,11 @@ message TbAttributeDeleteProto {
|
|||||||
int64 tenantIdLSB = 5;
|
int64 tenantIdLSB = 5;
|
||||||
string scope = 6;
|
string scope = 6;
|
||||||
repeated string keys = 7;
|
repeated string keys = 7;
|
||||||
// Deprecated since 4.0, not used anymore since device notification are now handled in DefaultTelemetrySubscriptionService instead of DefaultSubscriptionManagerService
|
// DEPRECATED. FOR REMOVAL
|
||||||
bool notifyDevice = 8 [deprecated = true];
|
// Since 4.0, this field is no longer used.
|
||||||
|
// Device notifications are now handled directly by DefaultTelemetrySubscriptionService,
|
||||||
|
// eliminating the need to pass this parameter through the queue and proto to DefaultSubscriptionManagerService.
|
||||||
|
optional bool notifyDevice = 8 [deprecated = true];
|
||||||
}
|
}
|
||||||
|
|
||||||
message TbTimeSeriesDeleteProto {
|
message TbTimeSeriesDeleteProto {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user