Fix delete attributes cluster notification

This commit is contained in:
Andrii Shvaika 2021-11-29 16:51:43 +02:00
parent 26d1872442
commit 1298f6b130

View File

@ -222,7 +222,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
} }
} }
return subscriptionUpdate; return subscriptionUpdate;
}); }, true);
if (entityId.getEntityType() == EntityType.DEVICE) { if (entityId.getEntityType() == EntityType.DEVICE) {
updateDeviceInactivityTimeout(tenantId, entityId, ts); updateDeviceInactivityTimeout(tenantId, entityId, ts);
} }
@ -256,7 +256,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
} }
} }
return subscriptionUpdate; return subscriptionUpdate;
}); }, true);
if (entityId.getEntityType() == EntityType.DEVICE) { if (entityId.getEntityType() == EntityType.DEVICE) {
if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) { if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) {
updateDeviceInactivityTimeout(tenantId, entityId, attributes); updateDeviceInactivityTimeout(tenantId, entityId, attributes);
@ -333,14 +333,15 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
} }
} }
return subscriptionUpdate; return subscriptionUpdate;
}); }, false);
callback.onSuccess(); callback.onSuccess();
} }
private <T extends TbSubscription> void onLocalTelemetrySubUpdate(EntityId entityId, private <T extends TbSubscription> void onLocalTelemetrySubUpdate(EntityId entityId,
Function<TbSubscription, T> castFunction, Function<TbSubscription, T> castFunction,
Predicate<T> filterFunction, Predicate<T> filterFunction,
Function<T, List<TsKvEntry>> processFunction) { Function<T, List<TsKvEntry>> processFunction,
boolean ignoreEmptyUpdates) {
Set<TbSubscription> entitySubscriptions = subscriptionsByEntityId.get(entityId); Set<TbSubscription> entitySubscriptions = subscriptionsByEntityId.get(entityId);
if (entitySubscriptions != null) { if (entitySubscriptions != null) {
entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> { entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> {
@ -351,7 +352,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY); localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
} else { } else {
TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId()); TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
toCoreNotificationsProducer.send(tpi, toProto(s, subscriptionUpdate), null); toCoreNotificationsProducer.send(tpi, toProto(s, subscriptionUpdate, ignoreEmptyUpdates), null);
} }
} }
}); });
@ -467,6 +468,10 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
} }
private TbProtoQueueMsg<ToCoreNotificationMsg> toProto(TbSubscription subscription, List<TsKvEntry> updates) { private TbProtoQueueMsg<ToCoreNotificationMsg> toProto(TbSubscription subscription, List<TsKvEntry> updates) {
return toProto(subscription, updates, true);
}
private TbProtoQueueMsg<ToCoreNotificationMsg> toProto(TbSubscription subscription, List<TsKvEntry> updates, boolean ignoreEmptyUpdates) {
TbSubscriptionUpdateProto.Builder builder = TbSubscriptionUpdateProto.newBuilder(); TbSubscriptionUpdateProto.Builder builder = TbSubscriptionUpdateProto.newBuilder();
builder.setSessionId(subscription.getSessionId()); builder.setSessionId(subscription.getSessionId());
@ -494,7 +499,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
dataBuilder.addValue(strVal); dataBuilder.addValue(strVal);
} }
} }
if (hasData) { if (!ignoreEmptyUpdates || hasData) {
builder.addData(dataBuilder.build()); builder.addData(dataBuilder.build());
} }
}); });