Merge pull request #11151 from YevhenBondarenko/hotfix/resource-notifications

send resourse change/delete events only for lwm2m
This commit is contained in:
Viacheslav Klimov 2024-07-04 16:52:14 +03:00 committed by GitHub
commit 9b11b92ba5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 38 additions and 21 deletions

View File

@ -24,12 +24,14 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.HasRuleEngineProfile; import org.thingsboard.server.common.data.HasRuleEngineProfile;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.TenantProfile;
@ -340,29 +342,33 @@ public class DefaultTbClusterService implements TbClusterService {
@Override @Override
public void onResourceChange(TbResourceInfo resource, TbQueueCallback callback) { public void onResourceChange(TbResourceInfo resource, TbQueueCallback callback) {
TenantId tenantId = resource.getTenantId(); if (resource.getResourceType() == ResourceType.LWM2M_MODEL) {
log.trace("[{}][{}][{}] Processing change resource", tenantId, resource.getResourceType(), resource.getResourceKey()); TenantId tenantId = resource.getTenantId();
TransportProtos.ResourceUpdateMsg resourceUpdateMsg = TransportProtos.ResourceUpdateMsg.newBuilder() log.trace("[{}][{}][{}] Processing change resource", tenantId, resource.getResourceType(), resource.getResourceKey());
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) TransportProtos.ResourceUpdateMsg resourceUpdateMsg = TransportProtos.ResourceUpdateMsg.newBuilder()
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setResourceType(resource.getResourceType().name()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setResourceKey(resource.getResourceKey()) .setResourceType(resource.getResourceType().name())
.build(); .setResourceKey(resource.getResourceKey())
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceUpdateMsg(resourceUpdateMsg).build(); .build();
broadcast(transportMsg, callback); ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceUpdateMsg(resourceUpdateMsg).build();
broadcast(transportMsg, DataConstants.LWM2M_TRANSPORT_NAME, callback);
}
} }
@Override @Override
public void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback) { public void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback) {
log.trace("[{}] Processing delete resource", resource); if (resource.getResourceType() == ResourceType.LWM2M_MODEL) {
TransportProtos.ResourceDeleteMsg resourceUpdateMsg = TransportProtos.ResourceDeleteMsg.newBuilder() log.trace("[{}][{}][{}] Processing delete resource", resource.getTenantId(), resource.getResourceType(), resource.getResourceKey());
.setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits()) TransportProtos.ResourceDeleteMsg resourceDeleteMsg = TransportProtos.ResourceDeleteMsg.newBuilder()
.setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits()) .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits())
.setResourceType(resource.getResourceType().name()) .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits())
.setResourceKey(resource.getResourceKey()) .setResourceType(resource.getResourceType().name())
.build(); .setResourceKey(resource.getResourceKey())
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceDeleteMsg(resourceUpdateMsg).build(); .build();
broadcast(transportMsg, callback); ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceDeleteMsg(resourceDeleteMsg).build();
broadcast(transportMsg, DataConstants.LWM2M_TRANSPORT_NAME, callback);
}
} }
private <T> void broadcastEntityChangeToTransport(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) { private <T> void broadcastEntityChangeToTransport(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) {
@ -384,8 +390,19 @@ public class DefaultTbClusterService implements TbClusterService {
} }
private void broadcast(ToTransportMsg transportMsg, TbQueueCallback callback) { private void broadcast(ToTransportMsg transportMsg, TbQueueCallback callback) {
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer();
Set<String> tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT); Set<String> tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT);
broadcast(transportMsg, tbTransportServices, callback);
}
private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) {
Set<String> tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT).stream()
.filter(info -> info.getTransportsList().contains(transportType))
.map(TransportProtos.ServiceInfo::getServiceId).collect(Collectors.toSet());
broadcast(transportMsg, tbTransportServices, callback);
}
private void broadcast(ToTransportMsg transportMsg, Set<String> tbTransportServices, TbQueueCallback callback) {
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer();
TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportServices.size(), callback) : null; TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportServices.size(), callback) : null;
for (String transportServiceId : tbTransportServices) { for (String transportServiceId : tbTransportServices) {
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId);

View File

@ -948,7 +948,7 @@ public class DefaultTransportService extends TransportActivityManager implements
String resourceId = msg.getResourceKey(); String resourceId = msg.getResourceKey();
transportResourceCache.evict(tenantId, resourceType, resourceId); transportResourceCache.evict(tenantId, resourceType, resourceId);
sessions.forEach((id, mdRez) -> { sessions.forEach((id, mdRez) -> {
log.warn("ResourceDelete - [{}] [{}]", id, mdRez); log.trace("ResourceDelete - [{}] [{}]", id, mdRez);
transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(msg)); transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(msg));
}); });
} else if (toSessionMsg.getQueueUpdateMsgsCount() > 0) { } else if (toSessionMsg.getQueueUpdateMsgsCount() > 0) {