From d999de21119d213429b6e8f2568723eb3f565222 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 8 Apr 2025 12:59:14 +0300 Subject: [PATCH 1/3] Do not create public customer in case edge connection - create only during make public entities and sync to edge --- .../service/edge/rpc/EdgeSyncCursor.java | 6 +++-- .../customer/CustomerEdgeProcessor.java | 25 ++++++++++++++----- .../server/controller/EdgeControllerTest.java | 4 +++ .../server/edge/AbstractEdgeTest.java | 4 +++ .../server/dao/customer/CustomerService.java | 2 ++ .../dao/customer/CustomerServiceImpl.java | 19 +++++++++----- 6 files changed, 46 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java index dd9f9a2b42..351c9b411b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java @@ -65,8 +65,10 @@ public class EdgeSyncCursor { fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService())); fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService())); } - Customer publicCustomer = ctx.getCustomerService().findOrCreatePublicCustomer(edge.getTenantId()); - fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId())); + Customer publicCustomer = ctx.getCustomerService().findPublicCustomer(edge.getTenantId()); + if (publicCustomer != null) { + fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId())); + } if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { fetchers.add(new CustomerEdgeEventFetcher(edge.getCustomerId())); fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId())); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/customer/CustomerEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/customer/CustomerEdgeProcessor.java index 784d10b26c..5f02fbc554 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/customer/CustomerEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/customer/CustomerEdgeProcessor.java @@ -81,13 +81,16 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor { UUID uuid = new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()); CustomerId customerId = new CustomerId(EntityIdFactory.getByEdgeEventTypeAndUuid(type, uuid).getId()); switch (actionType) { - case UPDATED: - List> futures = new ArrayList<>(); - PageDataIterable edges = new PageDataIterable<>(link -> edgeCtx.getEdgeService().findEdgesByTenantIdAndCustomerId(tenantId, customerId, link), 1024); - for (Edge edge : edges) { - futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null)); + case ADDED: + Customer customerById = edgeCtx.getCustomerService().findCustomerById(tenantId, customerId); + if (customerById != null && customerById.isPublic()) { + return findEdgesAndSaveEdgeEvents(link -> edgeCtx.getEdgeService().findEdgesByTenantId(tenantId, link), + tenantId, type, actionType, customerId); } - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + return Futures.immediateFuture(null); + case UPDATED: + return findEdgesAndSaveEdgeEvents(link -> edgeCtx.getEdgeService().findEdgesByTenantIdAndCustomerId(tenantId, customerId, link), + tenantId, type, actionType, customerId); case DELETED: EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); return saveEdgeEvent(tenantId, edgeId, type, actionType, customerId, null); @@ -96,6 +99,16 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor { } } + public ListenableFuture findEdgesAndSaveEdgeEvents(PageDataIterable.FetchFunction edgeFetcher, TenantId tenantId, + EdgeEventType type, EdgeEventActionType actionType, CustomerId customerId) { + List> futures = new ArrayList<>(); + PageDataIterable edges = new PageDataIterable<>(edgeFetcher, 1024); + for (Edge edge : edges) { + futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null)); + } + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + } + @Override public EdgeEventType getEdgeEventType() { return EdgeEventType.CUSTOMER; diff --git a/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java index 41a85721a8..27c7da09b2 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java @@ -885,6 +885,10 @@ public class EdgeControllerTest extends AbstractControllerTest { device.setType("default"); Device savedDevice = doPost("/api/device", device, Device.class); + // create public customer + doPost("/api/customer/public/device/" + savedDevice.getId().getId(), Device.class); + doDelete("/api/customer/device/" + savedDevice.getId().getId(), Device.class); + simulateEdgeActivation(edge); doPost("/api/edge/" + edge.getId().getId().toString() diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index feac7adb04..d84e0ebea0 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -168,6 +168,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { Device savedDevice = saveDevice("Edge Device 1", THERMOSTAT_DEVICE_PROFILE_NAME); + // create public customer + doPost("/api/customer/public/device/" + savedDevice.getId().getId(), Device.class); + doDelete("/api/customer/device/" + savedDevice.getId().getId(), Device.class); + Asset savedAsset = saveAsset("Edge Asset 1"); updateRootRuleChainMetadata(); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/customer/CustomerService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/customer/CustomerService.java index 5420cd5297..d478e2099a 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/customer/CustomerService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/customer/CustomerService.java @@ -41,6 +41,8 @@ public interface CustomerService extends EntityDaoService { Customer findOrCreatePublicCustomer(TenantId tenantId); + Customer findPublicCustomer(TenantId tenantId); + PageData findCustomersByTenantId(TenantId tenantId, PageLink pageLink); void deleteCustomersByTenantId(TenantId tenantId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/customer/CustomerServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/customer/CustomerServiceImpl.java index 4b05cdbc75..b06902d95e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/customer/CustomerServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/customer/CustomerServiceImpl.java @@ -213,12 +213,11 @@ public class CustomerServiceImpl extends AbstractCachedEntityService INCORRECT_TENANT_ID + id); - Optional publicCustomerOpt = customerDao.findPublicCustomerByTenantId(tenantId.getId()); - if (publicCustomerOpt.isPresent()) { - return publicCustomerOpt.get(); + var publicCustomer = findPublicCustomer(tenantId); + if (publicCustomer != null) { + return publicCustomer; } - var publicCustomer = new Customer(); + publicCustomer = new Customer(); publicCustomer.setTenantId(tenantId); publicCustomer.setTitle(PUBLIC_CUSTOMER_TITLE); try { @@ -230,7 +229,7 @@ public class CustomerServiceImpl extends AbstractCachedEntityService publicCustomerOpt = customerDao.findPublicCustomerByTenantId(tenantId.getId()); if (publicCustomerOpt.isPresent()) { return publicCustomerOpt.get(); } @@ -239,6 +238,14 @@ public class CustomerServiceImpl extends AbstractCachedEntityService INCORRECT_TENANT_ID + id); + Optional publicCustomerOpt = customerDao.findPublicCustomerByTenantId(tenantId.getId()); + return publicCustomerOpt.orElse(null); + } + @Override public PageData findCustomersByTenantId(TenantId tenantId, PageLink pageLink) { log.trace("Executing findCustomersByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); From a38d39fda9a452216356d0857d928ca2905c9dbb Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 8 Apr 2025 18:23:19 +0300 Subject: [PATCH 2/3] TenantProfileEdgeTest - revert tenant profile update --- .../server/edge/TenantProfileEdgeTest.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/edge/TenantProfileEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/TenantProfileEdgeTest.java index e58126737d..51cfa44ff9 100644 --- a/application/src/test/java/org/thingsboard/server/edge/TenantProfileEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/TenantProfileEdgeTest.java @@ -44,9 +44,8 @@ public class TenantProfileEdgeTest extends AbstractEdgeTest { @Test public void testTenantProfiles() throws Exception { loginSysAdmin(); - - // save current values into tmp to revert after test - TenantProfile edgeTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class); + TenantProfile originalTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class); + TenantProfile edgeTenantProfile = new TenantProfile(originalTenantProfile); // updated edge tenant profile edgeTenantProfile.setName("Tenant Profile Edge Test"); @@ -64,14 +63,15 @@ public class TenantProfileEdgeTest extends AbstractEdgeTest { Assert.assertEquals("Updated tenant profile Edge Test", tenantProfileMsg.getDescription()); Assert.assertEquals("Tenant Profile Edge Test", tenantProfileMsg.getName()); + doPost("/api/tenantProfile", originalTenantProfile, TenantProfile.class); loginTenantAdmin(); } @Test public void testIsolatedTenantProfile() throws Exception { loginSysAdmin(); - - TenantProfile edgeTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class); + TenantProfile originalTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class); + TenantProfile edgeTenantProfile = new TenantProfile(originalTenantProfile); // set tenant profile isolated and add 2 queues - main and isolated edgeTenantProfile.setIsolatedTbRuleEngine(true); @@ -110,6 +110,10 @@ public class TenantProfileEdgeTest extends AbstractEdgeTest { Assert.assertNotNull(queue); Assert.assertEquals(tenantId, queue.getTenantId()); } + + loginSysAdmin(); + doPost("/api/tenantProfile", originalTenantProfile, TenantProfile.class); + loginTenantAdmin(); } private TenantProfileQueueConfiguration createQueueConfig(String queueName, String queueTopic) { From 79381092b2a4acfb8fef8a8244eb87b76433edcc Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 9 Apr 2025 12:07:50 +0300 Subject: [PATCH 3/3] Do not push edge notifications in case edges disabled. Check if edges enabled before processing msg by edge consumer --- .../queue/DefaultTbClusterService.java | 14 ++++++++----- .../queue/DefaultTbEdgeConsumerService.java | 21 ++++++++++++------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 92c48a5fed..561dc7122a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -551,11 +551,15 @@ public class DefaultTbClusterService implements TbClusterService { } private void processEdgeNotification(EdgeId edgeId, ToEdgeNotificationMsg toEdgeNotificationMsg) { - var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeId)); - serviceIdOpt.ifPresentOrElse( - serviceId -> pushMsgToEdgeNotification(toEdgeNotificationMsg, serviceId.get()), - () -> broadcastEdgeNotification(edgeId, toEdgeNotificationMsg) - ); + if (edgesEnabled) { + var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeId)); + serviceIdOpt.ifPresentOrElse( + serviceId -> pushMsgToEdgeNotification(toEdgeNotificationMsg, serviceId.get()), + () -> broadcastEdgeNotification(edgeId, toEdgeNotificationMsg) + ); + } else { + log.trace("Edges disabled. Ignoring edge notification {} for edgeId: {}", toEdgeNotificationMsg, edgeId); + } } private void pushMsgToEdgeNotification(ToEdgeNotificationMsg toEdgeNotificationMsg, String serviceId) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java index fdaa2103e2..2e4c15f8e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java @@ -51,6 +51,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; +import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; @@ -195,36 +196,42 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService msg, TbCallback callback) { ToEdgeNotificationMsg toEdgeNotificationMsg = msg.getValue(); try { + EdgeRpcService edgeRpcService = edgeCtx.getEdgeRpcService(); + if (edgeRpcService == null) { + log.debug("No EdgeRpcService available (edge functionality disabled), ignoring msg: {}", toEdgeNotificationMsg); + callback.onSuccess(); + return; + } if (toEdgeNotificationMsg.hasEdgeHighPriority()) { EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeHighPriority()); - edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); + edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); callback.onSuccess(); } else if (toEdgeNotificationMsg.hasEdgeEventUpdate()) { EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeEventUpdate()); - edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); + edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); callback.onSuccess(); } else if (toEdgeNotificationMsg.hasToEdgeSyncRequest()) { EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getToEdgeSyncRequest()); - edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); + edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); callback.onSuccess(); } else if (toEdgeNotificationMsg.hasFromEdgeSyncResponse()) { EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getFromEdgeSyncResponse()); - edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); + edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); callback.onSuccess(); } else if (toEdgeNotificationMsg.hasComponentLifecycle()) { ComponentLifecycleMsg componentLifecycle = ProtoUtils.fromProto(toEdgeNotificationMsg.getComponentLifecycle()); TenantId tenantId = componentLifecycle.getTenantId(); EdgeId edgeId = new EdgeId(componentLifecycle.getEntityId().getId()); if (ComponentLifecycleEvent.DELETED.equals(componentLifecycle.getEvent())) { - edgeCtx.getEdgeRpcService().deleteEdge(tenantId, edgeId); + edgeRpcService.deleteEdge(tenantId, edgeId); } else if (ComponentLifecycleEvent.UPDATED.equals(componentLifecycle.getEvent())) { Edge edge = edgeCtx.getEdgeService().findEdgeById(tenantId, edgeId); - edgeCtx.getEdgeRpcService().updateEdge(tenantId, edge); + edgeRpcService.updateEdge(tenantId, edge); } callback.onSuccess(); } } catch (Exception e) { - log.error("Error processing edge notification message", e); + log.error("Error processing edge notification message {}", toEdgeNotificationMsg, e); callback.onFailure(e); }