Merge pull request #13130 from volodymyr-babak/public-edge-customer

Avoid Creation of Unnecessary Public Customers During Edge Connection
This commit is contained in:
Andrew Shvayka 2025-04-10 11:38:19 +04:00 committed by GitHub
commit 8e7b143a5c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 78 additions and 31 deletions

View File

@ -65,8 +65,10 @@ public class EdgeSyncCursor {
fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService())); fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService()));
fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService())); fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService()));
} }
Customer publicCustomer = ctx.getCustomerService().findOrCreatePublicCustomer(edge.getTenantId()); Customer publicCustomer = ctx.getCustomerService().findPublicCustomer(edge.getTenantId());
if (publicCustomer != null) {
fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId())); fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId()));
}
if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
fetchers.add(new CustomerEdgeEventFetcher(edge.getCustomerId())); fetchers.add(new CustomerEdgeEventFetcher(edge.getCustomerId()));
fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId())); fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId()));

View File

@ -81,13 +81,16 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor {
UUID uuid = new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()); UUID uuid = new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB());
CustomerId customerId = new CustomerId(EntityIdFactory.getByEdgeEventTypeAndUuid(type, uuid).getId()); CustomerId customerId = new CustomerId(EntityIdFactory.getByEdgeEventTypeAndUuid(type, uuid).getId());
switch (actionType) { switch (actionType) {
case UPDATED: case ADDED:
List<ListenableFuture<Void>> futures = new ArrayList<>(); Customer customerById = edgeCtx.getCustomerService().findCustomerById(tenantId, customerId);
PageDataIterable<Edge> edges = new PageDataIterable<>(link -> edgeCtx.getEdgeService().findEdgesByTenantIdAndCustomerId(tenantId, customerId, link), 1024); if (customerById != null && customerById.isPublic()) {
for (Edge edge : edges) { return findEdgesAndSaveEdgeEvents(link -> edgeCtx.getEdgeService().findEdgesByTenantId(tenantId, link),
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null)); tenantId, type, actionType, customerId);
} }
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); return Futures.immediateFuture(null);
case UPDATED:
return findEdgesAndSaveEdgeEvents(link -> edgeCtx.getEdgeService().findEdgesByTenantIdAndCustomerId(tenantId, customerId, link),
tenantId, type, actionType, customerId);
case DELETED: case DELETED:
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
return saveEdgeEvent(tenantId, edgeId, type, actionType, customerId, null); return saveEdgeEvent(tenantId, edgeId, type, actionType, customerId, null);
@ -96,6 +99,16 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor {
} }
} }
public ListenableFuture<Void> findEdgesAndSaveEdgeEvents(PageDataIterable.FetchFunction<Edge> edgeFetcher, TenantId tenantId,
EdgeEventType type, EdgeEventActionType actionType, CustomerId customerId) {
List<ListenableFuture<Void>> futures = new ArrayList<>();
PageDataIterable<Edge> edges = new PageDataIterable<>(edgeFetcher, 1024);
for (Edge edge : edges) {
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null));
}
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
}
@Override @Override
public EdgeEventType getEdgeEventType() { public EdgeEventType getEdgeEventType() {
return EdgeEventType.CUSTOMER; return EdgeEventType.CUSTOMER;

View File

@ -551,11 +551,15 @@ public class DefaultTbClusterService implements TbClusterService {
} }
private void processEdgeNotification(EdgeId edgeId, ToEdgeNotificationMsg toEdgeNotificationMsg) { private void processEdgeNotification(EdgeId edgeId, ToEdgeNotificationMsg toEdgeNotificationMsg) {
if (edgesEnabled) {
var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeId)); var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeId));
serviceIdOpt.ifPresentOrElse( serviceIdOpt.ifPresentOrElse(
serviceId -> pushMsgToEdgeNotification(toEdgeNotificationMsg, serviceId.get()), serviceId -> pushMsgToEdgeNotification(toEdgeNotificationMsg, serviceId.get()),
() -> broadcastEdgeNotification(edgeId, toEdgeNotificationMsg) () -> broadcastEdgeNotification(edgeId, toEdgeNotificationMsg)
); );
} else {
log.trace("Edges disabled. Ignoring edge notification {} for edgeId: {}", toEdgeNotificationMsg, edgeId);
}
} }
private void pushMsgToEdgeNotification(ToEdgeNotificationMsg toEdgeNotificationMsg, String serviceId) { private void pushMsgToEdgeNotification(ToEdgeNotificationMsg toEdgeNotificationMsg, String serviceId) {

View File

@ -51,6 +51,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.queue.processing.IdMsgPair;
@ -195,36 +196,42 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdge
protected void handleNotification(UUID id, TbProtoQueueMsg<ToEdgeNotificationMsg> msg, TbCallback callback) { protected void handleNotification(UUID id, TbProtoQueueMsg<ToEdgeNotificationMsg> msg, TbCallback callback) {
ToEdgeNotificationMsg toEdgeNotificationMsg = msg.getValue(); ToEdgeNotificationMsg toEdgeNotificationMsg = msg.getValue();
try { try {
EdgeRpcService edgeRpcService = edgeCtx.getEdgeRpcService();
if (edgeRpcService == null) {
log.debug("No EdgeRpcService available (edge functionality disabled), ignoring msg: {}", toEdgeNotificationMsg);
callback.onSuccess();
return;
}
if (toEdgeNotificationMsg.hasEdgeHighPriority()) { if (toEdgeNotificationMsg.hasEdgeHighPriority()) {
EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeHighPriority()); EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeHighPriority());
edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg);
callback.onSuccess(); callback.onSuccess();
} else if (toEdgeNotificationMsg.hasEdgeEventUpdate()) { } else if (toEdgeNotificationMsg.hasEdgeEventUpdate()) {
EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeEventUpdate()); EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeEventUpdate());
edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg);
callback.onSuccess(); callback.onSuccess();
} else if (toEdgeNotificationMsg.hasToEdgeSyncRequest()) { } else if (toEdgeNotificationMsg.hasToEdgeSyncRequest()) {
EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getToEdgeSyncRequest()); EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getToEdgeSyncRequest());
edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg);
callback.onSuccess(); callback.onSuccess();
} else if (toEdgeNotificationMsg.hasFromEdgeSyncResponse()) { } else if (toEdgeNotificationMsg.hasFromEdgeSyncResponse()) {
EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getFromEdgeSyncResponse()); EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getFromEdgeSyncResponse());
edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg);
callback.onSuccess(); callback.onSuccess();
} else if (toEdgeNotificationMsg.hasComponentLifecycle()) { } else if (toEdgeNotificationMsg.hasComponentLifecycle()) {
ComponentLifecycleMsg componentLifecycle = ProtoUtils.fromProto(toEdgeNotificationMsg.getComponentLifecycle()); ComponentLifecycleMsg componentLifecycle = ProtoUtils.fromProto(toEdgeNotificationMsg.getComponentLifecycle());
TenantId tenantId = componentLifecycle.getTenantId(); TenantId tenantId = componentLifecycle.getTenantId();
EdgeId edgeId = new EdgeId(componentLifecycle.getEntityId().getId()); EdgeId edgeId = new EdgeId(componentLifecycle.getEntityId().getId());
if (ComponentLifecycleEvent.DELETED.equals(componentLifecycle.getEvent())) { if (ComponentLifecycleEvent.DELETED.equals(componentLifecycle.getEvent())) {
edgeCtx.getEdgeRpcService().deleteEdge(tenantId, edgeId); edgeRpcService.deleteEdge(tenantId, edgeId);
} else if (ComponentLifecycleEvent.UPDATED.equals(componentLifecycle.getEvent())) { } else if (ComponentLifecycleEvent.UPDATED.equals(componentLifecycle.getEvent())) {
Edge edge = edgeCtx.getEdgeService().findEdgeById(tenantId, edgeId); Edge edge = edgeCtx.getEdgeService().findEdgeById(tenantId, edgeId);
edgeCtx.getEdgeRpcService().updateEdge(tenantId, edge); edgeRpcService.updateEdge(tenantId, edge);
} }
callback.onSuccess(); callback.onSuccess();
} }
} catch (Exception e) { } catch (Exception e) {
log.error("Error processing edge notification message", e); log.error("Error processing edge notification message {}", toEdgeNotificationMsg, e);
callback.onFailure(e); callback.onFailure(e);
} }

View File

@ -885,6 +885,10 @@ public class EdgeControllerTest extends AbstractControllerTest {
device.setType("default"); device.setType("default");
Device savedDevice = doPost("/api/device", device, Device.class); Device savedDevice = doPost("/api/device", device, Device.class);
// create public customer
doPost("/api/customer/public/device/" + savedDevice.getId().getId(), Device.class);
doDelete("/api/customer/device/" + savedDevice.getId().getId(), Device.class);
simulateEdgeActivation(edge); simulateEdgeActivation(edge);
doPost("/api/edge/" + edge.getId().getId().toString() doPost("/api/edge/" + edge.getId().getId().toString()

View File

@ -168,6 +168,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
Device savedDevice = saveDevice("Edge Device 1", THERMOSTAT_DEVICE_PROFILE_NAME); Device savedDevice = saveDevice("Edge Device 1", THERMOSTAT_DEVICE_PROFILE_NAME);
// create public customer
doPost("/api/customer/public/device/" + savedDevice.getId().getId(), Device.class);
doDelete("/api/customer/device/" + savedDevice.getId().getId(), Device.class);
Asset savedAsset = saveAsset("Edge Asset 1"); Asset savedAsset = saveAsset("Edge Asset 1");
updateRootRuleChainMetadata(); updateRootRuleChainMetadata();

View File

@ -44,9 +44,8 @@ public class TenantProfileEdgeTest extends AbstractEdgeTest {
@Test @Test
public void testTenantProfiles() throws Exception { public void testTenantProfiles() throws Exception {
loginSysAdmin(); loginSysAdmin();
TenantProfile originalTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class);
// save current values into tmp to revert after test TenantProfile edgeTenantProfile = new TenantProfile(originalTenantProfile);
TenantProfile edgeTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class);
// updated edge tenant profile // updated edge tenant profile
edgeTenantProfile.setName("Tenant Profile Edge Test"); edgeTenantProfile.setName("Tenant Profile Edge Test");
@ -64,14 +63,15 @@ public class TenantProfileEdgeTest extends AbstractEdgeTest {
Assert.assertEquals("Updated tenant profile Edge Test", tenantProfileMsg.getDescription()); Assert.assertEquals("Updated tenant profile Edge Test", tenantProfileMsg.getDescription());
Assert.assertEquals("Tenant Profile Edge Test", tenantProfileMsg.getName()); Assert.assertEquals("Tenant Profile Edge Test", tenantProfileMsg.getName());
doPost("/api/tenantProfile", originalTenantProfile, TenantProfile.class);
loginTenantAdmin(); loginTenantAdmin();
} }
@Test @Test
public void testIsolatedTenantProfile() throws Exception { public void testIsolatedTenantProfile() throws Exception {
loginSysAdmin(); loginSysAdmin();
TenantProfile originalTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class);
TenantProfile edgeTenantProfile = doGet("/api/tenantProfile/" + tenantProfileId.getId(), TenantProfile.class); TenantProfile edgeTenantProfile = new TenantProfile(originalTenantProfile);
// set tenant profile isolated and add 2 queues - main and isolated // set tenant profile isolated and add 2 queues - main and isolated
edgeTenantProfile.setIsolatedTbRuleEngine(true); edgeTenantProfile.setIsolatedTbRuleEngine(true);
@ -110,6 +110,10 @@ public class TenantProfileEdgeTest extends AbstractEdgeTest {
Assert.assertNotNull(queue); Assert.assertNotNull(queue);
Assert.assertEquals(tenantId, queue.getTenantId()); Assert.assertEquals(tenantId, queue.getTenantId());
} }
loginSysAdmin();
doPost("/api/tenantProfile", originalTenantProfile, TenantProfile.class);
loginTenantAdmin();
} }
private TenantProfileQueueConfiguration createQueueConfig(String queueName, String queueTopic) { private TenantProfileQueueConfiguration createQueueConfig(String queueName, String queueTopic) {

View File

@ -41,6 +41,8 @@ public interface CustomerService extends EntityDaoService {
Customer findOrCreatePublicCustomer(TenantId tenantId); Customer findOrCreatePublicCustomer(TenantId tenantId);
Customer findPublicCustomer(TenantId tenantId);
PageData<Customer> findCustomersByTenantId(TenantId tenantId, PageLink pageLink); PageData<Customer> findCustomersByTenantId(TenantId tenantId, PageLink pageLink);
void deleteCustomersByTenantId(TenantId tenantId); void deleteCustomersByTenantId(TenantId tenantId);

View File

@ -213,12 +213,11 @@ public class CustomerServiceImpl extends AbstractCachedEntityService<CustomerCac
@Override @Override
public Customer findOrCreatePublicCustomer(TenantId tenantId) { public Customer findOrCreatePublicCustomer(TenantId tenantId) {
log.trace("Executing findOrCreatePublicCustomer, tenantId [{}]", tenantId); log.trace("Executing findOrCreatePublicCustomer, tenantId [{}]", tenantId);
Validator.validateId(tenantId, id -> INCORRECT_TENANT_ID + id); var publicCustomer = findPublicCustomer(tenantId);
Optional<Customer> publicCustomerOpt = customerDao.findPublicCustomerByTenantId(tenantId.getId()); if (publicCustomer != null) {
if (publicCustomerOpt.isPresent()) { return publicCustomer;
return publicCustomerOpt.get();
} }
var publicCustomer = new Customer(); publicCustomer = new Customer();
publicCustomer.setTenantId(tenantId); publicCustomer.setTenantId(tenantId);
publicCustomer.setTitle(PUBLIC_CUSTOMER_TITLE); publicCustomer.setTitle(PUBLIC_CUSTOMER_TITLE);
try { try {
@ -230,7 +229,7 @@ public class CustomerServiceImpl extends AbstractCachedEntityService<CustomerCac
return saveCustomer(publicCustomer, false); return saveCustomer(publicCustomer, false);
} catch (DataValidationException e) { } catch (DataValidationException e) {
if (CUSTOMER_UNIQUE_TITLE_EX_MSG.equals(e.getMessage())) { if (CUSTOMER_UNIQUE_TITLE_EX_MSG.equals(e.getMessage())) {
publicCustomerOpt = customerDao.findPublicCustomerByTenantId(tenantId.getId()); Optional<Customer> publicCustomerOpt = customerDao.findPublicCustomerByTenantId(tenantId.getId());
if (publicCustomerOpt.isPresent()) { if (publicCustomerOpt.isPresent()) {
return publicCustomerOpt.get(); return publicCustomerOpt.get();
} }
@ -239,6 +238,14 @@ public class CustomerServiceImpl extends AbstractCachedEntityService<CustomerCac
} }
} }
@Override
public Customer findPublicCustomer(TenantId tenantId) {
log.trace("Executing findPublicCustomer, tenantId [{}]", tenantId);
Validator.validateId(tenantId, id -> INCORRECT_TENANT_ID + id);
Optional<Customer> publicCustomerOpt = customerDao.findPublicCustomerByTenantId(tenantId.getId());
return publicCustomerOpt.orElse(null);
}
@Override @Override
public PageData<Customer> findCustomersByTenantId(TenantId tenantId, PageLink pageLink) { public PageData<Customer> findCustomersByTenantId(TenantId tenantId, PageLink pageLink) {
log.trace("Executing findCustomersByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); log.trace("Executing findCustomersByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink);