From 1d79f1e213f07ac2eddfe179e9a837947b3810fd Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 4 Jul 2024 11:42:15 +0200 Subject: [PATCH 1/4] send resourse change/delete events only for lwm2m --- .../queue/DefaultTbClusterService.java | 58 +++++++++++++------ .../service/DefaultTransportService.java | 6 +- 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index bce5dece3b..612d10a976 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -24,12 +24,14 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasRuleEngineProfile; +import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; @@ -340,29 +342,33 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onResourceChange(TbResourceInfo resource, TbQueueCallback callback) { - TenantId tenantId = resource.getTenantId(); - log.trace("[{}][{}][{}] Processing change resource", tenantId, resource.getResourceType(), resource.getResourceKey()); - TransportProtos.ResourceUpdateMsg resourceUpdateMsg = TransportProtos.ResourceUpdateMsg.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setResourceType(resource.getResourceType().name()) - .setResourceKey(resource.getResourceKey()) - .build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceUpdateMsg(resourceUpdateMsg).build(); - broadcast(transportMsg, callback); + if (resource.getResourceType() == ResourceType.LWM2M_MODEL) { + TenantId tenantId = resource.getTenantId(); + log.trace("[{}][{}][{}] Processing change resource", tenantId, resource.getResourceType(), resource.getResourceKey()); + TransportProtos.ResourceUpdateMsg resourceUpdateMsg = TransportProtos.ResourceUpdateMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setResourceType(resource.getResourceType().name()) + .setResourceKey(resource.getResourceKey()) + .build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceUpdateMsg(resourceUpdateMsg).build(); + broadcast(transportMsg, DataConstants.LWM2M_TRANSPORT_NAME, callback); + } } @Override public void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback) { - log.trace("[{}] Processing delete resource", resource); - TransportProtos.ResourceDeleteMsg resourceUpdateMsg = TransportProtos.ResourceDeleteMsg.newBuilder() - .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits()) - .setResourceType(resource.getResourceType().name()) - .setResourceKey(resource.getResourceKey()) - .build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceDeleteMsg(resourceUpdateMsg).build(); - broadcast(transportMsg, callback); + if (resource.getResourceType() == ResourceType.LWM2M_MODEL) { + log.trace("[{}] Processing delete resource", resource); + TransportProtos.ResourceDeleteMsg resourceUpdateMsg = TransportProtos.ResourceDeleteMsg.newBuilder() + .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits()) + .setResourceType(resource.getResourceType().name()) + .setResourceKey(resource.getResourceKey()) + .build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceDeleteMsg(resourceUpdateMsg).build(); + broadcast(transportMsg, DataConstants.LWM2M_TRANSPORT_NAME, callback); + } } private void broadcastEntityChangeToTransport(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) { @@ -394,6 +400,20 @@ public class DefaultTbClusterService implements TbClusterService { } } + private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) { + TbQueueProducer> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); + Set tbTransportInfos = partitionService.getAllServices(ServiceType.TB_TRANSPORT); + TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportInfos.size(), callback) : null; + tbTransportInfos.stream() + .filter(info -> info.getTransportsList().contains(transportType)) + .map(TransportProtos.ServiceInfo::getServiceId) + .forEach(transportServiceId -> { + TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); + toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), proxyCallback); + toTransportNfs.incrementAndGet(); + }); + } + @Override public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) { log.trace("[{}] Processing edge {} event update ", tenantId, edgeId); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index cc91baca0c..a422fe3a36 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -931,7 +931,7 @@ public class DefaultTransportService extends TransportActivityManager implements rateLimitService.remove(new DeviceId(entityUuid)); onDeviceDeleted(new DeviceId(entityUuid)); } - } else if (toSessionMsg.hasResourceUpdateMsg()) { + } else if (toSessionMsg.hasResourceUpdateMsg() && serviceInfoProvider.getServiceInfo().getTransportsList().contains(DataConstants.LWM2M_TRANSPORT_NAME)) { TransportProtos.ResourceUpdateMsg msg = toSessionMsg.getResourceUpdateMsg(); TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); ResourceType resourceType = ResourceType.valueOf(msg.getResourceType()); @@ -941,14 +941,14 @@ public class DefaultTransportService extends TransportActivityManager implements log.trace("ResourceUpdate - [{}] [{}]", id, mdRez); transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceUpdate(msg)); }); - } else if (toSessionMsg.hasResourceDeleteMsg()) { + } else if (toSessionMsg.hasResourceDeleteMsg() && serviceInfoProvider.getServiceInfo().getTransportsList().contains(DataConstants.LWM2M_TRANSPORT_NAME)) { TransportProtos.ResourceDeleteMsg msg = toSessionMsg.getResourceDeleteMsg(); TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); ResourceType resourceType = ResourceType.valueOf(msg.getResourceType()); String resourceId = msg.getResourceKey(); transportResourceCache.evict(tenantId, resourceType, resourceId); sessions.forEach((id, mdRez) -> { - log.warn("ResourceDelete - [{}] [{}]", id, mdRez); + log.trace("ResourceDelete - [{}] [{}]", id, mdRez); transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(msg)); }); } else if (toSessionMsg.getQueueUpdateMsgsCount() > 0) { From 6c00802d818ab07ae01801ead47b00b640cf4600 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 4 Jul 2024 12:53:21 +0200 Subject: [PATCH 2/4] removed redundant check --- .../common/transport/service/DefaultTransportService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index a422fe3a36..8e48061db6 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -931,7 +931,7 @@ public class DefaultTransportService extends TransportActivityManager implements rateLimitService.remove(new DeviceId(entityUuid)); onDeviceDeleted(new DeviceId(entityUuid)); } - } else if (toSessionMsg.hasResourceUpdateMsg() && serviceInfoProvider.getServiceInfo().getTransportsList().contains(DataConstants.LWM2M_TRANSPORT_NAME)) { + } else if (toSessionMsg.hasResourceUpdateMsg()) { TransportProtos.ResourceUpdateMsg msg = toSessionMsg.getResourceUpdateMsg(); TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); ResourceType resourceType = ResourceType.valueOf(msg.getResourceType()); @@ -941,7 +941,7 @@ public class DefaultTransportService extends TransportActivityManager implements log.trace("ResourceUpdate - [{}] [{}]", id, mdRez); transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceUpdate(msg)); }); - } else if (toSessionMsg.hasResourceDeleteMsg() && serviceInfoProvider.getServiceInfo().getTransportsList().contains(DataConstants.LWM2M_TRANSPORT_NAME)) { + } else if (toSessionMsg.hasResourceDeleteMsg()) { TransportProtos.ResourceDeleteMsg msg = toSessionMsg.getResourceDeleteMsg(); TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); ResourceType resourceType = ResourceType.valueOf(msg.getResourceType()); From 92845f60b6e4f50624b8edf1a01d8c1b6840a0f2 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 4 Jul 2024 13:13:13 +0200 Subject: [PATCH 3/4] minor refactoring --- .../server/service/queue/DefaultTbClusterService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 612d10a976..354e8dce04 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -360,13 +360,13 @@ public class DefaultTbClusterService implements TbClusterService { public void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback) { if (resource.getResourceType() == ResourceType.LWM2M_MODEL) { log.trace("[{}] Processing delete resource", resource); - TransportProtos.ResourceDeleteMsg resourceUpdateMsg = TransportProtos.ResourceDeleteMsg.newBuilder() + TransportProtos.ResourceDeleteMsg resourceDeleteMsg = TransportProtos.ResourceDeleteMsg.newBuilder() .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits()) .setResourceType(resource.getResourceType().name()) .setResourceKey(resource.getResourceKey()) .build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceDeleteMsg(resourceUpdateMsg).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setResourceDeleteMsg(resourceDeleteMsg).build(); broadcast(transportMsg, DataConstants.LWM2M_TRANSPORT_NAME, callback); } } From be608445eb9960c3e198596b3eeeeda837ee4857 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 4 Jul 2024 13:35:53 +0200 Subject: [PATCH 4/4] minor improvements due to comments --- .../queue/DefaultTbClusterService.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 354e8dce04..eabec663a3 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -359,7 +359,7 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback) { if (resource.getResourceType() == ResourceType.LWM2M_MODEL) { - log.trace("[{}] Processing delete resource", resource); + log.trace("[{}][{}][{}] Processing delete resource", resource.getTenantId(), resource.getResourceType(), resource.getResourceKey()); TransportProtos.ResourceDeleteMsg resourceDeleteMsg = TransportProtos.ResourceDeleteMsg.newBuilder() .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits()) @@ -390,8 +390,19 @@ public class DefaultTbClusterService implements TbClusterService { } private void broadcast(ToTransportMsg transportMsg, TbQueueCallback callback) { - TbQueueProducer> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); Set tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT); + broadcast(transportMsg, tbTransportServices, callback); + } + + private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) { + Set tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT).stream() + .filter(info -> info.getTransportsList().contains(transportType)) + .map(TransportProtos.ServiceInfo::getServiceId).collect(Collectors.toSet()); + broadcast(transportMsg, tbTransportServices, callback); + } + + private void broadcast(ToTransportMsg transportMsg, Set tbTransportServices, TbQueueCallback callback) { + TbQueueProducer> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportServices.size(), callback) : null; for (String transportServiceId : tbTransportServices) { TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); @@ -400,20 +411,6 @@ public class DefaultTbClusterService implements TbClusterService { } } - private void broadcast(ToTransportMsg transportMsg, String transportType, TbQueueCallback callback) { - TbQueueProducer> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); - Set tbTransportInfos = partitionService.getAllServices(ServiceType.TB_TRANSPORT); - TbQueueCallback proxyCallback = callback != null ? new MultipleTbQueueCallbackWrapper(tbTransportInfos.size(), callback) : null; - tbTransportInfos.stream() - .filter(info -> info.getTransportsList().contains(transportType)) - .map(TransportProtos.ServiceInfo::getServiceId) - .forEach(transportServiceId -> { - TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); - toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), proxyCallback); - toTransportNfs.incrementAndGet(); - }); - } - @Override public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) { log.trace("[{}] Processing edge {} event update ", tenantId, edgeId);