From cfd471fe4af9e59b7042268b0aadcbdaea3f6176 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 18 Oct 2022 19:22:14 +0300 Subject: [PATCH] CE edge - single customer --- .../service/edge/rpc/EdgeGrpcSession.java | 2 +- .../service/edge/rpc/EdgeSyncCursor.java | 14 +++-- ...er.java => BaseUsersEdgeEventFetcher.java} | 6 +- .../rpc/fetch/CustomerEdgeEventFetcher.java | 22 ++++--- .../fetch/CustomerUsersEdgeEventFetcher.java | 38 +++++++++++ .../TenantAdminUsersEdgeEventFetcher.java | 34 ++++++++++ .../rpc/processor/CustomerEdgeProcessor.java | 40 +++++++++++- .../edge/rpc/processor/EdgeProcessor.java | 63 ++++++++++++++++++- .../DefaultTbNotificationEntityService.java | 8 ++- .../customer/DefaultTbCustomerService.java | 8 ++- .../server/edge/BaseAssetEdgeTest.java | 5 +- .../server/edge/BaseCustomerEdgeTest.java | 23 +++++-- .../server/edge/BaseDashboardEdgeTest.java | 5 +- .../server/edge/BaseDeviceEdgeTest.java | 5 +- .../thingsboard/server/edge/BaseEdgeTest.java | 37 ++++++++--- .../server/edge/BaseEntityViewEdgeTest.java | 6 +- .../server/edge/BaseUserEdgeTest.java | 7 +++ .../server/dao/edge/EdgeServiceImpl.java | 3 +- 18 files changed, 283 insertions(+), 43 deletions(-) rename application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/{UsersEdgeEventFetcher.java => BaseUsersEdgeEventFetcher.java} (88%) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index df29434f29..562d34012c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -180,7 +180,7 @@ public final class EdgeGrpcSession implements Closeable { public void startSyncProcess(TenantId tenantId, EdgeId edgeId) { log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId); syncCompleted = false; - doSync(new EdgeSyncCursor(ctx)); + doSync(new EdgeSyncCursor(ctx, edge)); } private void doSync(EdgeSyncCursor cursor) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java index f1b0606498..b52370adec 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java @@ -15,11 +15,14 @@ */ package org.thingsboard.server.service.edge.rpc; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.rpc.fetch.AdminSettingsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.AssetProfilesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.CustomerEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.DashboardsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.DeviceProfilesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.DevicesEdgeEventFetcher; @@ -28,8 +31,8 @@ import org.thingsboard.server.service.edge.rpc.fetch.OtaPackagesEdgeEventFetcher import org.thingsboard.server.service.edge.rpc.fetch.QueuesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.RuleChainsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetsBundlesEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.TenantAdminUsersEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEventFetcher; -import org.thingsboard.server.service.edge.rpc.fetch.UsersEdgeEventFetcher; import java.util.LinkedList; import java.util.List; @@ -41,14 +44,17 @@ public class EdgeSyncCursor { int currentIdx = 0; - public EdgeSyncCursor(EdgeContextComponent ctx) { + public EdgeSyncCursor(EdgeContextComponent ctx, Edge edge) { fetchers.add(new QueuesEdgeEventFetcher(ctx.getQueueService())); fetchers.add(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService())); fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService(), ctx.getFreemarkerConfig())); fetchers.add(new DeviceProfilesEdgeEventFetcher(ctx.getDeviceProfileService())); fetchers.add(new AssetProfilesEdgeEventFetcher(ctx.getAssetProfileService())); - fetchers.add(new CustomerEdgeEventFetcher(ctx.getCustomerService())); - fetchers.add(new UsersEdgeEventFetcher(ctx.getUserService())); + fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService())); + if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { + fetchers.add(new CustomerEdgeEventFetcher()); + fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId())); + } fetchers.add(new DevicesEdgeEventFetcher(ctx.getDeviceService())); fetchers.add(new AssetsEdgeEventFetcher(ctx.getAssetService())); fetchers.add(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/UsersEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java similarity index 88% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/UsersEdgeEventFetcher.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java index e2883bd951..6791ba69a6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/UsersEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java @@ -30,13 +30,13 @@ import org.thingsboard.server.dao.user.UserService; @Slf4j @AllArgsConstructor -public class UsersEdgeEventFetcher extends BasePageableEdgeEventFetcher { +public abstract class BaseUsersEdgeEventFetcher extends BasePageableEdgeEventFetcher { protected final UserService userService; @Override PageData fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { - return userService.findUsersByTenantId(tenantId, pageLink); + return findUsers(tenantId, pageLink); } @Override @@ -44,4 +44,6 @@ public class UsersEdgeEventFetcher extends BasePageableEdgeEventFetcher { return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null); } + + protected abstract PageData findUsers(TenantId tenantId, PageLink pageLink); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerEdgeEventFetcher.java index 8674a4e23b..de88c2f05b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerEdgeEventFetcher.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.edge.rpc.fetch; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -26,22 +25,25 @@ import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.dao.customer.CustomerService; + +import java.util.ArrayList; +import java.util.List; @Slf4j @AllArgsConstructor -public class CustomerEdgeEventFetcher extends BasePageableEdgeEventFetcher { - - private final CustomerService customerService; +public class CustomerEdgeEventFetcher implements EdgeEventFetcher { @Override - PageData fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { - return customerService.findCustomersByTenantId(tenantId, pageLink); + public PageLink getPageLink(int pageSize) { + return null; } @Override - EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, Customer customer) { - return EdgeUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, - EdgeEventActionType.ADDED, customer.getId(), null); + public PageData fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { + List result = new ArrayList<>(); + result.add(EdgeUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(), + EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null)); + // @voba - returns PageData object to be in sync with other fetchers + return new PageData<>(result, 1, result.size(), false); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java new file mode 100644 index 0000000000..2cc923d08a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.user.UserService; + +public class CustomerUsersEdgeEventFetcher extends BaseUsersEdgeEventFetcher { + + private final CustomerId customerId; + + public CustomerUsersEdgeEventFetcher(UserService userService, CustomerId customerId) { + super(userService); + this.customerId = customerId; + } + + @Override + protected PageData findUsers(TenantId tenantId, PageLink pageLink) { + return userService.findCustomerUsers(tenantId, customerId, pageLink); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java new file mode 100644 index 0000000000..a496231f7a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.user.UserService; + +public class TenantAdminUsersEdgeEventFetcher extends BaseUsersEdgeEventFetcher { + + public TenantAdminUsersEdgeEventFetcher(UserService userService) { + super(userService); + } + + @Override + protected PageData findUsers(TenantId tenantId, PageLink pageLink) { + return userService.findTenantAdmins(tenantId, pageLink); + } +} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java index ec32203d19..7d720fa671 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java @@ -15,20 +15,32 @@ */ package org.thingsboard.server.service.edge.rpc.processor; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + @Component @Slf4j @TbCoreComponent @@ -64,6 +76,32 @@ public class CustomerEdgeProcessor extends BaseEdgeProcessor { } public ListenableFuture processCustomerNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { - return processEntityNotificationForAllEdges(tenantId, edgeNotificationMsg); + EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); + EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); + UUID uuid = new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()); + CustomerId customerId = new CustomerId(EntityIdFactory.getByEdgeEventTypeAndUuid(type, uuid).getId()); + switch (actionType) { + case UPDATED: + PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); + PageData pageData; + List> futures = new ArrayList<>(); + do { + pageData = edgeService.findEdgesByTenantIdAndCustomerId(tenantId, customerId, pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + for (Edge edge : pageData.getData()) { + futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, customerId, null)); + } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } + } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + case DELETED: + EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); + return saveEdgeEvent(tenantId, edgeId, type, actionType, customerId, null); + default: + return Futures.immediateFuture(null); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java index 0aaedfe2f9..c1fff8258f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java @@ -15,19 +15,31 @@ */ package org.thingsboard.server.service.edge.rpc.processor; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + @Component @Slf4j @TbCoreComponent @@ -54,6 +66,55 @@ public class EdgeProcessor extends BaseEdgeProcessor { } public ListenableFuture processEdgeNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { - return processEntityNotification(tenantId, edgeNotificationMsg); + try { + EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); + EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + ListenableFuture edgeFuture; + switch (actionType) { + case ASSIGNED_TO_CUSTOMER: + CustomerId customerId = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class); + edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); + return Futures.transformAsync(edgeFuture, edge -> { + if (edge == null || customerId.isNullUid()) { + return Futures.immediateFuture(null); + } + List> futures = new ArrayList<>(); + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, customerId, null)); + PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); + PageData pageData; + do { + pageData = userService.findCustomerUsers(tenantId, customerId, pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + log.trace("[{}] [{}] user(s) are going to be added to edge.", edge.getId(), pageData.getData().size()); + for (User user : pageData.getData()) { + futures.add(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null)); + } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } + } while (pageData != null && pageData.hasNext()); + return Futures.transformAsync(Futures.allAsList(futures), voids -> + saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.ASSIGNED_TO_CUSTOMER, edgeId, null), + dbCallbackExecutorService); + }, dbCallbackExecutorService); + case UNASSIGNED_FROM_CUSTOMER: + CustomerId customerIdToDelete = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), CustomerId.class); + edgeFuture = edgeService.findEdgeByIdAsync(tenantId, edgeId); + return Futures.transformAsync(edgeFuture, edge -> { + if (edge == null || customerIdToDelete.isNullUid()) { + return Futures.immediateFuture(null); + } + return Futures.transformAsync(saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.DELETED, customerIdToDelete, null), + voids -> saveEdgeEvent(edge.getTenantId(), edge.getId(), EdgeEventType.EDGE, EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER, edgeId, null), + dbCallbackExecutorService); + }, dbCallbackExecutorService); + default: + return Futures.immediateFuture(null); + } + } catch (Exception e) { + log.error("Exception during processing edge event", e); + return Futures.immediateFailedFuture(e); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java index bb11fdc33b..8f8531a5b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java @@ -135,7 +135,7 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS logEntityAction(tenantId, entityId, entity, customerId, actionType, user, additionalInfo); if (sendToEdge) { - sendEntityNotificationMsg(tenantId, entityId, edgeTypeByActionType(actionType)); + sendEntityNotificationMsg(tenantId, entityId, edgeTypeByActionType(actionType), JacksonUtil.toString(customerId)); } } @@ -252,7 +252,11 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS } private void sendEntityNotificationMsg(TenantId tenantId, EntityId entityId, EdgeEventActionType action) { - sendNotificationMsgToEdge(tenantId, null, entityId, null, null, action); + sendEntityNotificationMsg(tenantId, entityId, action, null); + } + + private void sendEntityNotificationMsg(TenantId tenantId, EntityId entityId, EdgeEventActionType action, String body) { + sendNotificationMsgToEdge(tenantId, null, entityId, body, null, action); } private void sendAlarmDeleteNotificationMsg(TenantId tenantId, Alarm alarm, List edgeIds, String body) { diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/customer/DefaultTbCustomerService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/customer/DefaultTbCustomerService.java index f46f7551cd..e2f6b11f92 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/customer/DefaultTbCustomerService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/customer/DefaultTbCustomerService.java @@ -22,10 +22,13 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.service.entitiy.AbstractTbEntityService; +import java.util.List; + @Service @AllArgsConstructor public class DefaultTbCustomerService extends AbstractTbEntityService implements TbCustomerService { @@ -51,9 +54,10 @@ public class DefaultTbCustomerService extends AbstractTbEntityService implements TenantId tenantId = customer.getTenantId(); CustomerId customerId = customer.getId(); try { + List relatedEdgeIds = edgeService.findAllRelatedEdgeIds(tenantId, customer.getId()); customerService.deleteCustomer(tenantId, customerId); - notificationEntityService.notifyCreateOrUpdateOrDelete(tenantId, customerId, customer.getId(), customer, - user, ActionType.DELETED, true, null, customerId.toString()); + notificationEntityService.notifyDeleteEntity(tenantId, customer.getId(), customer, customerId, + ActionType.DELETED, relatedEdgeIds, user, customerId.toString()); tbClusterService.broadcastEntityStateChangeEvent(tenantId, customer.getId(), ComponentLifecycleEvent.DELETED); } catch (Exception e) { notificationEntityService.logEntityAction(tenantId, emptyId(EntityType.CUSTOMER), ActionType.DELETED, diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseAssetEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseAssetEdgeTest.java index 5bfd39cbd2..57643eeed3 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseAssetEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseAssetEdgeTest.java @@ -20,6 +20,7 @@ import org.junit.Assert; import org.junit.Test; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -94,10 +95,12 @@ abstract public class BaseAssetEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedAsset.getType(), assetUpdateMsg.getType()); // assign asset #2 to customer - edgeImitator.expectMessageAmount(1); Customer customer = new Customer(); customer.setTitle("Edge Customer"); Customer savedCustomer = doPost("/api/customer", customer, Customer.class); + edgeImitator.expectMessageAmount(2); + doPost("/api/customer/" + savedCustomer.getUuidId() + + "/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); edgeImitator.expectMessageAmount(1); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseCustomerEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseCustomerEdgeTest.java index 33065894c1..dd2880d2ba 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseCustomerEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseCustomerEdgeTest.java @@ -19,9 +19,13 @@ import com.google.protobuf.AbstractMessage; import org.junit.Assert; import org.junit.Test; import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg; +import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import java.util.Optional; + import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; abstract public class BaseCustomerEdgeTest extends AbstractEdgeTest { @@ -33,10 +37,21 @@ abstract public class BaseCustomerEdgeTest extends AbstractEdgeTest { Customer customer = new Customer(); customer.setTitle("Edge Customer"); Customer savedCustomer = doPost("/api/customer", customer, Customer.class); + Assert.assertFalse(edgeImitator.waitForMessages(1)); + + // assign edge to customer + edgeImitator.expectMessageAmount(2); + doPost("/api/customer/" + savedCustomer.getUuidId() + + "/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof CustomerUpdateMsg); - CustomerUpdateMsg customerUpdateMsg = (CustomerUpdateMsg) latestMessage; + Optional edgeConfigurationOpt = edgeImitator.findMessageByType(EdgeConfiguration.class); + Assert.assertTrue(edgeConfigurationOpt.isPresent()); + EdgeConfiguration edgeConfiguration = edgeConfigurationOpt.get(); + Assert.assertEquals(savedCustomer.getUuidId().getMostSignificantBits(), edgeConfiguration.getCustomerIdMSB()); + Assert.assertEquals(savedCustomer.getUuidId().getLeastSignificantBits(), edgeConfiguration.getCustomerIdLSB()); + Optional customerUpdateOpt = edgeImitator.findMessageByType(CustomerUpdateMsg.class); + Assert.assertTrue(customerUpdateOpt.isPresent()); + CustomerUpdateMsg customerUpdateMsg = customerUpdateOpt.get(); Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, customerUpdateMsg.getMsgType()); Assert.assertEquals(savedCustomer.getUuidId().getMostSignificantBits(), customerUpdateMsg.getIdMSB()); Assert.assertEquals(savedCustomer.getUuidId().getLeastSignificantBits(), customerUpdateMsg.getIdLSB()); @@ -48,7 +63,7 @@ abstract public class BaseCustomerEdgeTest extends AbstractEdgeTest { savedCustomer.setTitle("Edge Customer Updated"); savedCustomer = doPost("/api/customer", savedCustomer, Customer.class); Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); Assert.assertTrue(latestMessage instanceof CustomerUpdateMsg); customerUpdateMsg = (CustomerUpdateMsg) latestMessage; Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, customerUpdateMsg.getMsgType()); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDashboardEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDashboardEdgeTest.java index 9553d22fdd..1e85200419 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDashboardEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDashboardEdgeTest.java @@ -23,6 +23,7 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.ShortCustomerInfo; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; @@ -97,10 +98,12 @@ abstract public class BaseDashboardEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedDashboard.getTitle(), dashboardUpdateMsg.getTitle()); // assign dashboard #2 to customer - edgeImitator.expectMessageAmount(1); Customer customer = new Customer(); customer.setTitle("Edge Customer"); Customer savedCustomer = doPost("/api/customer", customer, Customer.class); + edgeImitator.expectMessageAmount(2); + doPost("/api/customer/" + savedCustomer.getUuidId() + + "/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); edgeImitator.expectMessageAmount(1); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 701354851c..4516e75e3d 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -119,10 +120,12 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedDevice.getDeviceProfileId().getId().getLeastSignificantBits(), deviceProfileUpdateMsg.getIdLSB()); // assign device #2 to customer - edgeImitator.expectMessageAmount(1); Customer customer = new Customer(); customer.setTitle("Edge Customer"); Customer savedCustomer = doPost("/api/customer", customer, Customer.class); + edgeImitator.expectMessageAmount(2); + doPost("/api/customer/" + savedCustomer.getUuidId() + + "/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); edgeImitator.expectMessageAmount(1); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index c040edbe5b..2018e5c2ca 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -22,8 +22,11 @@ import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import java.util.Optional; import java.util.UUID; abstract public class BaseEdgeTest extends AbstractEdgeTest { @@ -35,28 +38,42 @@ abstract public class BaseEdgeTest extends AbstractEdgeTest { Customer customer = new Customer(); customer.setTitle("Edge Customer"); Customer savedCustomer = doPost("/api/customer", customer, Customer.class); - Assert.assertTrue(edgeImitator.waitForMessages()); + Assert.assertFalse(edgeImitator.waitForMessages(1)); - edgeImitator.expectMessageAmount(1); + // assign edge to customer + edgeImitator.expectMessageAmount(2); doPost("/api/customer/" + savedCustomer.getUuidId() + "/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof EdgeConfiguration); - EdgeConfiguration edgeConfiguration = (EdgeConfiguration) latestMessage; + Optional edgeConfigurationOpt = edgeImitator.findMessageByType(EdgeConfiguration.class); + Assert.assertTrue(edgeConfigurationOpt.isPresent()); + EdgeConfiguration edgeConfiguration = edgeConfigurationOpt.get(); Assert.assertEquals(savedCustomer.getUuidId().getMostSignificantBits(), edgeConfiguration.getCustomerIdMSB()); Assert.assertEquals(savedCustomer.getUuidId().getLeastSignificantBits(), edgeConfiguration.getCustomerIdLSB()); + Optional customerUpdateOpt = edgeImitator.findMessageByType(CustomerUpdateMsg.class); + Assert.assertTrue(customerUpdateOpt.isPresent()); + CustomerUpdateMsg customerUpdateMsg = customerUpdateOpt.get(); + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, customerUpdateMsg.getMsgType()); + Assert.assertEquals(savedCustomer.getUuidId().getMostSignificantBits(), customerUpdateMsg.getIdMSB()); + Assert.assertEquals(savedCustomer.getUuidId().getLeastSignificantBits(), customerUpdateMsg.getIdLSB()); + Assert.assertEquals(savedCustomer.getTitle(), customerUpdateMsg.getTitle()); + testAutoGeneratedCodeByProtobuf(customerUpdateMsg); // unassign edge from customer - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); doDelete("/api/customer/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof EdgeConfiguration); - edgeConfiguration = (EdgeConfiguration) latestMessage; + edgeConfigurationOpt = edgeImitator.findMessageByType(EdgeConfiguration.class); + Assert.assertTrue(edgeConfigurationOpt.isPresent()); + edgeConfiguration = edgeConfigurationOpt.get(); Assert.assertEquals( new CustomerId(EntityId.NULL_UUID), new CustomerId(new UUID(edgeConfiguration.getCustomerIdMSB(), edgeConfiguration.getCustomerIdLSB()))); - + customerUpdateOpt = edgeImitator.findMessageByType(CustomerUpdateMsg.class); + Assert.assertTrue(customerUpdateOpt.isPresent()); + customerUpdateMsg = customerUpdateOpt.get(); + Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, customerUpdateMsg.getMsgType()); + Assert.assertEquals(savedCustomer.getUuidId().getMostSignificantBits(), customerUpdateMsg.getIdMSB()); + Assert.assertEquals(savedCustomer.getUuidId().getLeastSignificantBits(), customerUpdateMsg.getIdLSB()); } } diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEntityViewEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEntityViewEdgeTest.java index 8867c60c59..1c0e7887de 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEntityViewEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEntityViewEdgeTest.java @@ -22,6 +22,7 @@ import org.junit.Test; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.gen.edge.v1.EntityViewUpdateMsg; @@ -61,7 +62,6 @@ abstract public class BaseEntityViewEdgeTest extends AbstractEdgeTest { Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, entityViewUpdateMsg.getMsgType()); Assert.assertEquals(savedEntityView.getName(), entityViewUpdateMsg.getName()); - // request entity view(s) for device UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); EntityViewsRequestMsg.Builder entityViewsRequestBuilder = EntityViewsRequestMsg.newBuilder(); @@ -111,10 +111,12 @@ abstract public class BaseEntityViewEdgeTest extends AbstractEdgeTest { verifyEntityViewUpdateMsg(savedEntityView, device); // assign entity view #2 to customer - edgeImitator.expectMessageAmount(1); Customer customer = new Customer(); customer.setTitle("Edge Customer"); Customer savedCustomer = doPost("/api/customer", customer, Customer.class); + edgeImitator.expectMessageAmount(2); + doPost("/api/customer/" + savedCustomer.getUuidId() + + "/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); edgeImitator.expectMessageAmount(1); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseUserEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseUserEdgeTest.java index a2fbc9b968..c0deffa74b 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseUserEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseUserEdgeTest.java @@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; @@ -112,6 +113,12 @@ abstract public class BaseUserEdgeTest extends AbstractEdgeTest { Customer customer = new Customer(); customer.setTitle("Edge Customer"); Customer savedCustomer = doPost("/api/customer", customer, Customer.class); + Assert.assertFalse(edgeImitator.waitForMessages(1)); + + // assign edge to customer + edgeImitator.expectMessageAmount(2); + doPost("/api/customer/" + savedCustomer.getUuidId() + + "/edge/" + edge.getUuidId(), Edge.class); Assert.assertTrue(edgeImitator.waitForMessages()); // create user diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 32d719977b..3ab3f6d905 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -428,8 +428,9 @@ public class EdgeServiceImpl extends AbstractCachedEntityService edgeIds = Collections.singletonList(new EdgeId(entityId.getId())); return new PageData<>(edgeIds, 1, 1, false);